## Setup

In [None]:
%pip install matplotlib
%pip install yfinance
%pip install scipy
%pip install seaborn
%pip install pandas

In [None]:
import datetime as dt
import yfinance as yf
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import scipy

In [None]:
start_date = dt.date(2010,1,1)
end_date = dt.date(2023, 12,31)

In [None]:
sp = yf.download("SPY", start=start_date, end=end_date)

## Highlighted stylized facts on the financial asset

In [None]:
spy_adj_close = sp['Adj Close']
spy_ret = np.log(spy_adj_close).diff()

In [None]:
# plot return series to show volatility clustering
spy_ret.plot()

In [None]:
# cdf graphs to show fat tail
ref_mean = spy_ret.mean()
ref_std = spy_ret.std()

norm_reference = np.random.normal(loc=ref_mean, scale=ref_std, size=spy_ret.shape[0])


In [None]:
plot_df = pd.DataFrame({'Normal Reference': norm_reference, 'SPY Historical Returns': spy_ret}).dropna()

In [None]:
plt.hist(plot_df['SPY Historical Returns'], label='SPY')
plt.hist(plot_df['Normal Reference'], label='Normal Ref')
plt.legend()
plt.show()

## Prepare the data for machine learning

### Fs conditions for Swing Trader Agent - Example


In [None]:
# temporarily we will use a fixed value 5 for the lookback days
shift_returns = spy_ret.shift(1) # make decision based on T-1 day to prevent lookahead bias
swing_return_lookback_days = 5
Fs = shift_returns - shift_returns.shift(swing_return_lookback_days)

# if Fs >0 D will follow like a momentum trader
# if Fs <0 D will revert to normal and go against the trend

### Noise Trader

In [None]:
noise_std = spy_ret.std()/ 2 # this parameters will be trained in the machine learning step

def one_noise_demand(std):
    return std * np.random.standard_normal()

demand_noise_sample = one_noise_demand(noise_std)
display(demand_noise_sample)

### Momentum Trader

In [None]:
ewma_alpha = 0.05 # fixed parameters, to be tuned in due course
ewm_sr = spy_ret.ewm(alpha=ewma_alpha, adjust=False).mean()

ewm_lambda = 4 # we will train the Beta parameters for the momentum trader demands, so we choose lambda = 1 for simplicity

momentum_beta = 0.5 # training parameters
def one_momentum_trader(beta, ewm_t):
    return beta * np.tanh(ewm_t)


demand_momentum_sample = one_momentum_trader(momentum_beta, ewm_lambda * ewm_sr.iloc[1])
display(demand_momentum_sample)


In [None]:
spy_ret

### Fundamental Trader

In [None]:
# we will use the bull-bear line (MA250) as a simple rule of thumb for the value of the asset
bb_lookback_days = 250
fundamental_v = 0.2/100 # training parameters

def one_fundamental_trader(v, pt, ma250):
    return v * (ma250 - pt)

spy_ma250 = spy_adj_close.rolling(bb_lookback_days).mean()
display(spy_ma250)

demand_fundamental_sample = one_fundamental_trader(fundamental_v, spy_adj_close.iloc[-50], spy_ma250.iloc[-50])
display(demand_fundamental_sample)

### Full input data

In [None]:
input_data = pd.DataFrame({
    'EWM': ewm_sr,
    'Close': spy_adj_close,
    'F_Value':  spy_ma250,
    'log_ret' :spy_ret,
}).dropna()
# basically we have use some lookback data (EWM/Fs/MA250) so our actual data for trainings starts from 2011
display(input_data)

## Parameters Calibration - Samples universe

In [None]:
# generate some random values for the parameters
from scipy.stats import qmc
sobol_sam = qmc.Sobol(d=3)
parameter_universe = sobol_sam.random_base2(m=18)

In [None]:
# sample simulation using a randomly selected params
import random
initialized_samples = random.choices(parameter_universe, k=10000)

### Illustration for 1 iteration of generating parameters to loss mappings

