In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error

## Import the Data 

In [None]:
# Load the dataset from a CSV file located in the working directory and preview
# the first rows.
df = pd.read_csv('BTC_Trial_NB.csv')
df.head()

In [None]:
# Columns that are excluded from the model's feature matrix.
# The previous version of this cell parsed 'Date' with pd.to_datetime and then
# dropped the column in the very next statement, so the parsed values never
# reached the resulting frame; that conversion is omitted here.
NON_FEATURE_COLUMNS = ['Date', 'Month', 'Year', 'Network Difficulty']

# errors='ignore' keeps the cell idempotent: running it again after the columns
# have already been removed is a no-op instead of a KeyError.
df = df.drop(columns=NON_FEATURE_COLUMNS, errors='ignore')
df.head()

In [None]:
# Lift the column display limit so wide frames are shown in full, then print the
# frame summary.
pd.set_option('display.max_columns', None)
df.info()

In [None]:
# Number of missing values in each column.
df.isna().sum()

## 1. Data Preprocessing

### 1.2 Scale the data

In [6]:
from sklearn import preprocessing

# Split by row position: the first 1848 rows are used for training, the
# remaining rows for testing.
# NOTE(review): 1848 is a hard-coded split point and assumes the rows are in
# chronological order — confirm against the dataset.
df_for_training=df[:1848]
df_for_testing=df[1848:]

# The scaler is fitted on the training rows only; the test rows are transformed
# with the parameters learned from the training rows.
scaler = preprocessing.MinMaxScaler()
df_for_training_scaled = scaler.fit_transform(df_for_training)

df_for_testing_scaled=scaler.transform(df_for_testing)

In [None]:
# Sanity check: (rows, columns) of the scaled training and test arrays.
df_for_training_scaled.shape, df_for_testing_scaled.shape

### 1.3 Create the reshaped input vector for the LSTM model 

In [8]:
def createXY(dataset, n_past, target_col=3):
    """Build sliding-window samples for a sequence model.

    Each sample consists of ``n_past`` consecutive rows (all columns) as input
    and the value of ``target_col`` in the row immediately after the window as
    the target.

    Parameters
    ----------
    dataset : array-like of shape (n_rows, n_features)
        Two-dimensional data; anything ``np.asarray`` can convert is accepted.
    n_past : int
        Number of rows in each input window. Must be at least 1.
    target_col : int, default 3
        Column index of the value to predict. The default keeps the behaviour
        of the original two-argument call.

    Returns
    -------
    dataX : np.ndarray of shape (n_rows - n_past, n_past, n_features)
    dataY : np.ndarray of shape (n_rows - n_past,)
        When ``n_rows <= n_past`` both arrays are empty but keep the shapes
        above (with a leading dimension of 0), so downstream shape checks on
        ``dataX.shape[1]`` / ``dataX.shape[2]`` still work.

    Raises
    ------
    ValueError
        If ``dataset`` is not two-dimensional or ``n_past`` is smaller than 1.
    """
    dataset = np.asarray(dataset)
    if dataset.ndim != 2:
        raise ValueError(
            f"dataset must be 2-D (rows x features), got {dataset.ndim}-D input"
        )
    if n_past < 1:
        raise ValueError(f"n_past must be >= 1, got {n_past}")

    n_rows, n_features = dataset.shape
    n_samples = n_rows - n_past
    if n_samples <= 0:
        # Not enough rows for a single window: return correctly shaped empties.
        return (
            np.empty((0, n_past, n_features), dtype=dataset.dtype),
            np.empty((0,), dtype=dataset.dtype),
        )

    # Window k covers rows [k, k + n_past); its target is row k + n_past.
    dataX = np.stack([dataset[start:start + n_past] for start in range(n_samples)])
    dataY = dataset[n_past:, target_col].copy()
    return dataX, dataY

The input of the model is the scaled data of all features, with a time step (window) of 30 observations; the target is the scaled closing price (column index 3) of the observation that follows each window.

In [9]:
# Build 30-step input windows and their targets separately for the scaled
# training and test arrays.
trainX,trainY=createXY(df_for_training_scaled,30)
testX,testY=createXY(df_for_testing_scaled,30)

