In [1]:
import matplotlib.pyplot as plt
import numpy as np
import xlrd
from arch.bootstrap import SPA
import pandas as pd


list_of_period = [5,10,15,20,25,30]
list_of_k = [1,1.5,2]

In [2]:
# Read the data from the Excel file
xls_file = '/home/vishi/bolinger/cumul_ohlc.xls'
xls = xlrd.open_workbook(xls_file)
sheet = xls.sheet_by_index(0)
# Extract the data
closing_prices = []
for row in range(1, sheet.nrows):
    closing_prices.append(sheet.cell_value(row, 4))
    

In [3]:
def get_avg_std(prices, period):
    avg = np.zeros(len(prices))
    std = np.zeros(len(prices))
    for i in range(period, len(prices)):
        avg[i] = np.mean(prices[i-period:i])
        # sample std
        std[i] = np.std(prices[i-period:i], ddof=1)
    return avg, std


def get_bollinger_bands(prices, period, k):
    avg, std = get_avg_std(prices, period)
    upper_band = avg + k * std
    lower_band = avg - k * std
    return avg, upper_band, lower_band
def profit_basic(prices,avg, upper_band, lower_band):
    flag = 0
    profit = 0
    current_price = 0
    list_of_profit = []
    for i in range(len(prices)):
        if flag == 0:
            if prices[i] < lower_band[i]:
                flag = 1
                current_price = prices[i]
            elif prices[i] > upper_band[i]:
                flag = -1
                current_price = prices[i]
        elif flag == 1:
            if prices[i] > avg[i]:
                flag = 0
                profit += prices[i] - current_price
        elif flag == -1:
            if prices[i] < avg[i]:
                flag = 0
                profit += current_price - prices[i]
        list_of_profit.append(profit)
    if flag == 1:
        profit += prices[-1] - current_price
    elif flag == -1:
        profit += current_price - prices[-1]
    list_of_profit.append(profit)
    return list_of_profit

def plot_profit(list_of_profit, period, k):
    plt.plot(list_of_profit)
    plt.title(f'Profit with period {period} and k {k}')
    plt.xlabel('Time')
    plt.ylabel('Profit')
    plt.show()


def better_model_than_benchmark(i,list_of_all_returns_np):
    benchmark_returns = list_of_all_returns_np[:, i]
    # everything else
    stratergy_returns = np.delete(list_of_all_returns_np, i, axis=1)
    benchmark_losses = -benchmark_returns
    stratergy_losses = -stratergy_returns
    spa = SPA(benchmark=benchmark_losses,
        models=stratergy_losses,
        reps=1000,
        block_size=100,  
        bootstrap="stationary",
        studentize=True,
        nested=False,
        seed=42
    )
    spa.compute()
    return spa


In [None]:

##==============================================================================##

def Bollinger_Open_strat3 (mean:int, prev_mean:int, BB_up:int, BB_low:int, Hit:int = 0) -> int :
    '''
        Takes mean price of the stock at a particular minute and says if the stock has crossed the BB Bands or not \\
        If it has, it tells us if we should short, long or hold the stock 
        
        ---
        #### INPUT
        mean - mean/current value of the stock we want to trade \\
        prev_mean - mean of the stock in the previous minute \\
        BB_up - Upper Bollinger Band \\
        BB_low - Lower Bollinger Band \\
        Hit - 0 => Has not crossed the Bands, -1 => Crossed Upper Band before, 1 => Crossed lower band before
        ---
        #### OUTPUT
        If Hit = 0 \\
            Integer Value: 1 => crossed lower band; 0 => not crossed; -1 => crossed upper band \\
        If Hit = -1 or 1 \\
            Integer Value: 1 => Long; -1 => short; 0 => Do nothing
    '''       
    if Hit == 0:
        if mean < BB_low:
            signal = 1  # Hit Lower BB
        elif mean > BB_up:
            signal = -1  # Hit Upper BB
        else:
            signal = 0  # Did not Hit
    elif Hit == 1:
        if prev_mean < mean:
            signal = 1
        else: 
            signal = 0
    else:
        if prev_mean > mean:
            signal = -1
        else:
            signal = 0    
    
    return signal



