trying to reproduce this paper

1. https://arxiv.org/pdf/2112.03946
2. https://github.com/borisbanushev/stockpredictionai
3. https://onlinelibrary.wiley.com/doi/10.1155/2018/4907423


cannot reproduce this paper. problem is with phase space reconstruction and sliding window. idea is great however I'm not too sure about the results and also there might be data leakages. problem is with using sliding window. how am i supposed to predict testing data . lets say to predict day 1 of testing data we need the previous sliding window size which goes into the size of training data. also utilizing this method problem just gets related to GAN collpase since GANs are very senitive to input data.


In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf

import matplotlib.pyplot as plt

from tensorflow.keras.layers import Input, LSTM, Dense, Dropout, LeakyReLU, BatchNormalization, Reshape, Conv1D, Flatten
from tensorflow.keras.models import Sequential, load_model
from sklearn.preprocessing import StandardScaler

from tensorflow.keras.regularizers import l1
from tensorflow.keras.optimizers import Adam

from tensorflow.keras import backend as K



# nist represents the amount of stocks we care about
nist = 50

These following variables below are **hyperparameters** that we should tune after we have a decent model to start with.
1. learning rate for the discriminator
2. learning rate for the generator
3. lambda_p : forcast error loss
4. lambda_adv : adversarial loss
5. lambda_dpl : direction prediction loss
6. batch_size : batch size of the LSTM and CNN
7. cnn_lr: the learningrate of the CNN
8. strides: the number of strides in the CNN
9. lrelu_alpha: the alpha for the LeakyReLU in the GAN
10. batchnorm_momentum: momentum for the batch normalisation in the CNN
11. padding: the padding in the CNN
12. kernel_size':1: kernel size in the CNN
13. dropout: dropout in the LSTM
14. filters: the initial number of filters
15. window_size: the window size for the sliding window
16. epochs: the number of epochs for the training


In [None]:
# hyperparameters we need to tune for
m = 3
tau = 1

# learning rates
learning_rate_d = 0.0001
learning_rate_g = 0.0001

# coefficients of custom gan loss function
lambda_p = 1
lambda_adv = 1
lambda_dpl = 1

batch_size = 32
epochs = 500

# early stopping for testing. when certain of our model we should remove this
patience = 5
min_delta = 0.001



The data I am currently using is just closing stock prices of 50 different stocks with 500 trading days. It is stored in a column where each column represents a different stock.

In [None]:
data_load = np.loadtxt('prices.txt')

column_names = [f'stock_{i+1}' for i in range(nist)]
df = pd.DataFrame(data_load, columns=column_names)

rows = df.shape[0] - m + 1
cols = df.shape[1]

# print(rows)


https://github.com/manganganath/stock_price_trend_fft

the following fft code is copied from above. furthermore in the paper that I have tried to follow so hard, we use fft to denoise the stock data. All of this is done so that we can normalize the data. Against a lot of current research online, people have commonly used **MIN MAX** to nomralize their data. However I firmly believe that this is wrong. 1. the stock follows a random walk hypotheosis and it doesn't really make sense if we are putting a min max on a stock. we do not know if the stock is going to increase by a certain threshold for perpetuity. It might make sense if we are only looking at returns of a stock who has historically been very stable like banks or etf, but when we want to create a model to predict any stock movement we want to care about these high momentum stocks that could potentially have a lot of change in a short time peroid. These stocks commonly include biotech or tech companies.


Many other variation are

1. ARIMA
2. Wavelet
3. FFT

to denoise a stock data. However one paper suggested that these perform similar but we can test this later on.


In [None]:
stock_1 = df['stock_1']
close_fft = np.fft.fft(np.asarray(stock_1.tolist()))
fft_df = pd.DataFrame({'fft':close_fft})
fft_df['absolute'] = fft_df['fft'].apply(lambda x: np.abs(x))
fft_df['angle'] = fft_df['fft'].apply(lambda x: np.angle(x))
plt.figure(figsize=(14, 7), dpi=100)
fft_list = np.asarray(fft_df['fft'].tolist())
for num_ in [3, 6, 9, 100]:
  fft_list_m10 = np.copy(fft_list); fft_list_m10[num_:-num_]=0
  plt.plot(np.fft.ifft(fft_list_m10), label='Fourier transform with {} components'.format(num_))
plt.plot(stock_1,  label='Real')
plt.xlabel('Days')
plt.ylabel('USD')
plt.title('stock_1 prices & Fourier transforms')
plt.legend()
plt.show()

This is the second part of using phase transformation to normalise the data. In the paper it says that it "folds the data multiple time and the suggested window size is 131." However it did not mention the lag for the phase space reconstruction.