In [None]:
fundamental_v, momentum_beta, noise_std =initialized_samples[0]
fundamental_v, momentum_beta, noise_std 


In [None]:
# select the time period of the input data
# input_data = input_data[:'2015']

In [None]:
display(input_data.head())
display(input_data.tail())


In [None]:
# copy trader functions from above
def one_fundamental_trader(v, pt, ma250):
    return v * (ma250 - pt)

def one_momentum_trader(beta, ewm_t, ewm_lambda = 4):
    return beta * np.tanh(ewm_lambda * ewm_t)

def one_noise_demand(std):
    return std * np.random.standard_normal()

# start one round of simulation with the selected params
def simulate_price_series():
    p_cur = input_data.iloc[0]['Close']
    ewm_cur = input_data.iloc[0]['EWM']
    ewm_alpha = 0.05
    yield input_data.iloc[0].name, p_cur
    price_series = [p_cur]
    for date, _, _, f_value, _ in input_data[1:].itertuples():
        
        demand = one_fundamental_trader(fundamental_v, p_cur, f_value)+ one_noise_demand(noise_std)
        if len(price_series)> 2:
            ewm_next = (1-ewm_alpha) * ewm_cur + ewm_alpha * (np.log(price_series[-1]) - np.log(price_series[-2]))
            demand += one_momentum_trader(momentum_beta, ewm_next)
            ewm_cur = ewm_next 
        if len(price_series) >5:
            fs = np.log(price_series[-1]) - np.log(price_series[-1-5])
            if fs >0:
                demand += one_momentum_trader(momentum_beta, ewm_next)
            else:
                demand += one_fundamental_trader(fundamental_v, p_cur, f_value)
        p_next = p_cur+demand
        yield date, p_next
        price_series.append(p_next)
        p_cur = p_next
        

sr = pd.Series(dict(simulate_price_series()))
sim_return_sr = np.log(sr).diff()

### Calculate the loss function

In [None]:
def absolute_hill_estimator(ret_sr, threshold = 100):
    cleaned_sr = ret_sr.copy().replace(0, np.nan).dropna().abs() # ignore 0 ret days and take absolute
    ysort = np.sort(cleaned_sr)
    iota = 1/(np.mean(np.log(ysort[0:threshold]/ysort[threshold]))) # get the tail index
    return iota

In [None]:
hist_ret_sr = input_data.log_ret[1:]
historical_hill = absolute_hill_estimator(hist_ret_sr)
sim_hill = absolute_hill_estimator(sim_return_sr)
d_hill = abs(sim_hill - historical_hill)

In [None]:
sim_vol = sim_return_sr.std()
hist_vol = hist_ret_sr.std()
d_vol = abs(sim_vol - hist_vol)

In [None]:
lags = [1,2,3, 10, 11,12, 30, 31, 32]
autocorr_diff_sum = sum([abs(sim_return_sr.autocorr(l) - hist_ret_sr.autocorr(l)) for l in lags])
d_auto_corr = autocorr_diff_sum/ len(lags)

In [None]:
sqaured_auto_corr_diff_sum = sum([abs(np.square(sim_return_sr).autocorr(l) - np.square(hist_ret_sr).autocorr(l)) for l in lags])
d_auto_corr_squared = sqaured_auto_corr_diff_sum/ len(lags)

In [None]:
loss = d_hill + d_vol + d_auto_corr + d_auto_corr_squared

In [None]:
def distance_loss_function(hist_ret, sim_ret):
    historical_hill = absolute_hill_estimator(hist_ret)
    sim_hill = absolute_hill_estimator(sim_ret)
    d_hill = abs(sim_hill - historical_hill)

    sim_vol = sim_ret.std()
    hist_vol = hist_ret.std()
    d_vol = abs(sim_vol - hist_vol)

    lags = [1,2,3, 10, 11,12, 30, 31, 32]
    autocorr_diff_sum = sum([abs(sim_ret.autocorr(l) - hist_ret.autocorr(l)) for l in lags])
    d_auto_corr = autocorr_diff_sum/ len(lags)

    sqaured_auto_corr_diff_sum = sum([abs(np.square(sim_ret).autocorr(l) - np.square(hist_ret).autocorr(l)) for l in lags])
    d_auto_corr_squared = sqaured_auto_corr_diff_sum/ len(lags)
    return d_hill + d_vol + d_auto_corr + d_auto_corr_squared

