In [1]:
import pandas as pd
from itertools import product
import matplotlib.pyplot as plt
from datetime import datetime


# --- Input / output locations ---------------------------------------------
db_file_path = 'db/ohlcv_ntickers_1254_2000-08-01_to_2023-12-23.pkl'

export_folder_path = './outputs/'
export_name = 'results'

# Fractional fee passed to create_taxed_future_var for every combination.
fee = 0.002

# --- Reduced grid (swap in for the full grid below for a quick run) --------
# n_days_past_range = [1,2]
# n_days_future_range = [2]
# filter_lower_limit_range = [-100]
# filter_width_range = [50]
# loss_limit_range = [-0.5]

# --- Full parameter grid ---------------------------------------------------
n_days_past_range = [1, 2, 3, 4, 5, 7, 10, 12, 15, 17, 20]
n_days_future_range = [1, 2, 3, 4, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50]
filter_lower_limit_range = [
    -125, -100, -80, -60, -50, -40, -35, -30, -25, -20, -15, -10, -5, 0, 5, 10, 20,
]
filter_width_range = [10, 20, 30, 40, 50, 60, 70]
loss_limit_range = [-1000, -100, -50, -20, -10, -5, -1, 0]

# Size of the cartesian product of the five ranges; used as the progress total.
num_combinations = (
    len(n_days_past_range)
    * len(n_days_future_range)
    * len(filter_lower_limit_range)
    * len(filter_width_range)
    * len(loss_limit_range)
)
print(num_combinations)

146608


In [2]:
def remove_top_column_name(df):
    """Return `df` with the outermost level of its column index dropped."""
    flattened = df.droplevel(level=0, axis=1)
    return flattened

# NOTE: unpickling can execute arbitrary code; only load database files you trust.
data = pd.read_pickle(db_file_path)

# One frame per price field, with the field level removed from the columns.
open_data, low_data = (
    remove_top_column_name(data[[field]]) for field in ("Open", "Low")
)


In [3]:
def create_floor_mask(open_df, low_df, loss_limit, n_days_future):
    """Flag cells whose forward-looking low falls below the loss threshold.

    Args:
        open_df: Frame of opening prices.
        low_df: Frame of low prices, laid out like `open_df`.
        loss_limit: Loss limit in percent (e.g. -5 means 5% below the open).
        n_days_future: Number of rows in the forward window, starting at the
            current row.

    Returns:
        Boolean frame, True where the minimum low over the forward window is
        strictly below `open * (1 + loss_limit / 100)`.
    """
    stop_level = open_df * (1 + (loss_limit / 100))

    # Window covers the current row and the rows after it (n_days_future rows in total).
    forward_window = pd.api.indexers.FixedForwardWindowIndexer(window_size=n_days_future)
    lowest_ahead = low_df.rolling(window=forward_window).min()

    return lowest_ahead < stop_level

def create_floored_future_var(open_df, low_df, loss_limit, n_days_future, future_var):
    """Return a copy of `future_var` capped at `loss_limit` where the floor mask is True.

    The mask from `create_floor_mask` is stacked so it can be used as a
    boolean indexer on the stacked `future_var`; flagged entries are
    overwritten with `loss_limit`. `future_var` itself is not modified.
    """
    # NOTE(review): `dropna` on `stack` is deprecated in recent pandas releases —
    # confirm against the pandas version this notebook is pinned to.
    stop_hit = create_floor_mask(open_df, low_df, loss_limit, n_days_future).stack(dropna=False)

    capped = future_var.copy()
    capped[stop_hit] = loss_limit
    return capped
    
def create_taxed_future_var(df, fee):
    """Convert a percent variation into the percent variation net of fees.

    The growth factor `1 + df / 100` is multiplied by `(1 - fee) / (1 + fee)`
    and converted back to a percentage.
    """
    growth_after_fees = (1 + (df / 100)) * (1 - fee) / (1 + fee)
    return (growth_after_fees - 1) * 100