Therefore my interpretation of this work is that we use a window size and then we implement phase space reconstruction on each window size for all training data, and then after we predict we use the same window size and the same method to denormalize the data.

 It shall be noted that using phase space reconstruction is commonly used for determing "Embedding a time series means reconstructing (approximately) the phase space in which the system evolves in time; in other words, showing how 'all' the variables describing the system evolve in time. By 'all' here we mean more precisely 'the relevant ones'. https://academic.oup.com/book/36525/chapter-abstract/321310592?redirectedFrom=fulltext".

 We use it to try and identify "hidden" patterns

In [None]:
def embed_phase_space(data, m, tau):
    n = len(data)
    embedded_data = np.zeros((n - (m - 1) * tau, m))
    for i in range(m):
        embedded_data[:, i] = data[i * tau: n - (m - 1) * tau + i * tau]

    return embedded_data

def normalize_delay_vectors(embedded_data):
    norms = np.linalg.norm(embedded_data, axis=1)
    normalized_data = embedded_data / norms[:, np.newaxis]
    return normalized_data, norms

In [None]:
# normal_df_list = []

def nomralize_data(stock_no, m, tau):
    if isinstance(stock_no, (pd.DataFrame, pd.Series)):
        stock_no = stock_no.to_numpy()

    scaler = StandardScaler()
    stock_no_standardized = scaler.fit_transform(stock_no.reshape(-1, 1)).flatten()

    close_fft = np.fft.fft(stock_no_standardized)
    fft_df = pd.DataFrame({'fft': close_fft})
    fft_df['absolute'] = fft_df['fft'].apply(lambda x: np.abs(x))
    fft_df['angle'] = fft_df['fft'].apply(lambda x: np.angle(x))

    fft_list = np.asarray(fft_df['fft'].tolist())
    num_components = 100
    fft_list[num_components:-num_components] = 0

    reconstructed_signal = np.fft.ifft(fft_list)
    reconstructed_signal = reconstructed_signal.real

    embedded_data = embed_phase_space(reconstructed_signal, m, tau)

    normalized_data, norms = normalize_delay_vectors(embedded_data)

    # normal_df = pd.concat([normalized_data, norms, scaler, fft_list], axis=1)
    # normal_df.columns = [f'normalized_{j+1}' for j in range(m)] + ['norms', 'scaler', 'fft']
    # normal_df_list.append(normal_df)

    return normalized_data, norms, scaler, fft_list

In [None]:
def denormalize_data(normalized_data, norms, scaler, fft_list):
    denormalized_vectors = normalized_data * norms[:, np.newaxis]

    reconstructed_signal = np.fft.ifft(fft_list).real

    original_data = scaler.inverse_transform(reconstructed_signal.reshape(-1, 1)).flatten()

    return original_data

Look at phase space reconstruction on a piece of data

In [None]:
from mpl_toolkits.mplot3d import Axes3D

normalized_data, norms, scaler, fft_list = nomralize_data(df['stock_1'], 3, 1)

fig = plt.figure(figsize=(10, 7))
ax = fig.add_subplot(111, projection='3d')
ax.plot(normalized_data[:, 0], normalized_data[:, 1], normalized_data[:, 2], lw=0.5)
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_zlabel('Z')
plt.title('Phase Space Reconstruction')
plt.show()

print(normalized_data.shape)

# denormalized_data = denormalize_data(normalized_data, norms, scaler, fft_list)
# print(denormalized_data.shape)

In [None]:
def create_sliding_windows(data, window_size):
    windows = []
    for i in range(len(data) - window_size + 1):
        windows.append(data[i:i + window_size])
    return np.array(windows)


def create_data_vector(df, window_size):
    stock_data = []

    for i in range(nist):
        stock_series = df[f'stock_{i+1}'].values.flatten()
        sliding_windows = create_sliding_windows(stock_series, window_size)
        for window in sliding_windows:
            normalized_stock_data, norms, scaler, fft_list = nomralize_data(window, m, tau)
            stock_data.append(normalized_stock_data)

    return np.array(stock_data)

In [None]:
def adversarial_loss(y_true, y_pred):
    return tf.keras.losses.binary_crossentropy(y_true, y_pred)

def forecast_error_loss(y_true, y_pred):
    return tf.reduce_mean(tf.square(y_true - y_pred))

def direction_prediction_loss(y_true, y_pred):
    if y_true.shape[0] > 1:
        sign_true = tf.sign(y_true[1:] - y_true[:-1])
        sign_pred = tf.sign(y_pred[1:] - y_pred[:-1])
        return tf.reduce_mean(tf.abs(sign_true - sign_pred))
    else:
        return 0.0

def custom_gan_loss(y_true, y_pred):
    adv_loss = adversarial_loss(y_true, y_pred)
    p_loss = forecast_error_loss(y_true, y_pred)
    dpl_loss = direction_prediction_loss(y_true, y_pred)
    return lambda_adv * adv_loss + lambda_p * p_loss + lambda_dpl * dpl_loss


