# TimeGAN Synthetic Data Backtesting

In [None]:
import warnings
warnings.filterwarnings('ignore')

In [None]:
from pathlib import Path

import numpy as np
import pandas as pd

from sklearn.manifold import TSNE
from sklearn.decomposition import PCA
from sklearn.preprocessing import MinMaxScaler

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import GRU, Dense
from tensorflow.keras.losses import BinaryCrossentropy, MeanSquaredError, MeanAbsoluteError
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import AUC
from tensorflow.keras.saving import load_model

import matplotlib.pyplot as plt
import seaborn as sns

import yfinance as yf

from backtesting import Backtest, Strategy
from backtesting.lib import crossover

In [None]:
# Report which device class is in use; when a GPU is present, enable
# on-demand memory growth for the first one.
gpu_devices = tf.config.experimental.list_physical_devices('GPU')
if not gpu_devices:
    print('Using CPU')
else:
    print('Using GPU')
    # Only the first listed GPU is configured.
    tf.config.experimental.set_memory_growth(gpu_devices[0], True)

In [None]:
# Experiment number; selects the `experiment_XX` results directory built from it.
experiment = 0

In [None]:
# Locations used by this notebook. `path` and `results_path` point at the same
# directory; `log_dir` is the per-experiment subdirectory that the trained
# generator is loaded from and most outputs are written to.
path = Path('time_gan_research_variable')
hdf_store = path / 'TimeSeriesGAN.h5'
results_path = Path('time_gan_research_variable')
log_dir = results_path / f'experiment_{experiment:02}'

# Parameters

In [None]:
seq_len = 24 # rows per window: the first seq_len-1 rows are model inputs, the last row is the label
n_seq = 30 # number of series per time step (one per ticker)
batch_size = 128 # batch size for noise generation and regressor training

start_date, end_date = '2000-01-01', '2022-12-01' # price-history range passed to yfinance (yyyy-mm-dd)
train_test_split = 0.8 # fraction of windows (taken from the start) used for training

In [None]:
# Shared scaler: fitted on the real prices in get_real_data() and reused to
# map model outputs back to price units.
scaler = MinMaxScaler()

In [None]:
# Ticker universe, pulled manually from
# https://money.cnn.com/magazines/fortune/fortune500_archive/full/2000/
# The list holds 30 symbols, matching n_seq; plots and backtests pair
# position j in this list with data column j.
tickers = ['GM', 'WMT', 'XOM', 'F', 'GE', 'IBM', 'C', 'T', 'MO', 'BA', 
           'BAC', 'HPQ', 'KR', 'STFGX', 'AIG', 'HD', 'PG', 'FNMA', 'CVX', 
           'MS', 'JPM', 'TGT', 'VZ', 'MRK', 'MSI', 'MCK', 'INTC', 'DD', 'JNJ', 'COST']

# Real Data Prep

In [None]:
def get_real_data():
    """Download adjusted close prices and slice them into rolling windows.

    Prices for every symbol in ``tickers`` between ``start_date`` and
    ``end_date`` are fetched with yfinance, scaled with the module-level
    ``scaler`` (which is fitted here) and cut into overlapping windows of
    ``seq_len`` consecutive rows.

    Returns:
        tuple: ``(data, df, index)`` where ``data`` is a list of scaled
        ``(seq_len, len(tickers))`` arrays, ``df`` is the unscaled price
        frame with its columns in ``tickers`` order, and ``index`` is
        ``df.index``.
    """
    df = yf.download(tickers, start_date, end_date)['Adj Close']
    # yfinance does not keep the requested symbol order (multi-ticker columns
    # come back sorted), while the plotting and backtesting code pairs column
    # j with tickers[j]. Realign the columns so that pairing is correct.
    df = df.reindex(columns=tickers)

    # Fit the shared scaler on the real prices and scale them.
    scaled_data = scaler.fit_transform(df)

    # Window i covers rows i .. i + seq_len - 1.
    data = [scaled_data[i:i + seq_len] for i in range(len(df) - seq_len)]
    return data, df, df.index


# real_data: list of scaled windows; yf_df: unscaled prices; yf_index: their date index.
real_data, yf_df, yf_index = get_real_data()


In [None]:
# Number of real windows; also determines how many noise batches generate_data() draws.
n_windows = len(real_data)

# Synthetic Data Prep