In [None]:
# Sanity check: shapes of the windowed inputs and targets.
trainX.shape, trainY.shape, testX.shape, testY.shape

## 2. Deep learning modelling: LSTM model with 1 LSTM layer and 2 Dense layers (Vanilla)

### 2.1 Fitting the model

In [11]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers  import *
from tensorflow.keras.losses import MeanSquaredError 
from tensorflow.keras.metrics import RootMeanSquaredError
from tensorflow.keras.optimizers import Adam

In [None]:
# Baseline network: a single 64-unit LSTM over each input window, followed by an
# 8-unit ReLU dense layer and a single linear output unit.
model = Sequential()
# Input shape is taken from the training windows (axes 1 and 2 of trainX).
model.add(InputLayer((trainX.shape[1], trainX.shape[2])))
model.add(LSTM(64))
model.add(Dense(8, 'relu'))
model.add(Dense(1, 'linear'))

model.summary()

In [13]:
# Train on mean squared error with Adam (learning rate 0.001) and report RMSE as
# an additional metric.
model.compile(loss = MeanSquaredError(), optimizer= Adam(learning_rate= 0.001), metrics= [RootMeanSquaredError()])

In [None]:
# Fit for 20 epochs with batches of 128 samples.
# NOTE(review): the test windows are passed as validation data, so the
# per-epoch validation metrics are computed on the same data used for the final
# evaluation below.
model.fit(trainX, trainY, validation_data=(testX, testY), epochs=20, batch_size=128) 

### 2.2 Model's prediction and evaluation

Model's prediction on the training set:

In [None]:
# Predict on the training windows and flatten the result to one dimension.
# The values are in the scaler's scaled units, like trainY.
train_pred = model.predict(trainX).flatten()
train_pred

In [None]:
# Side-by-side table of predicted and actual (scaled) training targets.
train_result = pd.DataFrame(data= {'Train Pred':train_pred, 'Actual':trainY.flatten()})
train_result

In [None]:
# Overlay the actual and predicted (scaled) targets for the training windows.
# Each series is labelled so the two lines can be told apart in the figure.
plt.plot(train_result['Actual'], label='Actual')
plt.plot(train_result['Train Pred'], label='Train Pred')
plt.xlabel('Index')
plt.ylabel('Scaled target')
plt.title('Training set: Actual vs. Predicted')
plt.legend()
plt.show()


Model's prediction on the test set:

In [None]:
# Predict on the test windows (flattened to one dimension) and tabulate the
# predictions next to the actual scaled targets.
test_pred = model.predict(testX).flatten()
test_result = pd.DataFrame(data= {'Test Pred':test_pred, 'Actual':testY.flatten()})
test_result

In [None]:
# Overlay the actual and predicted (scaled) targets for the test windows.
# Each series is labelled so the two lines can be told apart in the figure.
plt.plot(test_result['Actual'], label='Actual')
plt.plot(test_result['Test Pred'], label='Test Pred')
plt.xlabel('Index')
plt.ylabel('Scaled target')
plt.title('Test set: Actual vs. Predicted')
plt.legend()
plt.show()

The RMSE of the predicted value on the Test data: 

In [None]:
# RMSE of the baseline model on the test windows. Both testY and test_pred are
# in the scaler's scaled units, so this value is not in price units.
RMSE_1 = np.sqrt(mean_squared_error(testY, test_pred))
RMSE_1

## 3. Hyperparameter tuning: Tuning a model with 2 LSTM layers and 2 Dense layers

### 3.1 Initialize the model tuning function

In [21]:
import keras_tuner as kt

