<a href="https://colab.research.google.com/github/gohzhihwee/stuffs/blob/main/transformer_findata.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import pandas as pd
import os
import math
from tensorflow import keras
from tensorflow.keras import layers
from sklearn.preprocessing import MinMaxScaler

In [None]:
def create_sequences(data, time_steps):
    X, y = [], []
    for x in range(len(data)-time_steps):
      X.append(data.iloc[x:x+time_steps,:].values)
      y.append(data.iloc[x+time_steps,:].values)
    return X, y

In [None]:
time_steps = 50
training_dataset = pd.read_csv("HKEX-06669.csv")
testing_dataset = pd.read_csv("HKEX-10276.csv")
training_data = training_dataset.iloc[:, [1,4,5,7,8,9,10,11]]
testing_data = testing_dataset.iloc[:, [1,4,5,7,8,9,10,11]]

def min_max_scaler(data):
    for col in data.columns:
      data[f'{col}'] = data[f'{col}']/data[f'{col}'].abs().max()
      data[f'{col}'] = data[f'{col}'].fillna(0)
    return data

training_data = min_max_scaler(training_data)
testing_data = min_max_scaler(testing_data)

print(training_data)
print(testing_data)


X_train, y_train = create_sequences(training_data, time_steps)
X_test, y_test = create_sequences(testing_data, time_steps)

X_train, y_train, X_test, y_test = np.array(X_train), np.array(y_train), np.array(X_test), np.array(y_test)


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  data[f'{col}'] = data[f'{col}']/data[f'{col}'].abs().max()
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  data[f'{col}'] = data[f'{col}'].fillna(0)


     Nominal Price       Bid       Ask      High       Low  Previous Close  \
0         0.578362  0.578362  0.579053  0.578471  0.590526        0.577351   
1         0.577351  0.567240  0.575025  0.595573  0.591579        0.576340   
2         0.576340  0.566229  0.574018  0.582495  0.586316        0.570273   
3         0.570273  0.567240  0.568983  0.583501  0.589474        0.574317   
4         0.574317  0.566229  0.575025  0.578471  0.589474        0.561173   
..             ...       ...       ...       ...       ...             ...   
459       0.892821  0.892821  0.890232  0.899396  0.908421        0.915066   
460       0.915066  0.915066  0.916415  0.940644  0.952632        0.940344   
461       0.940344  0.930233  0.936556  0.973843  0.954737        0.953488   
462       0.953488  0.953488  0.950655  0.962777  0.905263        0.889788   
463       0.889788  0.871587  0.886203  0.924547  0.894737        0.000000   

     Share Volume (000)  Turnover (000)  
0              0.0059

In [None]:
## Implementation taken from Theodoros Ntakouris (can be found on the Keras website)
def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):
    # Normalization and Attention
    x = layers.LayerNormalization(epsilon=1e-6)(inputs)
    x = layers.MultiHeadAttention(
        key_dim=head_size, num_heads=num_heads, dropout=dropout
    )(x, x)
    x = layers.Dropout(dropout)(x)
    res = x + inputs

    # Feed Forward Part
    x = layers.LayerNormalization(epsilon=1e-6)(res)
    x = layers.Conv1D(filters=ff_dim, kernel_size=1, activation="relu")(x)
    x = layers.Dropout(dropout)(x)
    x = layers.Conv1D(filters=inputs.shape[-1], kernel_size=1)(x)
    return x + res

In [None]:
def build_model(
    input_shape,
    head_size,
    num_heads,
    ff_dim,
    num_transformer_blocks,
    mlp_units,
    dropout=0,
    mlp_dropout=0,
):
    inputs = keras.Input(shape=input_shape)
    x = inputs
    for _ in range(num_transformer_blocks):
        x = transformer_encoder(x, head_size, num_heads, ff_dim, dropout)

    outputs = x
    return keras.Model(inputs, outputs)

In [6]:
input_shape = X_train.shape[1:]

model = build_model(
    input_shape,
    head_size=256,
    num_heads=4,
    ff_dim=4,
    num_transformer_blocks=4,
    mlp_units=[128],
    mlp_dropout=0.4,
    dropout=0.25,
)

mse = keras.losses.MeanSquaredError()

model.compile(
    loss=mse,
    optimizer=keras.optimizers.Adam(learning_rate=1e-4),
    metrics=["mean_squared_error"],
)
model.summary()

callbacks = [keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True)]

model.fit(
    X_train,
    X_train,
    validation_split=0.2,
    epochs=200,
    batch_size=64,
    callbacks=callbacks,
)

model.evaluate(X_test, X_test, verbose=1)

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 50, 8)]      0           []                               
                                                                                                  
 layer_normalization (LayerNorm  (None, 50, 8)       16          ['input_1[0][0]']                
 alization)                                                                                       
                                                                                                  
 multi_head_attention (MultiHea  (None, 50, 8)       35848       ['layer_normalization[0][0]',    
 dAttention)                                                      'layer_normalization[0][0]']    
                                                                                              

[0.03447715938091278, 0.03447715938091278]

In [7]:
import pickle as pkl
pkl.dump(model, open("trans_findata.pkl", "wb"))