In [None]:
def make_random_data():
    """Yield an endless stream of uniform noise samples.

    Each item is a ``(seq_len, n_seq)`` array drawn from ``[0, 1)``.
    """
    while True:
        noise_shape = (seq_len, n_seq)
        yield np.random.uniform(low=0, high=1, size=noise_shape)

In [None]:
# Iterator over float32 noise batches of `batch_size` samples each, fed by
# make_random_data(). The generator never terminates on its own, so
# `.repeat()` is only a safeguard here.
# NOTE(review): `output_types` is the older from_generator signature; newer
# TensorFlow releases prefer `output_signature` — confirm against the
# installed version.
random_series = iter(tf.data.Dataset
                     .from_generator(make_random_data, output_types=tf.float32)
                     .batch(batch_size)
                     .repeat())

In [None]:
# Load the saved generator model for this experiment; it is called on noise
# batches in generate_data().
synthetic_data_keras = load_model(log_dir / "synthetic_data.keras")

In [None]:
# Model.summary() prints the table itself and returns None, so wrapping it in
# print() only added a stray "None" line to the output.
synthetic_data_keras.summary()

In [None]:
# Every call draws fresh noise, so each invocation produces a new synthetic series.
def generate_data():
    """Run the loaded generator on noise batches and stack the results.

    Draws ``n_windows // batch_size`` batches from ``random_series``, passes
    each through ``synthetic_data_keras`` and stacks the outputs along the
    first axis.

    Returns:
        numpy.ndarray: the stacked model outputs. They are returned exactly
        as the model produced them; no inverse scaling is applied here.
    """
    n_batches = n_windows // batch_size
    batches = [synthetic_data_keras(next(random_series))
               for _ in range(n_batches)]
    return np.vstack(batches)

# Models

In [None]:
def get_model():
    """Build and compile a fresh one-step-ahead GRU regressor.

    The network reads a window of ``seq_len - 1`` time steps with ``n_seq``
    features each and emits one value per series for the final step.

    Returns:
        A compiled Keras ``Sequential`` model (Adam optimizer, MAE loss).
    """
    # The output width must equal n_seq: the training labels are full rows of
    # n_seq values and the predictions are passed to scaler.inverse_transform,
    # which expects one column per series. The previous hard-coded Dense(6)
    # did not match the 30-ticker setup.
    model = Sequential([GRU(12, input_shape=(seq_len-1, n_seq)),
                        Dense(n_seq)])

    model.compile(optimizer=Adam(),
                  loss=MeanAbsoluteError(name='MAE'))
    return model

In [None]:
#synthetic_data = generate_data()
#len(synthetic_data)