def BollingerBaseline_Close (mean:int, open_price:int, BB_mid:int, Mode:str, stoploss:int = 100) -> list[int] :
    '''
        Takes current mean price and trade opening price and tells us if we should close the trade or not at the current stock price

        ---
        #### INPUT
        mean - mean/current value of the stock we want to trade \\
        open_price - the price at which we opened the trade \\
        BB_mid - Middle Bollinger Band (moving average) \\
        Mode - 'long' => we bought the stock before; 'short' => we sold the stock before \\
        lev - leverage \\
        stoploss - in percentage 

        ---
        #### OUTPUT
        List of 2 Integers: 
            List[0]: 1 => close/complete the trade; 0 => hold/do nothing
            List[1]: Net profit % 
    '''
 
    if Mode == 'long': # we bought stocks
        profit_per = (mean - open_price)/open_price * 100 # calculating PnL
        profit = (mean - open_price)
        if mean >= BB_mid: # closing when current value goes above middle band
            signal = 1 # signaling to close
            return_list = [signal, profit]
            return return_list
        else:
            if profit_per <= -stoploss : # closing at stoploss if stock falls too much
                signal = 1 # signaling to close
                return_list = [signal, profit]
                return return_list
            else:
                return [0, 0] # do nothing
            
    
    elif Mode == 'short': # we sold stocks
        profit_per = (open_price - mean)/open_price * 100 # calculating PnL
        profit = (mean - open_price)
        if mean <= BB_mid: # closing when current value goes below middle band
            signal = 1 # signaling to close
            return_list = [signal, profit]
            return return_list
        else:
            if profit_per <= -stoploss : # closing at stoploss if stock increases too much
                signal = 1 # signaling to close
                return_list = [signal, profit]
                return return_list
            else:
                return [0, 0] # do nothing
            
    else:
        print("Mode should have inputs as 'short' or 'long'; but someother input was given")
        return [0, 0]



