In [1]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import metrics
from tensorflow.keras.layers import LSTM, Dense, Dropout, TimeDistributed
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, matthews_corrcoef
from sklearn.preprocessing import StandardScaler
from imblearn.over_sampling import RandomOverSampler
import numpy as np
import pandas as pd
import os
import random
import tensorflow as tf

In [2]:
# resetting the seeds for reproducibility
def reset_random_seeds(seed=1):
    """Seed every global random number generator used in this notebook.

    Seeds Python's ``random`` module, NumPy's legacy global generator and
    TensorFlow's global generator with the same value, and writes that value
    to the ``PYTHONHASHSEED`` environment variable.

    Args:
        seed: Integer seed applied to all generators. Defaults to 1, the value
            this notebook has always used, so ``reset_random_seeds()`` keeps
            its previous behavior.

    Note:
        ``PYTHONHASHSEED`` is read by the interpreter at start-up, so setting
        it here does not change hashing in the already-running kernel; it is
        only inherited by processes launched afterwards.
    """
    os.environ['PYTHONHASHSEED'] = str(seed)
    random.seed(seed)
    np.random.seed(seed)
    tf.random.set_seed(seed)

reset_random_seeds()

# read the two input tables: df supplies the model inputs, df1 supplies the
# closing price used to build the target
df = pd.read_csv('Boruta_onchain_data.csv')
df1 = pd.read_csv('all_data.csv')

# keep only the input rows whose timestamp is '2013-03-11' or later
# (string comparison) and renumber them from 0
df = df.loc[df['timestamp'] >= '2013-03-11'].reset_index(drop=True)
# NOTE(review): df1 is not filtered the same way; X and y only line up row by
# row if all_data.csv already covers exactly the same rows -- confirm.

# presumably the other dataset names that can be loaded into df above:
#onchain_data_new
#TA_data
#all_data
#Boruta_data
#Boruta_onchain_data
#Boruta_TA_data

# inputs: every column of df except the timestamp
X = df.drop(columns='timestamp')

# target: 1 when the next row's close is strictly above the current row's
# close, otherwise 0
price = pd.DataFrame({'today': df1['price-ohlc-usd-c']})
price['next day'] = price['today'].shift(-1)
y = price['next day'].gt(price['today']).astype(int)
# NOTE(review): the last row has no "next day" (NaN), so its comparison is
# False and it is labelled 0 without a real outcome behind it.

# chronological split: the first 80% of rows train, the last 20% test
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    shuffle=False,
)

In [3]:
# A 1-D input (a single feature) is turned into a single-column 2-D array
# before it is handed to the scaler.
if X_train.ndim == 1:
    X_train = X_train.to_numpy()[:, np.newaxis]
if X_test.ndim == 1:
    X_test = X_test.to_numpy()[:, np.newaxis]

# Standardise the inputs. The scaler is fitted on the training split only and
# the same fitted scaler is then applied to both splits.
scaler = StandardScaler().fit(X_train)
X_train_scaled = scaler.transform(X_train)
X_test_scaled = scaler.transform(X_test)

# reshape the input data for CNN-LSTM (samples, timesteps, features)
def create_sequences(data, timesteps):
    """Build overlapping sliding windows along the first axis of ``data``.

    Window ``i`` holds rows ``i`` to ``i + timesteps - 1``, so an input with
    ``n`` rows yields ``n - timesteps + 1`` windows.

    Args:
        data: Array-like whose first axis is the row (time) axis, e.g. a 2-D
            array of shape (rows, features).
        timesteps: Number of consecutive rows per window; must be >= 1.

    Returns:
        np.ndarray of shape (n - timesteps + 1, timesteps, *data.shape[1:]).
        When ``data`` has fewer than ``timesteps`` rows the result has zero
        windows but keeps the trailing dimensions, so downstream shape checks
        still work.

    Raises:
        ValueError: If ``timesteps`` is smaller than 1.
    """
    if timesteps < 1:
        raise ValueError("timesteps must be a positive integer")

    data = np.asarray(data)
    n_windows = len(data) - timesteps + 1
    if n_windows <= 0:
        # Not enough rows for a single window: return an empty array with the
        # correct rank instead of a shapeless 1-D empty array.
        return np.empty((0, timesteps) + data.shape[1:], dtype=data.dtype)

    return np.stack([data[start:start + timesteps] for start in range(n_windows)])

# number of consecutive rows in each input window
timesteps = 5
X_train_reshaped = create_sequences(X_train_scaled, timesteps)
X_test_reshaped = create_sequences(X_test_scaled, timesteps)

# Each window is paired with the label of its last row, so the leading labels
# that have no complete window are dropped. Trimming by the length difference
# (timesteps - 1 on the first run) instead of a fixed offset keeps this cell
# safe to re-run: once labels and windows already match, nothing more is cut.
y_train = y_train[len(y_train) - len(X_train_reshaped):]
y_test = y_test[len(y_test) - len(X_test_reshaped):]