In [None]:
# Set indices for the train/test split.
# generate_data() emits whole batches only, so a synthetic series holds
# (n_windows // batch_size) * batch_size windows. Trim the real windows to the
# same length so one set of index arrays addresses both; computing the length
# replaces the previous hard-coded 5632, which was only right for one
# date range and batch size.
real_data = np.array(real_data)[:(n_windows // batch_size) * batch_size]

n_series = real_data.shape[0]

idx = np.arange(n_series)

# Split by position: the first train_test_split share of windows trains, the rest tests.
n_train = int(train_test_split*n_series)
train_idx = idx[:n_train]
test_idx = idx[n_train:]

In [None]:
# Held-out real windows: the first seq_len-1 rows are the inputs and the last
# row of each window is the label to predict.
real_test_data = real_data[test_idx, :seq_len-1, :]
real_test_label = real_data[test_idx, -1, :]

In [None]:
def model_predictions(data, train_idx):
    """Train a fresh regressor on ``data`` and predict the real test labels.

    Args:
        data: array of windows indexed as ``[window, step, series]``; may be
            real or synthetic.
        train_idx: indices of the windows in ``data`` used for training.

    Returns:
        pandas.DataFrame: predictions for ``real_test_data`` mapped back
        through ``scaler.inverse_transform``, one row per test window.
    """
    # The first seq_len-1 steps are the features, the final step is the target.
    features = data[train_idx, :seq_len-1, :]
    targets = data[train_idx, -1, :]

    regressor = get_model()
    # The real held-out windows serve as validation data during fitting.
    regressor.fit(x=features,
                  y=targets,
                  validation_data=(real_test_data, real_test_label),
                  epochs=100,
                  batch_size=batch_size,
                  verbose=0)

    scaled_pred = regressor.predict(real_test_data, verbose=0)
    unscaled_pred = scaler.inverse_transform(scaled_pred).squeeze()
    return pd.DataFrame(unscaled_pred)

In [None]:
#test_predict = model_predictions(real_data, train_idx)

In [None]:
# Map the real test labels back through the scaler's inverse transform so they
# can be compared with the (also inverse-transformed) predictions.
real_unscaled = pd.DataFrame(scaler.inverse_transform(real_test_label).squeeze())

In [None]:
# Set the index to dates.
# NOTE(review): each label is the LAST row of its window (price row
# i + seq_len - 1 for window i), but yf_index[test_idx] is the date of the
# window's FIRST row i. The dates attached here therefore look seq_len - 1
# rows too early — confirm, and shift consistently wherever predictions are
# indexed the same way if this is unintended.
real_unscaled = real_unscaled.set_index(yf_index[test_idx])
#test_predict = test_predict.set_index(yf_index[test_idx])

In [None]:
def plot_preformance(real, predict, filename):
    """Plot real vs. predicted prices per ticker and save the figure.

    Args:
        real: DataFrame of real prices; column ``j`` belongs to ``tickers[j]``.
        predict: DataFrame of predicted prices with the same layout. The two
            frames are combined per ticker, so they should share an index.
        filename: path handed to ``fig.savefig``.
    """
    ticker_count = len(tickers)
    # Round up so an odd number of tickers still gets one subplot each
    # (floor division previously left the last ticker without an axis).
    n_rows = (ticker_count + 1) // 2
    # squeeze=False keeps `axes` two-dimensional even for a single row.
    fig, axes = plt.subplots(nrows=n_rows, ncols=2, squeeze=False,
                             figsize=(int(ticker_count*14/6), int(ticker_count*7/6)))
    axes = axes.flatten()

    for j, ticker in enumerate(tickers):
        # NOTE(review): secondary_y names 'Synthetic', which matches neither
        # column below, so presumably both lines share one y-axis — confirm
        # that this is the intended presentation.
        (pd.DataFrame({'Real': real.iloc[:, j],
                    'Synthetic Trained': predict.iloc[:, j]})
        .plot(ax=axes[j],
            title=ticker,
            secondary_y='Synthetic', style=['-', '--'],
            lw=1))

    # Hide the spare subplot left over when the ticker count is odd.
    for spare_ax in axes[ticker_count:]:
        spare_ax.set_visible(False)

    sns.despine()
    fig.tight_layout()
    fig.savefig(filename)

#plot_preformance(real_unscaled, test_predict)

# Backtesting

In [None]:
# backtesting.py strategies are classes, not instances, so each ticker gets its
# own Strategy subclass that closes over that ticker's prediction series.
def ModelStrategy(ticker_series):
    """Build a Strategy class that trades on one ticker's predicted prices.

    Args:
        ticker_series: pandas Series of predicted prices for one ticker.

    Returns:
        A ``Strategy`` subclass. It buys when the next predicted price is
        more than ``percent_to_beat`` above the previous one and sells when
        it is more than ``percent_to_beat`` below it.
    """
    class ModelStrategy_Inst(Strategy):
        # Relative move a prediction must exceed to trigger an order; this is
        # the parameter tuned by Backtest.optimize.
        percent_to_beat = 0.07

        def init(self):
            # Fresh iterator per run so every backtest starts at the first prediction.
            self.predict_iter = iter(ticker_series.to_numpy())
            self.price_today = next(self.predict_iter)

        def next(self):
            price_tomorrow = next(self.predict_iter)
            margin = self.price_today*self.percent_to_beat
            if price_tomorrow > self.price_today+margin:
                self.buy()
            elif price_tomorrow < self.price_today-margin:
                self.sell()
            # Advance the reference price for the next bar.
            self.price_today = price_tomorrow

    return ModelStrategy_Inst

In [None]:
# Backtest every ticker independently.
def backtest(prediction):
    """Run an optimized backtest per ticker on the given predictions.

    Args:
        prediction: date-indexed DataFrame whose column ``j`` holds the
            predicted prices for ``tickers[j]``.

    Returns:
        tuple: ``(return_percents, stats_dict, bt_dict)`` — the list of
        "Return [%]" values in ticker order, and dicts keyed by ticker with
        the optimized stats and the ``Backtest`` objects.
    """
    return_percents = []
    stats_dict = {}
    bt_dict = {}
    for col, ticker in enumerate(tickers):
        # backtesting needs a full price frame, so re-download this ticker
        # over the prediction's date span.
        price_df = yf.download(ticker, prediction.index[0], prediction.index[-1])
        strategy_cls = ModelStrategy(prediction[col])
        bt = Backtest(price_df, strategy_cls, commission=0.002, exclusive_orders=True)
        # Choose the percent_to_beat threshold that maximizes final equity.
        stats_opt = bt.optimize(
            percent_to_beat=np.arange(0.01, 0.1, 0.01).tolist(),
            maximize="Equity Final [$]"
        )
        return_percents.append(stats_opt["Return [%]"])
        stats_dict[ticker] = stats_opt
        bt_dict[ticker] = bt
    return return_percents, stats_dict, bt_dict

#return_percents, stats_dict, bt_dict = backtest(test_predict)


# Real Looping

In [None]:
# Number of train-and-backtest repetitions per data source (real and synthetic).
iterations = 50

In [None]:
# Repeatedly train on the real windows and keep the run with the highest
# average backtest return across tickers.
best_real_model = None
best_real_return_avg = 0
best_real_test_predict = None
for k in range(iterations):
    print(f"Iteration: {k}")

    # Train, predict, then attach dates so backtest() can fetch matching prices.
    test_predict = model_predictions(real_data, train_idx).set_index(yf_index[test_idx])

    return_percents, stats_dict, bt_dict = backtest(test_predict)
    return_avg = np.average(return_percents)

    # Skip runs that do not beat the best average so far (the first run always counts).
    if best_real_model is not None and not (best_real_return_avg < return_avg):
        continue

    best_real_model = (return_percents, stats_dict, bt_dict)
    best_real_return_avg = return_avg
    best_real_test_predict = test_predict
    print(f"New Best With Returns: {best_real_model[0]}")

In [None]:
# Plot real prices against the best real-trained predictions; the figure is saved under log_dir.
plot_preformance(real_unscaled, best_real_test_predict, log_dir / "real_prediction_plot")

In [None]:
# Show the per-ticker stats of the BEST real-trained run. `stats_dict` only
# holds whatever the final loop iteration produced, so read the stats stored
# with the best model instead (mirrors how the synthetic stats are built).
stats_df = pd.concat(best_real_model[1], axis=1)
stats_df

In [None]:
# Write the interactive backtest chart for GE from the best real-trained run.
best_real_model[2]["GE"].plot(filename=str(log_dir / "plot_ge_real.html"))

# Synthetic Looping

In [None]:
# Repeatedly train on freshly generated synthetic windows and keep the run
# with the highest average backtest return across tickers.
best_syn_model = None
best_syn_return_avg = 0
best_syn_test_predict = None
for k in range(iterations):
    print(f"Iteration: {k}")
    # Each iteration trains on a newly sampled synthetic series.
    generated_data = generate_data()

    # Train, predict the real test windows, then attach dates for backtesting.
    test_predict2 = model_predictions(generated_data, train_idx).set_index(yf_index[test_idx])

    return_percents, stats_dict, bt_dict = backtest(test_predict2)
    return_avg = np.average(return_percents)

    # Skip runs that do not beat the best average so far (the first run always counts).
    if best_syn_model is not None and not (best_syn_return_avg < return_avg):
        continue

    best_syn_model = (return_percents, stats_dict, bt_dict)
    best_syn_return_avg = return_avg
    best_syn_test_predict = test_predict2
    print(f"New Best With Returns: {best_syn_model[0]}")

In [None]:
# Plot real prices against the best synthetic-trained predictions. Save under
# log_dir like the other experiment outputs, not in the working directory.
plot_preformance(real_unscaled, best_syn_test_predict, log_dir / "syn_prediction_plot")

In [None]:
# Per-ticker stats of the best synthetic-trained run, one column per ticker.
stats_syn_df = pd.concat(best_syn_model[1], axis=1)
stats_syn_df

In [None]:
# Write the interactive backtest chart for GE from the best SYNTHETIC-trained
# run. This previously plotted best_real_model into the "_syn" file.
best_syn_model[2]["GE"].plot(filename=str(log_dir / "plot_ge_syn.html"))