A lot of below follows https://github.com/soumith/ganhacks where I hoped this would of address the issue of GAN collpase

In [None]:
'''
  for our generator we are building an lstm model. right now it is a simple lstm but we can use this later.
  https://arxiv.org/pdf/1607.04381v1
  shape is 500, 5+m -> 1 which is the price of the stock
'''

def build_generator(input_shape):
    model = Sequential()
    model.add(LSTM(input_shape[0], input_shape=input_shape, return_sequences=True, kernel_regularizer=l1(0.01)))
    model.add(Dropout(0.2))
    model.add(Dense(1, activation='linear'))

  # print(model.summary())
    return model

# input_shape = (rows, 5 + m)
# print(input_shape)
# build_generator(input_shape)


In [None]:
'''
Sequential(
  (0): Conv1D(None -> 32, kernel_size=(5,), stride=(2,))
  (1): LeakyReLU(0.01)
  (2): Conv1D(None -> 64, kernel_size=(5,), stride=(2,))
  (3): LeakyReLU(0.01)
  (4): BatchNorm(axis=1, eps=1e-05, momentum=0.9, fix_gamma=False, use_global_stats=False, in_channels=None)
  (5): Conv1D(None -> 128, kernel_size=(5,), stride=(2,))
  (6): LeakyReLU(0.01)
  (7): BatchNorm(axis=1, eps=1e-05, momentum=0.9, fix_gamma=False, use_global_stats=False, in_channels=None)
  (8): Dense(None -> 220, linear)
  (9): BatchNorm(axis=1, eps=1e-05, momentum=0.9, fix_gamma=False, use_global_stats=False, in_channels=None)
  (10): LeakyReLU(0.01)
  (11): Dense(None -> 220, linear)
  (12): Activation(relu)
  (13): Dense(None -> 1, linear)
)

'''
def build_discriminator(input_shape):
    model = Sequential()
    model.add(Conv1D(32, kernel_size=5, strides=2, input_shape=input_shape, padding='same'))
    model.add(LeakyReLU(0.01))
    model.add(Conv1D(64, kernel_size=5, strides=2, padding='same'))
    model.add(LeakyReLU(0.01))
    model.add(BatchNormalization(axis=-1, epsilon=1e-05, momentum=0.9))
    model.add(Conv1D(128, kernel_size=5, strides=2, padding='same'))
    model.add(LeakyReLU(0.01))
    model.add(BatchNormalization(axis=-1, epsilon=1e-05, momentum=0.9))

    model.add(Flatten())
    model.add(Dense(220, activation='linear'))
    model.add(BatchNormalization(axis=-1, epsilon=1e-05, momentum=0.9))
    model.add(LeakyReLU(0.01))
    model.add(Dense(220, activation='relu'))

    model.add(Dense(1, activation='linear'))
    model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

    # print(model.summary())
    return model

# input_shape = (rows, 5 + m)
# build_discriminator(input_shape)


In [None]:
def train_gan(generator, discriminator, data):
    optimizer_g = Adam(learning_rate=learning_rate_g)
    optimizer_d = Adam(learning_rate=learning_rate_d)
    best_loss = float('inf')
    epochs_no_improve = 0


    @tf.function
    def train_step(real_data):
        current_batch_size = tf.shape(real_data)[0]

        noise = tf.random.normal([current_batch_size, data.shape[1], data.shape[2]])

        with tf.GradientTape() as tape_g, tf.GradientTape() as tape_d:
            generated_data = generator(noise, training=True)

            real_output = discriminator(real_data, training=True)
            fake_output = discriminator(generated_data, training=True)

            # custom loss as referenced in the paper
            g_loss = custom_gan_loss(tf.ones_like(fake_output), fake_output)

            d_loss_real = tf.keras.losses.binary_crossentropy(tf.ones_like(real_output), real_output)
            d_loss_fake = tf.keras.losses.binary_crossentropy(tf.zeros_like(fake_output), fake_output)
            d_loss = d_loss_real + d_loss_fake

            g_loss = tf.reduce_mean(g_loss)
            d_loss = tf.reduce_mean(d_loss)

        gradients_of_generator = tape_g.gradient(g_loss, generator.trainable_variables)
        gradients_of_discriminator = tape_d.gradient(d_loss, discriminator.trainable_variables)

        optimizer_g.apply_gradients(zip(gradients_of_generator, generator.trainable_variables))
        optimizer_d.apply_gradients(zip(gradients_of_discriminator, discriminator.trainable_variables))

        return g_loss, d_loss



    for epoch in range(epochs):
        for batch in range(0, len(data), batch_size):
            end_index = batch + batch_size
            if end_index > len(data):
                end_index = len(data)
            real_data_batch = data[batch:end_index]
            g_loss, d_loss = train_step(real_data_batch)

        print(f'Epoch {epoch + 1}, Generator Loss: {g_loss}, Discriminator Loss: {d_loss}')

        total_loss = g_loss + d_loss
        if best_loss - total_loss > min_delta:
           best_loss = total_loss
           epochs_no_improve = 0
        else:
            epochs_no_improve += 1


        if epochs_no_improve >= patience:
            print(f'Early stopping has occured at epoch {epoch}')
            break

    generator.save('stock_generator.tf')
    discriminator.save('stock_discriminator.tf')

