In [10]:
import yfinance as yf
import talib
import pandas as pd
import numpy as np
import plotly.graph_objects as go

In [11]:
class Indicator:
    def SMA_CROSS(self, data, short_period, long_period):
        data[f'SMA_{short_period}'] = talib.SMA(data['Close'].values, timeperiod=short_period)
        data[f'SMA_{long_period}'] = talib.SMA(data['Close'].values, timeperiod=long_period)
        return data



    def Baseline(self, data,period):
        data['baseline'] = data['Close'].rolling(window=period).mean()
        return data



    def SSL(self, data, period):
        data['smaHigh'] = data['High'].rolling(window=period).mean()
        data['smaLow'] = data['Low'].rolling(window=period).mean()
        data['sslDown'] = data.apply(lambda row: row['smaHigh'] if row['Close'] < row['smaLow'] else row['smaLow'], axis=1)
        data['sslUp'] = data.apply(lambda row: row['smaLow'] if row['Close'] < row['smaLow'] else row['smaHigh'], axis=1)
        data['Position'] = data.apply(lambda row: "buy" if row['smaLow'] == row['sslDown'] else "sell", axis=1)
        data['SIGNAL'] = data['Position'] != data['Position'].shift(1)
        data['SSL_BUY'] = (data['Position'] == 'buy') & (data['SIGNAL'])
        data['SSL_SELL'] = (data['Position'] == 'sell') & (data['SIGNAL'])
        return data


    def ATR(self, data, timeperiod=14):
        data['ATR'] = talib.ATR(data['High'].values, data['Low'].values, data['Close'].values, timeperiod=timeperiod)
        data['atrup'] = data['Close'] + data['ATR']
        data['atrdown'] = data['Close'] - data['ATR']

        # Calculate SL and TP based on the Position
        data['SL'] = data.apply(lambda row: row['Close'] - (row['ATR'] * 1.5) if row['Position'] == 'buy' else row['Close'] + (row['ATR'] * 1.5), axis=1)
        data['TP'] = data.apply(lambda row: row['Close'] + row['ATR'] if row['Position'] == 'buy' else row['Close'] - row['ATR'], axis=1)

        return data

    def WAE(self, df, sensitivity=150, fastLength=20, slowLength=40, channelLength=20, mult=2.0):
        df['Close'] = df['Close'].astype(float)
        df['macd'], _, _ = talib.MACD(df['Close'], fastperiod=fastLength, slowperiod=slowLength, signalperiod=9)

        # Manual Bollinger Bands calculation
        df['std'] = df['Close'].rolling(window=channelLength).std()
        df['bb_upper'] = df['baseline'] + (mult * df['std'])
        df['bb_lower'] = df['baseline'] - (mult * df['std'])

        df['t1'] = (df['macd'] - df['macd'].shift(1)) * sensitivity
        df['e1'] = df['bb_upper'] - df['bb_lower']

        df['trendUp'] = np.where(df['t1'] >= 0, df['t1'], 0)
        df['trendDown'] = np.where(df['t1'] < 0, -df['t1'], 0)
        def final_signal(row):
          long_conditon = row['Close'] > row['baseline'] and row['SSL_BUY'] and row['trendUp']>row['e1']
          short_conditon = row['Close'] < row['baseline'] and row['SSL_SELL'] and row['trendDown']>row['e1']

          if long_conditon:
              return "BUY"
          elif short_conditon:
              return "SELL"
          else:
              return None
        df['FINAL_SIGNAL'] = df.apply(final_signal,axis = 1)
        return df

In [12]:

class backtest():
  @staticmethod
  def strategy(df):
      baseline_df = Indicator().Baseline(data=df, period=10)
      ssl_df = Indicator().SSL(data=baseline_df, period=10)
      atr_df = Indicator().ATR(data=ssl_df,timeperiod=14)
      wae_df=Indicator().WAE(df=atr_df)
      return wae_df

  @staticmethod
  def backtest_forex(df):
    entry_price = None
    trade_type = None
    pips_gained = []
    for index, row in df.iterrows():
      if row['FINAL_SIGNAL'] == 'SELL' or row['FINAL_SIGNAL'] == 'BUY':
        if entry_price is None:
          entry_price = row['Close']
          trade_type = row['FINAL_SIGNAL']
          print(entry_price,trade_type)
        elif entry_price is not None:

          exit_price = row['Close']
          pips = exit_price - entry_price if trade_type == "SELL" else entry_price - exit_price
          pips_gained.append(abs(pips))
          print(exit_price,entry_price,abs(pips))

          entry_price = row['Close']
          trade_type = row['FINAL_SIGNAL']
          print(entry_price,trade_type)

    return sum(pips_gained)


  @staticmethod
  def backtest_stock(df):
      collection = []           # Define inside the function to keep the state within the function scope
      collection_units = []     # Define inside the function to keep the state within the function scope
      gain_amount = []
      positive_count = 0        # Initialize the positive count
      negative_count = 0        # Initialize the negative count

      unit = 10
      for index, row in df.iterrows():
          if row['FINAL_SIGNAL'] == "BUY":
              investment = unit * row['Close']
              collection_units.append(unit)
              collection.append(investment)
          elif row['FINAL_SIGNAL'] == "SELL":
              if collection:
                  total_investment = sum(collection)
                  total_units = sum(collection_units)
                  total_selling_price = row['Close'] * total_units
                  gain = total_selling_price - total_investment
                  gain_amount.append(gain)
                  print(f"Sold Units: {total_units}")
                  print(f"Total investment: {total_investment}")
                  print(f"Total Selling Price: {total_selling_price}")
                  print(f"Gain from stock: {gain}")
                  collection = []  # Reset collection after selling
                  collection_units = []  # Reset units after selling
                  if gain > 0:
                      positive_count += 1
                  elif gain < 0:
                      negative_count += 1

      total_gain = sum(gain_amount)
      total_trades = positive_count + negative_count
      win_rate = (positive_count / total_trades * 100) if total_trades > 0 else 0

      print(f"Total Gain: {total_gain}")
      print(f"Positive Gains: {positive_count}")
      print(f"Negative Gains: {negative_count}")
      print(f"Win %: {win_rate}")

  # Example usage:
  # Assuming df is your DataFrame with the necessary columns
  # stock_bt(df)


In [13]:
df = yf.download(tickers="AAPL",period="max",interval="1d")

[*********************100%%**********************]  1 of 1 completed


In [14]:
df = backtest().strategy(df)

In [32]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import numpy as np  # Import numpy for np.where function

