In [1]:
import sys
import warnings

if not sys.warnoptions:
    warnings.simplefilter('ignore')

In [2]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from datetime import datetime
from datetime import timedelta
from tqdm import tqdm
sns.set()
tf.compat.v1.random.set_random_seed(1234)

2024-06-10 23:14:42.710140: I tensorflow/core/util/port.cc:113] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-06-10 23:14:42.710398: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-06-10 23:14:42.712810: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-06-10 23:14:42.743165: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [3]:
df = pd.read_csv('../dataset/eurusd_4h/EURUSD_Candlestick_4_Hour_BID_01.01.2023-31.12.2023.csv')
df.head()

Unnamed: 0,Gmt time,Open,High,Low,Close,Volume
0,31.12.2022 22:00:00.000,1.07026,1.07026,1.07026,1.07026,0.0
1,01.01.2023 02:00:00.000,1.07026,1.07026,1.07026,1.07026,0.0
2,01.01.2023 06:00:00.000,1.07026,1.07026,1.07026,1.07026,0.0
3,01.01.2023 10:00:00.000,1.07026,1.07026,1.07026,1.07026,0.0
4,01.01.2023 14:00:00.000,1.07026,1.07026,1.07026,1.07026,0.0


In [4]:
minmax = MinMaxScaler().fit(df.iloc[:, 4:5].astype('float32')) # Close index
df_log = minmax.transform(df.iloc[:, 4:5].astype('float32')) # Close index
df_log = pd.DataFrame(df_log)
df_log.head()


Unnamed: 0,0
0,0.304447
1,0.304447
2,0.304447
3,0.304447
4,0.304447


## Split train and test

I will cut the dataset to train and test datasets,

1. Train dataset derived from starting timestamp until last 30 days
2. Test dataset derived from last 30 days until end of the dataset

So we will let the model do forecasting based on last 30 days, and we will going to repeat the experiment for 10 times. You can increase it locally if you want, and tuning parameters will help you by a lot.

In [5]:
test_size = 30
simulation_size = 10

df_train = df_log.iloc[:-test_size]
df_test = df_log.iloc[-test_size:]
df.shape, df_train.shape, df_test.shape

((2192, 6), (2162, 1), (30, 1))

In [6]:

class Model:
    def __init__(self, learning_rate, num_layers, size, size_layer, output_size, dropout_rate=0.1):
        # Define the LSTM cells with dropout
        def lstm_cell(size_layer):
            return tf.keras.layers.LSTMCell(size_layer, dropout=dropout_rate, recurrent_dropout=dropout_rate)
        
        rnn_cells = [lstm_cell(size_layer) for _ in range(num_layers)]
        
        # Stack the RNN cells
        self.stacked_rnn_cells = tf.keras.layers.StackedRNNCells(rnn_cells)
        self.rnn_layer = tf.keras.layers.RNN(self.stacked_rnn_cells, return_sequences=True, return_state=True)
        
        # Define input layers
        self.inputs = tf.keras.Input(shape=(None, size))
        self.hidden_state_input = [tf.keras.Input(shape=(size_layer,)) for _ in range(num_layers)]
        self.cell_state_input = [tf.keras.Input(shape=(size_layer,)) for _ in range(num_layers)]
        self.hidden_states = list(zip(self.hidden_state_input, self.cell_state_input))
        
        # RNN layer output
        rnn_outputs = self.rnn_layer(self.inputs, initial_state=self.hidden_states)
        
        # Extract the last state and output
        rnn_output = rnn_outputs[0]
        rnn_states = rnn_outputs[1:]
        
        # Dense layer for output
        self.outputs = tf.keras.layers.Dense(output_size)(rnn_output)
        
        # Define the model
        self.model = tf.keras.Model(inputs=[self.inputs] + self.hidden_state_input + self.cell_state_input, outputs=[self.outputs] + [item for sublist in rnn_states for item in sublist])


        # Compile the model
        self.model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate), loss='mse')

    def train_on_batch(self, X_batch, Y_batch, initial_states):
        input_data = [X_batch] + initial_states
        return self.model.train_on_batch(input_data, Y_batch)



    def predict(self, X_batch, initial_states):
        return self.model.predict([X_batch] + initial_states)