def Bollinger_Running_strat3(path: str, period: int = 20, k: int = 2, output:bool = 'True') -> list[float]:
    '''
        Takes a string containing the path to a csv file that contains the data in minute-by-minute order. \\
        The data in the csv file should contain "open, high, low, close, Mean" in the respective order. \\
        It runs the BollingerBaseline algorithm on this data and returns the PnL. \\
        Assumptions - we can have only one entry before we close. And only one active position at a time. 
        
        ---
        #### INPUT
        path - a string containing the path to the csv file of stock data \\
        period - moving average for "period" minutes \\
        k - SD Multiplier (Bandwidth/2 or the distance between upper/lower band and middle band) \\
        lev - leverage \\
        stoploss - in percentage \\
        Entry_size - percentage of portfolio traded each time \\

        ---
        #### OUTPUT
        List - Per minute PnL
    '''
    try:
        # Read the CSV file
        data = pd.read_csv(path)

        # Check if the file is empty
        if data.empty:
            print("Error: The CSV file is empty.")
            return 0

        # Ensure required columns are present
        required_columns = ['open', 'high', 'low', 'close', 'Mean']
        if not set(required_columns).issubset(data.columns):
            print(f"Error: The CSV file must contain the following columns: {required_columns}")
            return 0

        # Calculate Bollinger Bands
        data['BB_mid'] = data['Mean'].rolling(window=period).mean()
        data['BB_std'] = data['Mean'].rolling(window=period).std()
        data['BB_up'] = data['BB_mid'] + (k * data['BB_std'])
        data['BB_low'] = data['BB_mid'] - (k * data['BB_std'])

        # Initialize variables
        position = None  # 'long' or 'short'
        Hit = 0
        open_price = 0
        total_profit = 0
        Per_min_returns = []
        signal = 0

        # Iterate through the data
        for i in range(period+1, len(data)):
            mean = data.loc[i, 'Mean']
            prev_mean = data.loc[i-1, 'Mean']
            BB_up = data.loc[i, 'BB_up']
            BB_low = data.loc[i, 'BB_low']
            BB_mid = data.loc[i, 'BB_mid']

            if position is None:  # No active position
                if Hit == 0:
                    Hit = Bollinger_Open_strat3(mean, prev_mean, BB_up, BB_low, Hit=0)  
                if Hit == 1:
                    signal = Bollinger_Open_strat3(mean, prev_mean, BB_up, BB_low, Hit=1)
                elif Hit == -1:
                    signal = Bollinger_Open_strat3(mean, prev_mean, BB_up, BB_low, Hit=-1)

                if signal == 1:  # Long entry
                    if not output: print(f"Open Signal: {signal}")  # Debugging: Print entry signal
                    position = 'long'
                    open_price = mean
                elif signal == -1:  # Short entry
                    if not output:print(f"Open Signal: {signal}")  # Debugging: Print entry signal
                    position = 'short'
                    open_price = mean

            elif position == 'long':  # Active long position
                close_signal, profit = BollingerBaseline_Close(mean, open_price, BB_mid, 'long')                
                if close_signal == 1:  # Close position
                    if not output: print(f"Close Signal: {close_signal}, Profit: {profit}")  # Debugging: Print close signal and profit
                    total_profit += profit
                    # Per_min_returns.append(profit) # appending per minute returns (in percentage)
                    position = None  # Reset position
                    signal = 0 # Reset signal

            elif position == 'short':  # Active short position
                close_signal, profit = BollingerBaseline_Close(mean, open_price, BB_mid, 'short')            
                if close_signal == 1:  # Close position
                    if not output: print(f"Close Signal: {close_signal}, Profit: {profit}")  # Debugging: Print close signal and profit
                    total_profit += profit
                    # Per_min_returns.append(profit) # appending per minute returns (in percentage)
                    position = None  # Reset position
                    signal = 0 # Reset signal
            
            Per_min_returns.append(total_profit)     
        
        return Per_min_returns  # Return net profit as a percentage of the initial portfolio and per minute returns

    except FileNotFoundError:
        print(f"Error: File at path '{path}' does not exist.")
        return 0
    except pd.errors.EmptyDataError:
        print("Error: The CSV file is empty or invalid.")
        return 0
    except Exception as e:
        print(f"Exception: {e}")
        return 0
##==============================================================================##

k_values = [1, 1.5, 2]  
period_values = [5, 10, 15, 20, 25, 30] 

# Path to the dataset
dataset_path = "Dataset.csv"

results = []
complete_Hstack = []

# Test all combinations of k and period
for k in k_values:
    for period in period_values[::-1]:
        per_minute_pnl = Bollinger_Running_strat3(dataset_path, period=period, k=k) 
        results.append((k, period, per_minute_pnl[-1]))  # Save the results
        complete_Hstack.append(np.array(per_minute_pnl).reshape(-1, 1))

min_length = min(arr.shape[0] for arr in complete_Hstack)
complete_Hstack_trimmed = [arr[-min_length:] for arr in complete_Hstack]
complete_Hstack = np.hstack(complete_Hstack_trimmed)

# print(results)

In [5]:
complete_Hstack.shape

(23594, 18)

In [10]:
# find the returns of every minute of every strategy
reshaped_list_of_profits = complete_Hstack.T
reshaped_list_of_profits.shape
list_of_returns = []
for i in range(len(reshaped_list_of_profits)):
    list_of_returns.append(np.diff(reshaped_list_of_profits[i]))



In [11]:

list_of_all_returns_np = np.array(list_of_returns).T

spa = better_model_than_benchmark(0, list_of_all_returns_np)
print("Results of the first strategy")
print()



Results of the first strategy