In [4]:
def get_year_column(df):
    """Return the first four characters of the string form of index level 0."""
    first_level_as_text = df.index.get_level_values(0).astype(str)
    return first_level_as_text.str.slice(stop=4)

def concat_dfs(dfs, column_names):
    """Join `dfs` side by side under `column_names` and drop rows containing NaN."""
    return pd.concat(dfs, axis=1, keys=column_names).dropna()

def add_10_last_years_stats(df, first_year=2013, last_year=2023):
    """Append cross-year summary columns for the yearly median/mean/count columns.

    For each of the `median_<year>`, `mean_<year>` and `count_<year>` column
    families, the columns between `first_year` and `last_year` (both
    inclusive) are reduced row-wise with median, mean and min, producing nine
    new columns named `<stat>_<family>_10_last_years`.

    Args:
        df: Results frame with string column names; not modified.
        first_year: First year included in the window.
        last_year: Last year included in the window.

    Returns:
        A new frame with columns sorted alphabetically, followed by the nine
        summary columns.

    Note:
        The defaults (2013 through 2023 inclusive) span eleven calendar years.
        The `_10_last_years` suffix is kept unchanged because other code
        refers to those column names.
    """
    # Label slices below rely on the columns being sorted.
    df = df.sort_index(axis=1)

    families = ('median', 'mean', 'count')
    reducers = ('median', 'mean', 'min')

    # Take every slice before appending columns: the appended columns break the
    # sort order that label slicing depends on.
    yearly = {
        family: df.loc[:, f'{family}_{first_year}':f'{family}_{last_year}']
        for family in families
    }

    for family in families:
        for reducer in reducers:
            df[f'{reducer}_{family}_10_last_years'] = getattr(yearly[family], reducer)(axis=1)

    return df

def replace_by_zeros(df, col_name_beginning):
    """Zero out NaN and empty-string cells in columns starting with a prefix.

    The frame is modified in place and also returned.
    """
    targets = []
    for name in df.columns:
        if name.startswith(col_name_beginning):
            targets.append(name)

    cleaned = df[targets].fillna(0).replace('', 0)
    df[targets] = cleaned

    return df

In [5]:
def calculate_var(df, start_day, end_day):
    """Percent change between the rows `start_day` and `end_day` ahead of each row.

    Negative offsets look backwards. The result is stacked into a Series and
    keeps NaN entries.
    """
    at_start = df.shift(-start_day)
    at_end = df.shift(-end_day)
    pct_change = (at_end - at_start) / at_start * 100

    # NOTE(review): `dropna` on `stack` is deprecated in recent pandas releases —
    # confirm against the pandas version this notebook is pinned to.
    return pct_change.stack(dropna=False)

def calculate_filtered_var(df, col, lower_limit, width):
    """Keep rows where `df[col]` lies in `[lower_limit, lower_limit + width]` (inclusive)."""
    values = df[col]
    upper_limit = lower_limit + width
    in_range = (values >= lower_limit) & (values <= upper_limit)
    return df[in_range]

def calculate_success_rate(df):
    """Fraction of entries strictly greater than zero; NaN when `df` is empty."""
    total = len(df)
    if total == 0:
        return float('nan')

    return (df > 0).sum() / total

def calculate_overall_results(df_col):
    """Build a one-row frame with the size, median, mean and success rate of `df_col`."""
    summary = {
        'n_results': len(df_col),
        'median': df_col.median(),
        'mean': df_col.mean(),
        'success_rate': calculate_success_rate(df_col),
    }
    # Wrap each value in a list so the frame has exactly one row.
    return pd.DataFrame({name: [value] for name, value in summary.items()})