In [22]:
def build_model(hp):
    """Build and compile a two-layer LSTM regressor for a Keras Tuner search.

    The input shape is read from the notebook-level ``trainX`` array. Five
    hyperparameters are drawn from ``hp``: ``lstm_units`` (shared by both LSTM
    layers), ``dropout_rate`` (shared by both dropout layers),
    ``activation_function`` and ``dense_units`` for the hidden dense layer, and
    ``learning_rate`` for the Adam optimizer.

    Parameters
    ----------
    hp : keras_tuner.HyperParameters
        Hyperparameter container supplied by the tuner.

    Returns
    -------
    A compiled ``Sequential`` model trained on mean squared error, with RMSE
    reported as a metric.
    """
    window_shape = (trainX.shape[1], trainX.shape[2])

    # The choices are declared in the same order as before (lstm_units,
    # dropout_rate, activation_function, dense_units, learning_rate) so the
    # tuner sees an identical search space.
    n_lstm = hp.Choice('lstm_units', values=[32, 64, 128, 256])
    drop = hp.Choice('dropout_rate', values=[0.05, 0.1, 0.15, 0.2])
    hidden_activation = hp.Choice('activation_function', values=['relu', 'tanh'])
    n_dense = hp.Choice('dense_units', values=[8, 16, 32, 64])
    step_size = hp.Choice('learning_rate', values=[0.001, 0.01, 0.1])

    network = Sequential()
    stack = [
        InputLayer(window_shape),
        # First LSTM returns the full sequence so the second LSTM can consume it.
        LSTM(n_lstm, return_sequences=True),
        Dropout(drop),
        LSTM(n_lstm),
        Dropout(drop),
        Dense(n_dense, activation=hidden_activation),
        Dense(1, activation='linear'),
    ]
    for layer in stack:
        network.add(layer)

    network.compile(
        loss=MeanSquaredError(),
        optimizer=Adam(learning_rate=step_size),
        metrics=[RootMeanSquaredError()],
    )
    return network

In [33]:
# Grid search over the choices declared in build_model
# (4 x 4 x 2 x 4 x 3 = 384 combinations), ranked by validation loss, with one
# training run per combination.
# NOTE(review): max_trials is not set, so presumably the whole grid is explored;
# results are presumably stored under directory/project_name — confirm against
# the Keras Tuner documentation.
tuner = kt.GridSearch(
    build_model,
    objective='val_loss',
    executions_per_trial=1,
    directory='my_model10',
    project_name='lstm_tuning'
)

### 3.2 Start the tuning process

We set the number of epochs to 100 with a batch size of 64. The model uses the test set as the validation data.

In [None]:
# Run the search: each trial trains for 100 epochs with batches of 64 samples.
# NOTE(review): the test windows are used as validation data, so the
# hyperparameters are selected on the same data that is later used to report
# the final metrics; those metrics are therefore likely optimistic.
tuner.search(
    trainX, trainY,
    epochs= 100,
    batch_size= 64,
    validation_data=(testX, testY)
)

Obtain the best-performing model and assign it to `best_model`:

In [None]:
# Take the first entry of the tuner's list of best models.
best_model = tuner.get_best_models(num_models=1)[0]
best_model

These are the results of the 10 best-performing models:

In [None]:
# Print the tuner's summary of the search results.
tuner.results_summary()

We extract the hyperparameters of the best-performing model and assign them to `best_hps`:

In [None]:
# Take the first entry of the tuner's list of best hyperparameter sets and
# print the five values that build_model declares.
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]
print(f"""
The best hyperparameters are:
- lstm_units: {best_hps.get('lstm_units')}
- dropout_rate: {best_hps.get('dropout_rate')}
- activation function: {best_hps.get('activation_function')}
- dense_units: {best_hps.get('dense_units')}
- learning_rate: {best_hps.get('learning_rate')}
""")

Predicting the value of the Test data using best_model:

In [None]:
# Predict on the test windows with the best tuned model. The output is left
# unflattened here; the next cell repeats it along its last axis before
# inverse-scaling.
y_pred = best_model.predict(testX)

In [29]:
# The scaler was fitted on every feature column, so inverse_transform expects an
# array with that many columns. Each value is therefore copied across all
# columns, inverse-transformed, and only the target column (index 3, the column
# createXY uses as the target by default) is kept.
# The column count is read from the windowed test input instead of being
# hard-coded, so this cell keeps working if the feature set changes.
n_features = testX.shape[2]

prediction_copies_array = np.repeat(y_pred, n_features, axis=-1)
original_copies_array = np.repeat(testY, n_features, axis=-1)

