In [88]:
import pandas as pd
import numpy as np
import statsmodels.api as sm
from statsmodels.tsa.stattools import adfuller
from itertools import combinations
import tqdm
import matplotlib.pyplot as plt
import seaborn as sns
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.stattools import kpss
import warnings
from statsmodels.stats.diagnostic import acorr_ljungbox
from statsmodels.sandbox.stats.runs import runstest_1samp
from scipy.stats import shapiro, anderson, kstest
import numpy as np
from statsmodels.stats.diagnostic import het_arch
import backtrader as bt

In [89]:
data = pd.read_csv('data/daily_close.csv', index_col=[0])
def get_headge_ratio(ticker1, ticker2, data):
    X = sm.add_constant(data[ticker1])
    y = data[ticker2]
    model = sm.OLS(y, X).fit()
    hedge_ratio = model.params[ticker1]

    return hedge_ratio #this is the aplha in our calculation


def spread(ticker1, ticker2, data, hedge_ratio):
    S2_bought = 100
    S1_bought = int(100*hedge_ratio)
    spread_v = 100*data[ticker2].iloc[-1] - int(100*hedge_ratio)*data[ticker1].iloc[-1]
    return spread_v, -1*S1_bought, S2_bought #this is the beta + epsilon per day

def plot_spread(ticker1, ticker2, data, hedge_ratio):
    t_values = range(0, len(data) - int(len(data)*0.7))
    spread_values_list = []
    dates = []

    for t in tqdm.tqdm(t_values):
        spread_values_list.append(spread(ticker1, ticker2, data[:int(len(data)*0.7) + t], hedge_ratio)[0])
        dates.append(data[:int(len(data)*0.7) + t].index[-1])
    
    plt.figure(figsize=(12, 6))
    plt.plot(dates, spread_values_list)
    plt.title(f'Spread between {ticker1} and {ticker2}')
    plt.xlabel('Date')
    plt.ylabel('Spread Value')
    plt.grid(True)
    plt.xticks(rotation=90)
    plt.xticks(dates[::len(dates)//50])
    plt.tight_layout()
    plt.show() #here we have made sure that we only plot the last 30% of the data for the spread
def get_polynomial_slope(spread_values_list, dates, closness):
    degree = min(int(len(dates) / closness) + 1, len(dates)-1)
    x = np.arange(len(dates))
    y = np.array(spread_values_list)
    
    coeffs = np.polyfit(x, y, degree)
    polynomial = np.poly1d(coeffs)
    
    slope = polynomial.deriv()(x[-1])    
    return slope #we get the last day's slope

def make_slope_curve(spread_values_list, dates, closness):
    slopes = []
    for i in range(10, len(spread_values_list)):
        slope = get_polynomial_slope(spread_values_list[0:i], dates[0:i], closness)
        slopes.append(slope)
    plt.figure(figsize=(12, 8))
    plt.plot(dates[10:], slopes)
    plt.xlabel('time')
    plt.ylabel('Slope Value')
    plt.grid(True)
    plt.xticks(rotation=90)
    plt.xticks(dates[::len(dates)//50])
    plt.tight_layout()
    plt.show() #plotting the slope curv for my method

def get_signals(spread_values_list, dates, closness, quantity_S1, quantity_S2):
    slopes = []
    signals = []
    order_book = []
    for i in range(10, len(spread_values_list)):
        slope = get_polynomial_slope(spread_values_list[0:i], dates[0:i], closness)
        if len(slopes) > 0:
            if slopes[-1] > 0 and slope < 0:
                signals.append((dates[i], "buy", spread_values_list[i]))
                order_book.append((dates[i], quantity_S1, quantity_S2))
            if slopes[-1] < 0 and slope > 0:
                signals.append((dates[i], "sell", spread_values_list[i]))
                order_book.append((dates[i], -1*quantity_S1, -1*quantity_S2))
        slopes.append(slope)
    return signals, order_book #getting signals, again I have not used future data in anyway

In [90]:
def general_function(ticker1, ticker2, data):
    hedge_ratio = get_headge_ratio(ticker1,ticker2, data[:int(len(data)*0.7)])
    _, quantity_S1, quantity_S2 = spread(ticker1,ticker2, data[:int(len(data)*0.7)], hedge_ratio)

    rev_hedge_ratio = get_headge_ratio(ticker2,ticker1, data[:int(len(data)*0.7)])
    _, r_quantity_S1, _ = spread(ticker2, ticker1, data[:int(len(data)*0.7)], rev_hedge_ratio)

    if r_quantity_S1 < quantity_S1:
        ticker1, ticker2 = ticker2, ticker1
        hedge_ratio = get_headge_ratio(ticker1,ticker2, data[:int(len(data)*0.7)])
        _, quantity_S1, quantity_S2 = spread(ticker1,ticker2, data[:int(len(data)*0.7)], hedge_ratio)
    #this is like our nomenclature, I just use (s1, s2) or (s2, s1) depending on headge ratio
    #I did this to ensure that (s1, s2) or (s2, s1) will get same answer

    t_values = range(0, len(data) - int(len(data)*0.7))
    spread_values_list = []
    dates = []

    for t in tqdm.tqdm(t_values):
        spread_values_list.append(spread(ticker1,ticker2, data[:int(len(data)*0.7) + t], hedge_ratio)[0])
        dates.append(data[:int(len(data)*0.7) + t].index[-1])
    
    signals, _ = get_signals(spread_values_list, dates, 30, quantity_S1, quantity_S2)
    return signals, spread_values_list, dates

In [91]:
signals, prices, dates = general_function('BAJFINANCE', 'BAJAJFINSV', data)

100%|██████████| 421/421 [00:00<00:00, 13303.91it/s]


In [92]:
df = pd.DataFrame({
    "datetime": pd.to_datetime(dates),
    "open": prices,
    "high": prices,
    "low": prices,
    "close": prices,
    "volume": [1000] * len(prices) #not going to use anyway, just to satisfy bt
})

df.set_index("datetime", inplace=True)

In [93]:
df

Unnamed: 0_level_0,open,high,low,close,volume
datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2023-08-29 09:15:00,159527.46,159527.46,159527.46,159527.46,1000
2023-08-30 09:15:00,157741.51,157741.51,157741.51,157741.51,1000
2023-08-31 09:15:00,152006.90,152006.90,152006.90,152006.90,1000
2023-09-01 09:15:00,158064.51,158064.51,158064.51,158064.51,1000
2023-09-04 09:15:00,154697.10,154697.10,154697.10,154697.10,1000
...,...,...,...,...,...
2025-05-06 09:15:00,108181.80,108181.80,108181.80,108181.80,1000
2025-05-07 09:15:00,129166.30,129166.30,129166.30,129166.30,1000
2025-05-08 09:15:00,122976.00,122976.00,122976.00,122976.00,1000
2025-05-09 09:15:00,117470.00,117470.00,117470.00,117470.00,1000


In [94]:
df["signal"] = 0

signal_map = {
    "buy": 1,
    "sell": -1
}

for date, signal, price in signals:
    dt = pd.to_datetime(date)
    if dt in df.index:
        df.at[dt, "signal"] = signal_map[signal]

In [95]:
import backtrader as bt
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

class SignalStrategy(bt.SignalStrategy):
    def __init__(self):
        self.signal_add(bt.SIGNAL_LONG, self.data.signal)

class PandasSignalData(bt.feeds.PandasData):
    lines = ('signal',)
    params = (('signal', -1),)

data = PandasSignalData(dataname=df)

cerebro = bt.Cerebro()
cerebro.addstrategy(SignalStrategy)
cerebro.adddata(data)
cerebro.broker.set_cash(20000000)

cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name='ta')
cerebro.addanalyzer(bt.analyzers.TimeReturn, _name='timereturn')

results = cerebro.run()
strat = results[0]

dd = strat.analyzers.drawdown.get_analysis()
ta = strat.analyzers.ta.get_analysis()
returns = strat.analyzers.timereturn.get_analysis()

initial_cash = 20000000
final_value = cerebro.broker.getvalue()
total_return = (final_value - initial_cash) / initial_cash

daily_returns = pd.Series(returns)
average_return = daily_returns.mean()
sharpe_ratio = daily_returns.mean() / daily_returns.std() * np.sqrt(252) if daily_returns.std() != 0 else 0

total_trades = ta.total.closed if 'total' in ta and 'closed' in ta.total else 0
win_trades = ta.won.total if 'won' in ta else 0
win_rate = win_trades / total_trades if total_trades > 0 else 0

avg_holding = ta.len.average if 'len' in ta and 'average' in ta.len else 0

print("\nPerformance Metrics:")
print(f"Total Return        : ₹{total_return * initial_cash:.2f}")
print(f"Average Daily Return: ₹{average_return * initial_cash:.4f}")
print(f"Sharpe Ratio        : {sharpe_ratio:.2f}")
print(f"Max Drawdown        : ₹{(dd.max.drawdown/100)*initial_cash:.2f}")
print(f"Total Trades        : {total_trades}")
print(f"Win Rate            : {win_rate * 100:.2f}%")
print(f"Avg Holding Period  : {avg_holding:.2f} bars")

cumulative_returns = (1 + daily_returns).cumprod()
plt.figure(figsize=(12, 6))
plt.plot(cumulative_returns.index, cumulative_returns.values, linewidth=2, color='#2E86C1')
plt.title("Cumulative PnL", fontsize=14, pad=15)
plt.xlabel("Date", fontsize=12, labelpad=10)
plt.ylabel("Cumulative Returns", fontsize=12, labelpad=10)
plt.grid(True, linestyle='--', alpha=0.7)
plt.gca().spines['top'].set_visible(False)
plt.gca().spines['right'].set_visible(False)
plt.tight_layout()
plt.savefig('cumulative_returns.png', dpi=300, bbox_inches='tight')
plt.close()

cerebro.plot(style='line', barup='green', bardown='red', volume=False)
plt.savefig('cerebro.png')
plt.close()



Performance Metrics:
Total Return        : ₹98455.06
Average Daily Return: ₹234.0104
Sharpe Ratio        : 0.69
Max Drawdown        : ₹90521.64
Total Trades        : 10
Win Rate            : 70.00%
Avg Holding Period  : 21.10 bars


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [96]:
def general_function(ticker1, ticker2, data, com=0.0005, slipage=0.0001): #0.05% and 0.01%
    hedge_ratio = get_headge_ratio(ticker1,ticker2, data[:int(len(data)*0.7)])
    _, quantity_S1, quantity_S2 = spread(ticker1,ticker2, data[:int(len(data)*0.7)], hedge_ratio)

    rev_hedge_ratio = get_headge_ratio(ticker2,ticker1, data[:int(len(data)*0.7)])
    _, r_quantity_S1, _ = spread(ticker2, ticker1, data[:int(len(data)*0.7)], rev_hedge_ratio)

    if r_quantity_S1 < quantity_S1:
        ticker1, ticker2 = ticker2, ticker1
        hedge_ratio = get_headge_ratio(ticker1,ticker2, data[:int(len(data)*0.7)])
        _, quantity_S1, quantity_S2 = spread(ticker1,ticker2, data[:int(len(data)*0.7)], hedge_ratio)
    #this is like our nomenclature, I just use (s1, s2) or (s2, s1) depending on headge ratio
    #I did this to ensure that (s1, s2) or (s2, s1) will get same answer

    t_values = range(0, len(data) - int(len(data)*0.7))
    spread_values_list = []
    dates = []

    for t in tqdm.tqdm(t_values):
        spread_values_list.append(spread(ticker1,ticker2, data[:int(len(data)*0.7) + t], hedge_ratio)[0])
        dates.append(data[:int(len(data)*0.7) + t].index[-1])
    
    signals, _ = get_signals(spread_values_list, dates, 30, quantity_S1, quantity_S2)

    signals = signals
    prices = spread_values_list
    dates = dates

    max_cash = max(spread_values_list) + 100 #we add 100 for safe measure, it doesn nt really change anything

    df = pd.DataFrame({
        "datetime": pd.to_datetime(dates),
        "open": prices,
        "high": prices,
        "low": prices,
        "close": prices,
        "volume": [1000] * len(prices) #not going to use anyway, just to satisfy bt
    })

    df.set_index("datetime", inplace=True)
    df["signal"] = 0
    signal_map = {
        "buy": 1,
        "sell": -1
    }

    for date, signal, _ in signals:
        dt = pd.to_datetime(date)
        if dt in df.index:
            df.at[dt, "signal"] = signal_map[signal]

    class SignalStrategy(bt.SignalStrategy):
        def __init__(self):
            self.signal_add(bt.SIGNAL_LONG, self.data.signal)

    class PandasSignalData(bt.feeds.PandasData):
        lines = ('signal',)
        params = (('signal', -1),)

    data = PandasSignalData(dataname=df)

    cerebro = bt.Cerebro()
    cerebro.addstrategy(SignalStrategy)
    cerebro.adddata(data)
    cerebro.broker.set_cash(max_cash)

    cerebro.broker.setcommission(commission=com) 
    cerebro.broker.set_slippage_perc(slipage)


    cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
    cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name='ta')
    cerebro.addanalyzer(bt.analyzers.TimeReturn, _name='timereturn')

    results = cerebro.run()
    strat = results[0]

    dd = strat.analyzers.drawdown.get_analysis()
    ta = strat.analyzers.ta.get_analysis()
    returns = strat.analyzers.timereturn.get_analysis()

    initial_cash = max_cash
    final_value = cerebro.broker.getvalue()
    total_return = (final_value - initial_cash) / initial_cash

    daily_returns = pd.Series(returns)
    average_return = daily_returns.mean()
    sharpe_ratio = daily_returns.mean() / daily_returns.std() * np.sqrt(252) if daily_returns.std() != 0 else 0

    total_trades = ta.total.closed if 'total' in ta and 'closed' in ta.total else 0
    win_trades = ta.won.total if 'won' in ta else 0
    win_rate = win_trades / total_trades if total_trades > 0 else 0

    avg_holding = ta.len.average if 'len' in ta and 'average' in ta.len else 0

    performance_metrics = {
        "Total Return": f"₹{total_return * initial_cash:.2f}",
        "Average Daily Return": f"₹{average_return * initial_cash:.4f}",
        "Sharpe Ratio": f"{sharpe_ratio:.2f}",
        "Max Drawdown": f"₹{(dd.max.drawdown/100)*initial_cash:.2f}",
        "Total Trades": str(total_trades),
        "Win Rate": f"{win_rate * 100:.2f}%",
        "Avg Holding Period": f"{avg_holding:.2f} days"
    }

    cumulative_returns = (1 + daily_returns).cumprod()
    plt.figure(figsize=(12, 6))
    plt.plot(cumulative_returns.index, cumulative_returns.values, linewidth=2, color='#2E86C1')
    plt.title("Cumulative PnL", fontsize=14, pad=15)
    plt.xlabel("Date", fontsize=12, labelpad=10)
    plt.ylabel("Cumulative Returns", fontsize=12, labelpad=10)
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.gca().spines['top'].set_visible(False)
    plt.gca().spines['right'].set_visible(False)
    plt.tight_layout()
    plt.savefig('cumulative_returns.png', dpi=300, bbox_inches='tight')
    plt.close()

    cerebro.plot(style='line', barup='green', bardown='red', volume=False)
    plt.savefig('cerebro.png')
    plt.close()

    from IPython.display import Image, display

    display(Image('cumulative_returns.png'))
    display(Image('cerebro.png'))

    return performance_metrics



In [97]:
general_function('SBIN', 'SBILIFE', data, 0, 0)

TypeError: unsupported operand type(s) for +: 'int' and 'slice'