def calculate_accuracy(real, predict):
    real = np.array(real) + 1
    predict = np.array(predict) + 1
    percentage = 1 - np.sqrt(np.mean(np.square((real - predict) / real)))
    return percentage * 100

def anchor(signal, weight):
    buffer = []
    last = signal[0]
    for i in signal:
        smoothed_val = last * weight + (1 - weight) * i
        buffer.append(smoothed_val)
        last = smoothed_val
    return buffer


In [7]:
num_layers = 1
size_layer = 128
timestamp = 5
epoch = 300
dropout_rate = 0.8
future_day = test_size
learning_rate = 0.01

In [8]:

def forecast():
    model = Model(learning_rate, num_layers, df_log.shape[1], size_layer, df_log.shape[1], dropout_rate)
    date_ori = pd.to_datetime(df.iloc[:, 0]).tolist()

    pbar = tqdm(range(epoch), desc='train loop')
    for i in pbar:
        init_value = [np.zeros((1, size_layer))] * num_layers
        total_loss, total_acc = [], []
        for k in range(0, df_train.shape[0] - 1, timestamp):
            index = min(k + timestamp, df_train.shape[0] - 1)
            batch_x = np.expand_dims(df_train.iloc[k:index, :].values, axis=0)
            batch_y = df_train.iloc[k + 1:index + 1, :].values
            logits, last_state, _, loss = model.train_on_batch(batch_x, batch_y, init_value)
            init_value = last_state
            total_loss.append(loss)
            total_acc.append(calculate_accuracy(batch_y[:, 0], logits[:, 0]))
        pbar.set_postfix(cost=np.mean(total_loss), acc=np.mean(total_acc))

    future_day = test_size

    output_predict = np.zeros((df_train.shape[0] + future_day, df_train.shape[1]))
    output_predict[0] = df_train.iloc[0]
    upper_b = (df_train.shape[0] // timestamp) * timestamp
    init_value = [np.zeros((1, size_layer))] * num_layers

    for k in range(0, (df_train.shape[0] // timestamp) * timestamp, timestamp):
        out_logits, last_state = model.predict(np.expand_dims(df_train.iloc[k:k + timestamp], axis=0), init_value)
        init_value = last_state
        output_predict[k + 1:k + timestamp + 1] = out_logits

    if upper_b != df_train.shape[0]:
        out_logits, last_state = model.predict(np.expand_dims(df_train.iloc[upper_b:], axis=0), init_value)
        output_predict[upper_b + 1:df_train.shape[0] + 1] = out_logits
        future_day -= 1
        date_ori.append(date_ori[-1] + timedelta(days=1))

    init_value = last_state

    for i in range(future_day):
        o = output_predict[-future_day - timestamp + i:-future_day + i]
        out_logits, last_state = model.predict(np.expand_dims(o, axis=0), init_value)
        init_value = last_state
        output_predict[-future_day + i] = out_logits[-1]
        date_ori.append(date_ori[-1] + timedelta(days=1))

    output_predict = minmax.inverse_transform(output_predict)
    deep_future = anchor(output_predict[:, 0], 0.3)

    return deep_future[-test_size:]

# Your model class definition and other helper functions here

In [9]:
results = []
for i in range(simulation_size):
    print('simulation %d'%(i + 1))
    results.append(forecast())

simulation 1


train loop:   0%|          | 0/300 [00:00<?, ?it/s]


ValueError: Layer 'functional_1' expected 3 input(s). Received 2 instead.

In [None]:
accuracies = [calculate_accuracy(df['Close'].iloc[-test_size:].values, r) for r in results]

plt.figure(figsize = (15, 5))
for no, r in enumerate(results):
    plt.plot(r, label = 'forecast %d'%(no + 1))
plt.plot(df['Close'].iloc[-test_size:].values, label = 'true trend', c = 'black')
plt.legend()
plt.title('average accuracy: %.4f'%(np.mean(accuracies)))
plt.show()