pred = scaler.inverse_transform(np.reshape(prediction_copies_array, (len(y_pred), n_features)))[:, 3]
original = scaler.inverse_transform(np.reshape(original_copies_array, (len(testY), n_features)))[:, 3]

In [None]:
# One-dimensional views of the inverse-scaled actual and predicted values
y_test_flat = np.ravel(original)
y_pred_flat = np.ravel(pred)

# Positional x-axis: one tick per test sample
indices = np.arange(len(y_test_flat))

# Draw both series on a single wide axes
fig, ax = plt.subplots(figsize=(12, 4))
ax.plot(indices, y_test_flat, label='Actual Price', color='blue')
ax.plot(indices, y_pred_flat, label='Predicted Price', color='red', linestyle='--')

ax.set_xlabel('Index')
ax.set_ylabel('Price')
ax.set_title('Actual vs. Predicted Price')
ax.legend()
plt.show()

In [None]:
# RMSE
from sklearn.metrics import mean_squared_error
import numpy as np
def rmse(y_true, y_pred):
    """Return the root mean squared error between ``y_true`` and ``y_pred``.

    Computed as the square root of ``mean_squared_error(y_true, y_pred)``.
    """
    mse = mean_squared_error(y_true, y_pred)
    return np.sqrt(mse)

# MAPE
def mape(y_true, y_pred):
    """Return the mean absolute percentage error, in percent.

    Computed as ``mean(|(y_true - y_pred) / y_true|) * 100``.

    Parameters
    ----------
    y_true, y_pred : array-like
        Actual and predicted values. Lists, tuples and NumPy arrays are all
        accepted; they are converted to float arrays before the arithmetic so
        that plain Python sequences do not fail on subtraction.

    Notes
    -----
    Entries of ``y_true`` equal to zero make the corresponding ratio
    infinite/NaN, and that propagates into the result.
    """
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    return np.mean(np.abs((y_true - y_pred) / y_true)) * 100

#NMSE
def nmse(y_true, y_pred):
    """Return the normalized mean squared error.

    This is ``mean_squared_error(y_true, y_pred)`` divided by the variance of
    ``y_true`` (``np.var``).
    """
    return mean_squared_error(y_true, y_pred) / np.var(y_true)

#DA
def DA(y_true, y_pred):
    """Return the directional accuracy, in percent.

    The direction of each step is the sign of the difference between
    consecutive values. The result is the percentage of steps for which the
    direction of ``y_pred`` equals the direction of ``y_true``.

    Parameters
    ----------
    y_true, y_pred : array-like
        Actual and predicted series. Both are flattened to one dimension first,
        so column vectors of shape (N, 1) (for example raw model output) are
        handled the same way as flat arrays of length N.

    Returns
    -------
    float
        Percentage in [0, 100], or NaN when the series have fewer than two
        points (no step exists to compare).

    Raises
    ------
    ValueError
        If the two series do not contain the same number of values.
    """
    # Flatten so np.diff always runs along the time axis; on an (N, 1) array it
    # would otherwise operate on the length-1 last axis and yield no steps.
    y_true = np.ravel(np.asarray(y_true))
    y_pred = np.ravel(np.asarray(y_pred))
    if y_true.shape != y_pred.shape:
        raise ValueError(
            f"y_true and y_pred must have the same number of values, "
            f"got {y_true.size} and {y_pred.size}"
        )

    # Sign of each consecutive change: -1 (down), 0 (flat) or 1 (up).
    true_direction = np.sign(np.diff(y_true))
    pred_direction = np.sign(np.diff(y_pred))

    if true_direction.size == 0:
        # Fewer than two points: directional accuracy is undefined.
        return float('nan')

    # Fraction of matching directions, expressed as a percentage.
    return np.mean(true_direction == pred_direction) * 100


In [None]:
# Evaluate the tuned model on the inverse-scaled test values.
y_true, y_predi = y_test_flat, y_pred_flat

# Report every metric with its label, in a fixed order.
for label, metric in (("RMSE: ", rmse), ("MAPE: ", mape), ("NMSE: ", nmse), ("DA: ", DA)):
    print(label, metric(y_true, y_predi))