def calculate_yearly_results(df, column):
    """Summarise `column` per year as a single wide row.

    `df` must have a `year` column. The median, mean, min, max and count of
    `column` are computed per year and laid out as one row whose columns are
    named `<stat>_<year>`, sorted alphabetically.
    """
    stats_by_year = pd.pivot_table(
        df,
        values=[column],
        index=['year'],
        aggfunc={column: ['median', 'mean', 'min', 'max', 'count']})

    # Collapse the (year x stat) table into one row keyed by (column, stat, year).
    flattened = stats_by_year.unstack().to_frame()
    flattened = flattened.sort_index(level=1).T

    # Join the key parts with '_' and strip the leading "<column>_" prefix.
    flattened.columns = ['_'.join(parts) for parts in flattened.columns]
    flattened.columns = flattened.columns.str.replace(f'{column}_', '')

    ordered_columns = sorted(flattened.columns)
    return flattened.reindex(ordered_columns, axis=1)

In [6]:
def save_results(df, export_name):
    """Write `df` as CSV and Excel files named `<export_name>_<dd-mm-YYYY>`.

    Files are placed under the module-level `export_folder_path`, which is
    prepended verbatim to the file name. The target directory is created if
    it does not exist, so a finished run is not lost to a missing folder.

    Args:
        df: Frame to export.
        export_name: Base name of the output files (without extension).
    """
    import os  # local import keeps this cell self-contained

    date = datetime.today().strftime('%d-%m-%Y')
    export_file_path = f'{export_folder_path}{export_name}_{date}'

    # An empty dirname means the current directory; nothing to create then.
    target_dir = os.path.dirname(export_file_path)
    if target_dir:
        os.makedirs(target_dir, exist_ok=True)

    print('Saving files...')

    df.to_csv(f'{export_file_path}.csv')
    df.to_excel(f'{export_file_path}.xlsx')

In [7]:
# Grid search: one summary row per parameter combination.
# Rows are collected in a list and concatenated once at the end; growing a
# DataFrame with pd.concat inside the loop copies all previous rows each time.
result_rows = []
i = 0

for n_days_past in n_days_past_range:
    # Depends only on n_days_past, so compute it once per outer iteration.
    past_var = calculate_var(df=open_data, start_day=-n_days_past, end_day=0)

    for n_days_future in n_days_future_range:
        future_var = calculate_var(df=open_data, start_day=0, end_day=n_days_future)

        for loss_limit in loss_limit_range:
            floored_future_var = create_floored_future_var(
                open_data, low_data, loss_limit, n_days_future, future_var)
            taxed_floored_future_var = create_taxed_future_var(floored_future_var, fee)

            # Rows with a NaN in any of the three series are dropped by concat_dfs.
            var = concat_dfs(
                dfs=[past_var, future_var, taxed_floored_future_var],
                column_names=['past_var', 'future_var', 'taxed_floored_future_var'])

            var['year'] = get_year_column(var)

            for filter_lower_limit, filter_width in product(filter_lower_limit_range, filter_width_range):
                var_filtered = calculate_filtered_var(
                    df=var, col='past_var',
                    lower_limit=filter_lower_limit, width=filter_width)

                params = pd.DataFrame({'n_days_past': [n_days_past], 'n_days_future': [n_days_future],
                          'filter_lower_limit': [filter_lower_limit], 'filter_width': [filter_width],
                          'loss_limit': [loss_limit]})
                overall_results = calculate_overall_results(df_col=var_filtered['taxed_floored_future_var'])
                yearly_results = calculate_yearly_results(df=var_filtered, column='taxed_floored_future_var')

                result_rows.append(pd.concat([params, overall_results, yearly_results], axis=1))

                i += 1
                print(f'\r step: {i}/{num_combinations}', end='')

results = pd.concat(result_rows, ignore_index=True) if result_rows else pd.DataFrame()

# Fill missing yearly counts with 0 before the cross-year statistics, so a year
# with no matching rows counts as zero in the count summaries instead of being skipped.
results = replace_by_zeros(df=results, col_name_beginning='count')
results = add_10_last_years_stats(results)
results = results.sort_values('median_median_10_last_years', ascending=False)

save_results(results, export_name)

  return positive_values_count / len(df)


 step: 3/146608

  return positive_values_count / len(df)


 step: 58/146608

KeyboardInterrupt: 