In [None]:
surrogate_training_data = pd.DataFrame({'fundamental_v': fundamental_v, 'momentum_beta': momentum_beta, 'noise_std':noise_std , 'loss': loss}, index=[0])
surrogate_training_data

This shows how 1 iteration of params => loss will be generated for training the surrogate model, now we will create 10_000 points

### Generating all mappings

In [None]:
hist_ret_sr = input_data.log_ret[1:]
training_set_1 = {(v,beta,std) for v,beta,std in initialized_samples}
def generate_trainings(train_data):
    for idx, (fundamental_v, momentum_beta, noise_std) in enumerate(train_data):
        def simulate_price_series():
            p_cur = input_data.iloc[0]['Close']
            ewm_cur = input_data.iloc[0]['EWM']
            ewm_alpha = 0.05
            yield input_data.iloc[0].name, p_cur
            price_series = [p_cur]
            for date, _, _, f_value, _ in input_data[1:].itertuples():
                demand = one_fundamental_trader(fundamental_v, p_cur, f_value) + one_noise_demand(noise_std)
                if len(price_series)> 2:
                    ewm_next = (1-ewm_alpha) * ewm_cur + ewm_alpha * (np.log(price_series[-1]) - np.log(price_series[-2]))
                    demand += one_momentum_trader(momentum_beta, ewm_next) 
                    ewm_cur = ewm_next
                if len(price_series) >5:
                    fs = np.log(price_series[-1]) - np.log(price_series[-1-5])
                    if fs >0:
                        demand += one_momentum_trader(momentum_beta, ewm_next)
                    else:
                        demand += one_fundamental_trader(fundamental_v, p_cur, f_value)
                p_next = p_cur+demand
                yield date, p_next
                price_series.append(p_next)
                p_cur = p_next
        
        sr = pd.Series(dict(simulate_price_series()))
        sim_return_sr = np.log(sr).diff()
        loss = distance_loss_function(hist_ret_sr, sim_return_sr)
        print('.', end='\n' if idx %400 ==0 and idx >0 else '')
        yield fundamental_v,   momentum_beta,  noise_std ,   loss

        

In [None]:
labelled_data_1 = list(generate_trainings(training_set_1))

In [None]:
xgboost_train_data = pd.DataFrame(labelled_data_1, columns=['fundamental_v',   'momentum_beta',  'noise_std' ,   'loss'])

In [None]:
# Optionally save the initial train data to reduce experiment time
# xgboost_train_data.to_csv('xg_train_data.csv')

## Train XGB for efficient loss estimation without actually running the demand model

In [None]:
import xgboost as xgb
import pandas as pd
# xgboost_train_data = pd.read_csv('xg_train_data.csv',index_col=0)

In [None]:
xgboost_train_data['loss'] = xgboost_train_data['loss'].clip(upper=1)
xgboost_train_data

In [None]:
xgb_regressor = xgb.XGBRegressor(tree_method="hist")

In [None]:
xgb_regressor.fit(xgboost_train_data[['fundamental_v', 'momentum_beta', 'noise_std']], xgboost_train_data[['loss']])

In [None]:
# predict_loss from initialized_samples
universe_set = {(a,b,c) for a,b,c in parameter_universe}
remaining_set = universe_set - training_set_1
out = xgb_regressor.predict(list(remaining_set))

In [None]:
predict_df = pd.DataFrame(remaining_set, columns = ['fundamental_v', 'momentum_beta', 'noise_std'])
predict_df['predict_dloss'] = out
predict_df

In [None]:
predict_df['predict_dloss'].describe()