In [4]:
from tensorflow.keras import backend as K

def f1_score_2(y_true, y_pred):
    """Compute the F1 score of one batch for binary labels (Keras metric).

    Predictions are rounded to 0/1 before counting, and ``K.epsilon()`` is
    added to every denominator so that a batch without positives yields 0
    instead of a division by zero.

    Args:
        y_true: Tensor of ground-truth labels (0 or 1), any shape.
        y_pred: Tensor of predicted probabilities in [0, 1] with the same
            number of elements as ``y_true``.

    Returns:
        Scalar tensor holding the F1 score of the batch.
    """
    # Flatten both tensors and give them one float dtype before multiplying.
    # Labels of shape (batch,) against predictions of shape (batch, 1) would
    # otherwise broadcast to (batch, batch), and integer labels cannot be
    # multiplied with float predictions.
    y_true = tf.cast(tf.reshape(y_true, [-1]), K.floatx())
    y_pred = tf.cast(tf.reshape(y_pred, [-1]), K.floatx())

    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
    predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
    precision = true_positives / (predicted_positives + K.epsilon())
    recall = true_positives / (possible_positives + K.epsilon())
    f1_val = 2*(precision*recall)/(precision+recall+K.epsilon())
    return f1_val

In [5]:
# Define the LSTM model
n_timesteps, n_features = X_train_reshaped.shape[1], X_train_reshaped.shape[2]

model = Sequential()
# Declare the input as its own layer rather than passing `input_shape` to the
# first LSTM, which Keras warns about inside a Sequential model.
model.add(tf.keras.Input(shape=(n_timesteps, n_features)))
model.add(LSTM(units=256, return_sequences=True))
# NOTE(review): this projects the 256 LSTM features down to a single value per
# timestep before the next LSTM -- confirm the bottleneck is intended.
model.add(TimeDistributed(Dense(1, activation='linear')))
model.add(Dropout(0.2))
model.add(LSTM(units=128, return_sequences=True))
model.add(Dropout(0.2))
model.add(LSTM(units=64))
model.add(Dropout(0.2))
model.add(Dense(64, activation='relu'))
# One output unit needs a sigmoid. Softmax normalises across units, so with a
# single unit it always outputs 1.0 and the model could never predict class 0.
model.add(Dense(1, activation='sigmoid'))

# compile the model
model.compile(
    optimizer=Adam(learning_rate=0.001),
    loss='binary_crossentropy',
    metrics=[
        metrics.BinaryAccuracy(),
        metrics.Precision(),
        metrics.Recall(),
        f1_score_2,
    ],
)

  super().__init__(**kwargs)


In [None]:
# train with the training dataset
# Early stopping watches the F1 score on the held-out validation split (the
# last 10% of the training data) rather than the training metric, which mostly
# keeps improving as the model overfits. mode='max' is explicit because Keras
# cannot infer from the name of a custom metric that higher is better.
early_stopping = EarlyStopping(monitor='val_f1_score_2', mode='max', patience=50)
history = model.fit(
    X_train_reshaped,
    y_train,
    epochs=100,
    batch_size=50,
    validation_split=0.1,
    callbacks=[early_stopping],
)

In [None]:
# Predict on the test windows and turn the model outputs into class
# decisions: an output strictly above 0.5 counts as the positive class.
y_pred = model.predict(X_test_reshaped) > 0.5

# Report the classification metrics of the predictions against the test labels.
for label, scorer in (
    ("Accuracy:", accuracy_score),
    ("Precision:", precision_score),
    ("Recall:", recall_score),
    ("F1-score:", f1_score),
):
    print(label, scorer(y_test, y_pred))

In [None]:
# Timestamps of the test samples: apply the same chronological split as for
# X / y, then drop the leading rows that have no complete window.
time = df['timestamp']
time_train, time_test = train_test_split(time, test_size=0.2, shuffle=False)
time_test = time_test[timesteps-1:]

# Flatten y_pred to be a 1-dimensional array
y_pred_flat = y_pred.flatten()

# 'today' price of each test sample, looked up through the labels' own index
# instead of a hard-coded row count, so it follows the data and window size.
test_prices = price['today'].loc[y_test.index]

# Create a DataFrame with the date, actual label, prediction and price of each
# test sample. Every column is passed as a plain array so rows are matched by
# position; mixing Series would align them by index label and can silently
# insert NaNs. A length mismatch raises instead of being hidden.
pred_res = pd.DataFrame(
    {
        'date': time_test.to_numpy(),
        'actual': y_test.to_numpy(),
        'prediction': y_pred_flat,
        'value': test_prices.to_numpy(),
    },
    index=time_test.index,
)

# make sure the output folder exists before writing
os.makedirs('pred', exist_ok=True)
pred_res.to_csv('pred/lstm_uni_data.csv', index=False)