def create_figure_sig(sig):
    # Initialize figure with subplots
    fig_ssl = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.05)

    # Candlestick chart
    fig_ssl.add_trace(go.Candlestick(
        x=sig['Time'],
        open=sig['Open'],
        high=sig['High'],
        low=sig['Low'],
        close=sig['Close'],
        name='Candlestick',
        line=dict(width=1),
        opacity=1,
        increasing_fillcolor='#24A06B',
        decreasing_fillcolor="#CC2E3C",
        increasing_line_color='#2EC886',
        decreasing_line_color='#FF3A4C'
    ), row=1, col=1)

    #TP and SL adder
    fig_ssl.add_trace(go.Scatter(

    ))

    # SSLUp, SSLDown, Baseline traces
    fig_ssl.add_trace(go.Scatter(
        x=sig['Time'],
        y=sig['sslUp'],
        name='sslUp',
        mode='lines',
        line=dict(color='green', width=2)
    ), row=1, col=1)

    fig_ssl.add_trace(go.Scatter(
        x=sig['Time'],
        y=sig['sslDown'],
        name='sslDown',
        mode='lines',
        line=dict(color='red', width=2)
    ), row=1, col=1)

    fig_ssl.add_trace(go.Scatter(
        x=sig['Time'],
        y=sig['baseline'],
        name='Baseline',
        mode='lines',
        line=dict(color='yellow', width=2)
    ), row=1, col=1)

    # Buy and Sell signals
    fig_ssl.add_trace(go.Scatter(
        x=sig[sig['FINAL_SIGNAL'] == "BUY"]['Time'],
        y=sig[sig['FINAL_SIGNAL'] == "BUY"]['Open'],
        name='BUY_ARROW',
        mode='markers',
        marker=dict(color='green', symbol='triangle-up', size=10)
    ), row=1, col=1)

    fig_ssl.add_trace(go.Scatter(
        x=sig[sig['FINAL_SIGNAL'] == "SELL"]['Time'],
        y=sig[sig['FINAL_SIGNAL'] == "SELL"]['Open'],
        name='SELL_ARROW',
        mode='markers',
        marker=dict(color='red', symbol='triangle-down', size=10)
    ), row=1, col=1)

    # WAE chart
    fig_ssl.add_trace(go.Bar(
        x=sig['Time'],
        y=sig['trendUp'],
        marker_color=np.where(sig['trendUp'] < sig['trendUp'].shift(1), 'lime', 'green'),
        name='UpTrend',
        width=0.04,
        marker_line_color='rgba(0,0,0,0)',
        marker_line_width=0,
    ), row=2, col=1)

    fig_ssl.add_trace(go.Bar(
        x=sig['Time'],
        y=sig['trendDown'],
        marker_color=np.where(sig['trendDown'] < sig['trendDown'].shift(1), 'orange', 'red'),
        name='DownTrend',
        width=0.04,
        marker_line_color='rgba(0,0,0,0)',
        marker_line_width=0,
    ), row=2, col=1)

    fig_ssl.add_trace(go.Scatter(
        x=sig['Time'],
        y=sig['e1'],
        mode='lines',
        name='ExplosionLine',
        line=dict(color='#A0522D', width=2)
    ), row=2, col=1)

    # Update layout for the entire figure
    fig_ssl.update_layout(
        title='Candlestick Chart with WAE',
        xaxis_title='Time',
        yaxis_title='Price',
        height=800,
        width=1200,
        margin=dict(l=50, r=50, b=50, t=100),
        paper_bgcolor="#1e1e1e",
        plot_bgcolor="#1e1e1e",
        font=dict(size=10, color="#e1e1e1"),
        showlegend=True,
        legend=dict(x=0, y=1.1, orientation='h')
    )

    # Show gridlines
    fig_ssl.update_xaxes(
        gridcolor="#1f292f",
        showgrid=True,
        fixedrange=True,
        rangeslider=dict(visible=True)
    )

    fig_ssl.update_yaxes(
        gridcolor="#1f292f",
        showgrid=True
    )

    return fig_ssl






In [19]:
df.reset_index(inplace=True)
df.rename(columns={'Date':'Time'},inplace=True)

In [22]:
df.rename(columns={'Date':'Time'},inplace=True)

In [33]:
create_figure_sig(sdf)

In [30]:
sdf = df.iloc[-100:]

In [38]:
entry_price = None
unit = 10
investment = []
total_unit = []

for index,row in df.iterrows():
    if row['FINAL_SIGNAL'] == "BUY":
        invest_price = unit * row['Open']
        investment.append(invest_price)
        total_unit.append(unit)

    


print(sum(investment))
print(sum(total_unit))


14191.940680742264
1110


In [51]:
c=df.iloc[-1:]['Close']

In [60]:
c.tolist()[0] * 1110

242579.39254760742

In [62]:
df = pd.read_csv(r"C:\Users\sachi\OneDrive\Documents\BOTS\nnfx_bot\yfinance_check\backtest_results.csv")

In [64]:
df.sort_values(by='Total Gain', ascending=False)

Unnamed: 0.1,Unnamed: 0,Total Gain,Positive Gains,Negative Gains,Win %,Total Trades,Total Investment
109,C,381.762366,56.0,55.0,50.450450,111.0,15647.835181
66,META,239.459963,13.0,12.0,52.000000,25.0,4877.029987
243,BA,209.364501,55.0,86.0,39.007092,141.0,7731.462886
10,AAPL,203.183347,39.0,30.0,56.521739,69.0,1601.079729
50,MSFT,176.340569,41.0,39.0,51.250000,80.0,4640.471995
...,...,...,...,...,...,...,...
149,BABA,-306.269978,7.0,14.0,33.333333,21.0,4691.329983
183,NVAX,-328.089936,19.0,42.0,31.147541,61.0,6265.499972
164,IOVA,-381.949996,12.0,30.0,28.571429,42.0,2141.569999
123,BXMT,-580.549935,39.0,156.0,20.000000,195.0,33123.999962