In [13]:
pvals = spa.pvalues
print("---- SPA Test Results ----")
print("Lower     p-value (liberal test):   ", round(pvals["lower"], 4))
print("Consistent p-value (recommended):   ", round(pvals["consistent"], 4))
print("Upper     p-value (conservative):   ", round(pvals["upper"], 4))

# Interpretation
if pvals["lower"] < 0.05:
    print("✅ At least one model is significantly better than the benchmark (at 5% level).")
else:
    print("❌ No model significantly outperforms the benchmark (at 5% level).")

# Which models are better?
better_model_indices = spa.better_models(pvalue=0.05, pvalue_type="lower")
print("Indices of better models (at 5% level):", better_model_indices)


---- SPA Test Results ----
Lower     p-value (liberal test):    0.441
Consistent p-value (recommended):    0.738
Upper     p-value (conservative):    0.894
❌ No model significantly outperforms the benchmark (at 5% level).
Indices of better models (at 5% level): []


In [14]:
for i in range(len(reshaped_list_of_profits)-1):
    spa = better_model_than_benchmark(i, list_of_all_returns_np)
    print(f"\nBetter models than benchmark (strategy {i}):")
    result = spa.better_models(pvalue=0.05, pvalue_type="consistent")
    # add 1 to the result whose index is more than i and do nothing for the rest
    result = [x + 1 if x > i else x for x in result]
    print(result)


        
        


Better models than benchmark (strategy 0):
[]

Better models than benchmark (strategy 1):
[]

Better models than benchmark (strategy 2):
[]

Better models than benchmark (strategy 3):
[]

Better models than benchmark (strategy 4):
[np.int64(0), np.int64(1), np.int64(2), np.int64(3), np.int64(6), np.int64(7), np.int64(8), np.int64(9), np.int64(10), np.int64(12), np.int64(13), np.int64(14), np.int64(15), np.int64(17)]

Better models than benchmark (strategy 5):
[np.int64(0), np.int64(1), np.int64(2), np.int64(3), np.int64(5), np.int64(7), np.int64(8), np.int64(9), np.int64(10), np.int64(12), np.int64(13), np.int64(14), np.int64(15)]

Better models than benchmark (strategy 6):
[]

Better models than benchmark (strategy 7):
[]

Better models than benchmark (strategy 8):
[]

Better models than benchmark (strategy 9):
[]

Better models than benchmark (strategy 10):
[]

Better models than benchmark (strategy 11):
[np.int64(0), np.int64(1), np.int64(2), np.int64(3), np.int64(6), np.int64(7), 

In [15]:
spa = better_model_than_benchmark(17, list_of_all_returns_np)
print(f"\nBetter models than benchmark (strategy 17):")
result = spa.better_models(pvalue=0.05, pvalue_type="consistent")
# add 1 to the result whose index is more than i and do nothing for the rest
result = [x + 1 if x > 17 else x for x in result]
print(result)


Better models than benchmark (strategy 17):
[np.int64(0), np.int64(1), np.int64(2), np.int64(3), np.int64(6), np.int64(7), np.int64(8), np.int64(9), np.int64(10), np.int64(12), np.int64(13), np.int64(14), np.int64(15)]


In [17]:
list_of_all_returns_np.shape
# add 0 as another strategy and make the dim 23593,18
list_of_all_returns_np = np.hstack((list_of_all_returns_np, np.zeros((list_of_all_returns_np.shape[0], 1))))
list_of_all_returns_np.shape

(23593, 19)

In [18]:
spa = better_model_than_benchmark(18, list_of_all_returns_np)
print(f"\nBetter models than benchmark (strategy 18):")
result = spa.better_models(pvalue=0.05, pvalue_type="consistent")
# add 1 to the result whose index is more than i and do nothing for the rest
result = [x + 1 if x > 18 else x for x in result]
print(result)


Better models than benchmark (strategy 18):
[np.int64(0), np.int64(1), np.int64(2), np.int64(3), np.int64(6), np.int64(7), np.int64(8), np.int64(9), np.int64(10), np.int64(12), np.int64(13), np.int64(14), np.int64(15)]