### Incrementally add new labelled data by taking the parameter set with the lowest predict_dloss

In [None]:
# we will get the lowest 1500 predicted params + 500 randomly selected params for further simulating, and then to fit xgboost again
predict_select = 1500
random_select = 500

next_training_df = predict_df.sort_values('predict_dloss').iloc[:predict_select][['fundamental_v','momentum_beta','noise_std']]
random_df = predict_df.iloc[predict_select:].sample(n=random_select, random_state=1)[['fundamental_v','momentum_beta','noise_std']]

In [None]:
next_training_set = set(pd.concat([next_training_df, random_df]).itertuples(index=False, name=None))
labelled_data = list(generate_trainings(next_training_set))
xgboost_train_data_supp = pd.DataFrame(labelled_data, columns=['fundamental_v',   'momentum_beta',  'noise_std' ,   'loss'])
all_xgboost_train_data = pd.concat([xgboost_train_data, xgboost_train_data_supp])
all_xgboost_train_data['loss'] = all_xgboost_train_data['loss'].clip(upper=1)

In [None]:
xgb_regressor.fit(all_xgboost_train_data[['fundamental_v', 'momentum_beta', 'noise_std']], all_xgboost_train_data[['loss']])
all_training_set = training_set_1 | next_training_set
remaining_set = universe_set - all_training_set
out = xgb_regressor.predict(list(remaining_set))
predict_df = pd.DataFrame(remaining_set, columns = ['fundamental_v', 'momentum_beta', 'noise_std'])
predict_df['predict_dloss'] = out
predict_df

In [None]:
predict_df['predict_dloss'].describe()

In [None]:
# check the predict error now:
check_output = xgb_regressor.predict(list(all_xgboost_train_data[['fundamental_v', 'momentum_beta', 'noise_std']].itertuples(index=False, name=None)))
predict_with_true = all_xgboost_train_data.copy()
predict_with_true['predict'] = check_output
avg_loss = ((predict_with_true['predict'] - predict_with_true['loss'])/predict_with_true['loss']).mean()
print(f'Avg loss after supplemented training {avg_loss:.4f}')

In [None]:
# here we will repeat the extra training 5 more times

def re_train(predict_df, all_train_data, training_set):
    next_training_df = predict_df.sort_values('predict_dloss').iloc[:predict_select][['fundamental_v','momentum_beta','noise_std']]
    random_df = predict_df.iloc[predict_select:].sample(n=random_select, random_state=1)[['fundamental_v','momentum_beta','noise_std']]
    next_training_set = set(pd.concat([next_training_df, random_df]).itertuples(index=False, name=None))
    labelled_data = list(generate_trainings(next_training_set))
    xgboost_train_data_supp = pd.DataFrame(labelled_data, columns=['fundamental_v',   'momentum_beta',  'noise_std' ,   'loss'])
    all_xgboost_train_data = pd.concat([all_train_data, xgboost_train_data_supp])
    all_xgboost_train_data['loss'] = all_xgboost_train_data['loss'].clip(upper=1)


    xgb_regressor.fit(all_xgboost_train_data[['fundamental_v', 'momentum_beta', 'noise_std']], all_xgboost_train_data[['loss']])
    all_training_set = training_set | next_training_set
    remaining_set = universe_set - all_training_set
    out = xgb_regressor.predict(list(remaining_set))
    predict_df = pd.DataFrame(remaining_set, columns = ['fundamental_v', 'momentum_beta', 'noise_std'])
    predict_df['predict_dloss'] = out
    
    # check the predict error now:
    check_output = xgb_regressor.predict(list(all_xgboost_train_data[['fundamental_v', 'momentum_beta', 'noise_std']].itertuples(index=False, name=None)))
    predict_with_true = all_xgboost_train_data.copy()
    predict_with_true['predict'] = check_output
    avg_loss = ((predict_with_true['predict'] - predict_with_true['loss'])/predict_with_true['loss']).mean()
    print(f'\nAvg loss after re-training {avg_loss:.4f}')
    return predict_df, all_xgboost_train_data, all_training_set