In [None]:

train_size = int(df.shape[0] * 0.8)
test_size = df.shape[0] - train_size
train_df = df.iloc[:train_size]
# test_data = df.iloc[train_size:]
train_data = create_data_vector(train_df, window_size)
# print(train_data.shape)
# 398 3
input_shape = (train_data.shape[1], train_data.shape[2])
generator = build_generator(input_shape)
discriminator = build_discriminator(input_shape)

train_gan(generator, discriminator, train_data)

In [None]:

def model_evaluate(generator, test_size, df, window_size):

    results = {}

    for x in range(nist):
        results[f'stock_{x+1}'] = {
            'Day': [],
            'Predicted Price': [],
            'Actual Price': [],
            'Absolute Difference': [],
            'Direction Match': []
        }


    for stock in range(1):

        for i in range(test_size):
            start = df.shape[0] - test_size + i - window_size
            end = df.shape[0] - test_size + i

            print(f'{start} - {end}')

            test_data_batch = df.iloc[start:end, stock]

            # print(test_data_batch.shape)

            normalized_data, norms, scaler, fft_list = nomralize_data(test_data_batch.values.flatten(), m, tau)
            # normalized_stock_df = pd.DataFrame(normalized_stock_data, columns=[f'normalized_{j+1}' for j in range(m)])
            normalized_data = normalized_data.reshape(1, normalized_data.shape[0], normalized_data.shape[1])

            # print(normalized_stock_data.shape)
            # normalized_stock_df = normalized_stock_df.values.reshape(1, window_size, 3)

            predicted = generator.predict(normalized_data)
            predicted_price = denormalize_data(predicted, norms[-1:], scaler, fft_list).flatten()[-1]


            actual_price = test_data_batch.values.flatten()[-1]
            # actual_prices = test_data_batch.values.flatten()
            # predicted_prices = denormalize_data(predicted, norms[-1:], scaler, fft_list).flatten()
            # print(predicted_prices)

            # print(actual_prices)
            # time_index = np.arange(len(actual_prices))

            # plt.figure(figsize=(10, 5))
            # plt.plot(time_index, actual_prices, label='Actual Prices', color='blue')
            # plt.plot(time_index, predicted_prices, label='Predicted Prices', color='red')
            # plt.title('Comparison of Actual and Predicted Prices')
            # plt.xlabel('Time Index')
            # plt.ylabel('Price')
            # plt.legend()
            # plt.grid(True)
            # plt.show()

            stock_key = f'stock_{stock + 1}'



            print(f'{start} to {end} - actual price: {actual_price} vs predicted price: {predicted_price}')
            difference = np.abs(predicted_price - actual_price)
            direction_match = False

            prev_price =  df.iloc[start - 1, stock]
            actual_change = (actual_price - prev_price) / prev_price
            predicted_change = (predicted_price - prev_price) / prev_price
            if (actual_change > 0 and predicted_change > 0) or (actual_change < 0 and predicted_change < 0):
                direction_match = True



            results[stock_key]['Day'].append(i + window_size)
            results[stock_key]['Predicted Price'].append(predicted_price)
            results[stock_key]['Actual Price'].append(actual_price)
            results[stock_key]['Absolute Difference'].append(difference)
            results[stock_key]['Direction Match'].append(direction_match)

    results_df = pd.DataFrame(results)
    results_df.to_csv('test_results.csv', index=False)
    print("Test results saved to 'test_results.csv'.")

generator = load_model('stock_generator.keras')
model_evaluate(generator, test_size, df, window_size)
# print(window_size)

# print(df.shape)



In [None]:

results_df = pd.read_csv('test_results.csv')

number = 1
stock_no_results = results_df[f'stock_{number}'].apply(eval).apply(pd.Series)

predicted_prices = stock_no_results.iloc[1]
actual_prices = stock_no_results.iloc[2]
days = range(len(predicted_prices))

plt.figure(figsize=(12, 6))
plt.plot(days, actual_prices, label='Actual Prices', color='blue', linestyle='-')
plt.plot(days, predicted_prices, label='Predicted Prices', color='red', linestyle='--')
plt.title('Comparison of Actual and Predicted Prices')
plt.xlabel('Evaluation Instance')
plt.ylabel('Price')
plt.legend()
plt.grid(True)
plt.show()