for i in range(5):
    print(f'Re-train XGB repeating the steps, loop ={i+1}')
    predict_df,all_xgboost_train_data, all_training_set= re_train(predict_df,all_xgboost_train_data, all_training_set)


In [None]:
final_model = xgb_regressor.fit(all_xgboost_train_data[['fundamental_v', 'momentum_beta', 'noise_std']], all_xgboost_train_data[['loss']])

final_loss_pred = final_model.predict(parameter_universe)
result_df = pd.DataFrame(parameter_universe, columns=['fundamental_v', 'momentum_beta', 'noise_std'])
result_df['predict_dloss'] = final_loss_pred
result_df.sort_values('predict_dloss')

In [None]:
# select the 20 lowest predicted dloss and get the one with the lowest true loss
selected_df = result_df.sort_values('predict_dloss').iloc[:20][['fundamental_v','momentum_beta','noise_std']]
selected_labelled_data = list(generate_trainings(set(selected_df.itertuples(index=False, name=None))))
final_result_df =pd.DataFrame(selected_labelled_data, columns=['fundamental_v',   'momentum_beta',  'noise_std' ,   'loss'])
final_result_df

In [None]:
final_v,final_beta,final_std = final_result_df.sort_values('loss').iloc[0][['fundamental_v', 'momentum_beta', 'noise_std']]
final_v,final_beta,final_std

In [None]:
# now we use this final params into our ABM model, and see the series produced

def simulate_price_series_final(v, beta, std, input_data):
    p_cur = input_data.iloc[0]['Close']
    ewm_cur = input_data.iloc[0]['EWM']
    ewm_alpha = 0.05
    yield input_data.iloc[0].name, p_cur
    price_series = [p_cur]
    for date, _, _, f_value, _ in input_data[1:].itertuples():
        demand = one_fundamental_trader(v, p_cur, f_value) +  one_noise_demand(std)
        if len(price_series)> 2:
            ewm_next = (1-ewm_alpha) * ewm_cur + ewm_alpha * (np.log(price_series[-1]) - np.log(price_series[-2]))
            demand += one_momentum_trader(momentum_beta, ewm_next)         
            ewm_cur = ewm_next
        if len(price_series) >5:
            fs = np.log(price_series[-1]) - np.log(price_series[-1-5])
            if fs >0:
                demand += one_momentum_trader(beta, ewm_next)
            else:
                demand += one_fundamental_trader(v, p_cur, f_value)
        p_next = p_cur+demand
        yield date, p_next
        price_series.append(p_next)
        p_cur = p_next
        

sim_sr_final = pd.Series(dict(simulate_price_series_final(final_v,final_beta,final_std, input_data)))
sim_ret_sr_final = np.log(sr).diff()


plot_df = pd.DataFrame({'Hist Price': input_data['Close'], 'Sim Price': sim_sr_final})
plot_df.plot()

In [None]:
sim_ret_sr_final.kurt()

In [None]:
sim_ret_sr_final.describe(), hist_ret_sr.describe()

In [None]:
hist_ret_sr.kurt()

In [None]:
from statsmodels.graphics.tsaplots import plot_acf
import statsmodels.api as sm

In [None]:
import matplotlib.pyplot as plt
sm.graphics.tsa.plot_acf(hist_ret_sr.values.squeeze(),lags=50 ,title='Historical Returns Autocorrelation')
sm.graphics.tsa.plot_acf(np.square(hist_ret_sr).values.squeeze(),lags=50 ,title='Squared Historical Returns Autocorrelation')

sm.graphics.tsa.plot_acf(sim_ret_sr_final.dropna().values.squeeze(),lags=50 ,title='Simulated Returns Autocorrelation')
sm.graphics.tsa.plot_acf(np.square(sim_ret_sr_final.dropna()).values.squeeze(),lags=50 ,title='Squared Simulated Returns Autocorrelation')
plt.show()