# Technical Indicators

## 0. Imports & Basic formating

### a. Imports

In [1]:
import pandas as pd
import numpy as np
from tabulate import tabulate
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import os
from tensorflow.keras.layers import *
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, SimpleRNN, Flatten, LSTM, Bidirectional
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
import matplotlib.pyplot as plt
from tensorflow.keras import models, layers
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import Normalization
from tensorflow.keras.layers.experimental import preprocessing
from tensorflow.keras.regularizers import l1, l2

from keras.metrics import Precision, Recall
from tensorflow.keras.metrics import Precision
from tensorflow.keras.metrics import F1Score

from sklearn.metrics import roc_curve, auc
from sklearn.metrics import confusion_matrix
from sklearn.utils.multiclass import unique_labels
import seaborn as sns

In [2]:
def import_data(path):
    data = pd.read_csv(path)
    df = data.copy()

    return df

def import_data_sent(path, columns):
    df = import_data(path)
    sentimental_data = df[columns]
    sent_df = sentimental_data.copy()

    return sent_df

### b. Basic Formating

In [3]:
def df_formating(df, columns):
    """
    Preprocess a DataFrame by renaming columns, setting columns to float64,
    dropping unnecessary columns, setting the 'date' column to datetime type,
    and setting the 'date' column as the index.

    Parameters:
    - df (pd.DataFrame): Input DataFrame.
    - columns (list): define which columns of df refere to which variable

    Returns:
    - pd.DataFrame: formated DataFrame.
    """
    # Step 1: Rename columns
    formated_df = df.rename(columns={
        columns[0]: 'date',
        columns[1]: 'open',
        columns[2]: 'high',
        columns[3]: 'low',
        columns[4]: 'adj_close',
        columns[5]: 'volume'
    })

    # Step 2: Set columns to float64
    formated_df = formated_df.astype({'open': 'float32', 'high': 'float32', 'low': 'float32', 'adj_close': 'float32', 'volume': 'float32'})

    # Step 3: Drop all other columns
    columns_to_keep = ['date', 'open', 'high', 'low', 'adj_close', 'volume']
    formated_df = formated_df[columns_to_keep]

    # Step 4: Set 'date' column to datetime type
    formated_df['date'] = pd.to_datetime(formated_df['date'], format='mixed')

    # Step 5: Set 'date' column as the index
    formated_df.set_index('date', inplace=True)

    return formated_df
    

In [4]:
def sent_df_formating(sent_df, columns_sent):
    """
    Preprocess a DataFrame by renaming columns, setting columns to float32,
    dropping unnecessary columns, setting the 'date' column to datetime type,
    and setting the 'date' column as the index.

    Parameters:
    - sent_df (pd.DataFrame): Input DataFrame.
    - columns_sent (list): define which columns of df refere to which price data

    Returns:
    - pd.DataFrame: formated DataFrame.
    """
    # Step 1: Rename columns
    sent_df = sent_df.rename(columns={
        columns_sent[0]: 'date',
        columns_sent[1]: 'score',
        columns_sent[2]: 'total',
        columns_sent[3]: 'positive',
        columns_sent[4]: 'negative'
    })

    # Step 2: Set columns to float64
    sent_df = sent_df.astype({'score': 'float32', 'total': 'float32', 'positive': 'float32', 'negative': 'float32'})

    # Step 3: Set 'date' column to datetime type
    sent_df['date'] = pd.to_datetime(sent_df['date'], format='mixed')

    # Step 4: Set 'date' column as the index
    sent_df.set_index('date', inplace=True)

    # Step 4: Drop Nan rows
    sent_formated_df = sent_df.dropna()
    
    return sent_formated_df

In [5]:
def price_df_formating(df, columns_price):
    """
    Preprocess a DataFrame by renaming columns, setting columns to float32,
    dropping unnecessary columns, setting the 'date' column to datetime type,
    and setting the 'date' column as the index.

    Parameters:
    - df (pd.DataFrame): Input DataFrame.
    - columns_price (list): define which columns of df refere to which variable

    Returns:
    - pd.DataFrame: formated DataFrame.
    """
    # Step 1: Rename columns
    formated_df = df.rename(columns={
        columns_price[0]: 'date',
        columns_price[1]: 'open',
        columns_price[2]: 'adj_close'
    })

    # Step 2: Set columns to float64
    formated_df = formated_df.astype({'open': 'float32', 'adj_close': 'float32'})

    # Step 3: Drop all other columns
    columns_to_keep = ['date', 'open', 'adj_close']
    formated_df = formated_df[columns_to_keep]

    # Step 4: Set 'date' column to datetime type
    formated_df['date'] = pd.to_datetime(formated_df['date'], format='mixed')

    # Step 5: Set 'date' column as the index
    formated_df.set_index('date', inplace=True)

    # Step 6: Drop Nan rows
    price_formated_df = formated_df.dropna()

    return price_formated_df

In [6]:
def labeling_df(formated_df):
    """
    Label a DataFrame by creating a new column 'label', set all values to 0 in that column, 
    set the values to 1 if open price is lower than adjusted close.

    Parameters:
    - formated_df (pd.DataFrame): Input DataFrame.

    Returns:
    - pd.DataFrame: labeled DataFrame.
    """
    # Step 1: Create a new column 'Label' and initialize with 0 (down)
    formated_df['label'] = 0

    # Step 2: Label +1 (up) where 'Open' is lower than 'Adj Close'
    formated_df.loc[formated_df['open'] < formated_df['adj_close'], 'label'] = 1

    # Step 4: Rename df
    labeled_df = formated_df

    return labeled_df

In [7]:
def merge_df(labeled_df, sent_formated_df):
    """
    Merge two DataFrame with the inner merge method to keep a df of values for the matching indexes.

    Parameters:
    - labeled_df (pd.DataFrame): Input DataFrame.
    - sent_formated_df (pd.DataFrame): Input DataFrame.

    Returns:
    - pd.DataFrame: merged DataFrame.
    """
    # Merge two df on their indexes
    merged_df = pd.merge(labeled_df, sent_formated_df, left_index=True, right_index=True)
    
    return merged_df

### c. Pipeline

In [8]:
def price_basic_formating(path, columns_price):
    df = import_data(path)
    
    price_formated_df = price_df_formating(df, columns_price)
    price_labeled_df = labeling_df(price_formated_df)

    return price_labeled_df

In [9]:
def features_basic_formating(path, columns):
    df = import_data(path)
    
    formated_df = df_formating(df, columns)
    labeled_df = labeling_df(formated_df)

    return labeled_df

In [10]:
def sent_and_features_basic_formating(path, columns_sent, columns):
    df = import_data(path)
    sent_df = import_data_sent(path, columns_sent)

    formated_df = df_formating(df, columns)
    labeled_df = labeling_df(formated_df)
    sent_formated_df = sent_df_formating(sent_df, columns_sent)
    merged_df = merge_df(labeled_df, sent_formated_df)

    return merged_df

## 1. Feature Engineering

### A. Moving Average (MA(5) & MA(20))

In [11]:
def moving_averages(df, column_name='adj_close', window_sizes=[5, 20]):
    """
    Add Moving Averages (MA) columns to the DataFrame.

    Parameters:
    - df (pd.DataFrame): Input DataFrame.
    - column_name (str): Name of the column for which moving averages are calculated.
    - window_sizes (list): List of window sizes for moving averages. Default is [5, 20].

    Returns:
    - pd.DataFrame: DataFrame with added MA columns.
    """
    for window_size in window_sizes:
        ma_column_name = f'MA_{window_size}'
        df[ma_column_name] = df[column_name].rolling(window=window_size).mean()

    return df

### B. Bollinger Band (BB up & BB down)

In [12]:
def bollinger_bands(df, column_name='adj_close', window_size=20, num_std_dev=2):
    """
    Calculate Bollinger Bands for a specified column in a DataFrame.

    Parameters:
    - df (pd.DataFrame): Input DataFrame.
    - column_name (str): Name of the column for which Bollinger Bands are calculated.
    - window_size (int): Window size for the moving average. Default is 20.
    - num_std_dev (int): Number of standard deviations for the upper and lower bands. Default is 2.

    Returns:
    - pd.DataFrame: DataFrame with added columns for Bollinger Bands (BB up, BB down).
    """
    # Calculate the rolling mean (middle band)
    df['middle_band'] = df[column_name].rolling(window=window_size).mean()

    # Calculate the rolling standard deviation
    df['std_dev'] = df[column_name].rolling(window=window_size).std()

    # Calculate Bollinger Bands
    df['bb_up'] = df['middle_band'] + num_std_dev * df['std_dev']
    df['bb_down'] = df['middle_band'] - num_std_dev * df['std_dev']

    # Drop intermediate columns
    df.drop(['middle_band', 'std_dev'], axis=1, inplace=True)

    return df

### C. Relative Difference in the Percentage of the price (RDP(1))

In [13]:
def rdp(df, column_name='adj_close'):
    """
    Calculate Relative Difference in the Percentage of the price (RDP(1)) for a specified column in a DataFrame.

    Parameters:
    - df (pd.DataFrame): Input DataFrame.
    - column_name (str): Name of the column for which RDP(1) is calculated.

    Returns:
    - pd.DataFrame: DataFrame with an added column for RDP(1).
    """
    # Calculate RDP(1)
    df['rdp_1'] = df[column_name].pct_change() * 100

    return df

### D. Bias Ratio (BIAS(6), BIAS(12) & BIAS(24))

In [14]:
def bias(df, column_name='adj_close', ma_windows=[6, 12, 24]):
    """
    Calculate Bias Ratios (BIAS) for specified moving average windows for a column in a DataFrame.

    Parameters:
    - df (pd.DataFrame): Input DataFrame.
    - column_name (str): Name of the column for which BIAS is calculated.
    - ma_windows (list): List of moving average window sizes. Default is [6, 12, 24].

    Returns:
    - pd.DataFrame: DataFrame with added columns for BIAS(6), BIAS(12), and BIAS(24).
    """
    for window_size in ma_windows:
        ma_column_name = f'MA_{window_size}'
        bias_column_name = f'BIAS_{window_size}'

        # Calculate the moving average
        df[ma_column_name] = df[column_name].rolling(window=window_size).mean()

        # Calculate BIAS
        df[bias_column_name] = ((df[column_name] - df[ma_column_name]) / df[ma_column_name]) * 100

        # Drop intermediate columns
        df.drop(ma_column_name, axis=1, inplace=True)

    return df

### E. Relative Strength Index (RSI)

In [15]:
def rsi(df, column_name='adj_close', window=14):
    """
    Calculate the Relative Strength Index (RSI) for a specified column in a DataFrame.

    Parameters:
    - df: (pd.DataFrame): Input DataFrame.
    - column_name (str): Name of the column for which RSI is calculated. Default is 'Close'.
    - window (int): Window size for RSI calculation. Default is 14.

    Returns:
    - pd.DataFrame: DataFrame with an added column for RSI.
    """
    # Calculate daily price changes
    df['price_change'] = df[column_name].diff()

    # Calculate the average gain and average loss over the specified window
    df['gain'] = df['price_change'].apply(lambda x: x if x > 0 else 0).rolling(window=window, min_periods=1).mean()
    df['loss'] = -df['price_change'].apply(lambda x: x if x < 0 else 0).rolling(window=window, min_periods=1).mean()

    # Calculate relative strength (RS)
    df['rs'] = df['gain'] / df['loss']

    # Calculate RSI
    df['rsi'] = 100 - (100 / (1 + df['rs']))

    # Drop intermediate columns
    df.drop(['price_change', 'gain', 'loss', 'rs'], axis=1, inplace=True)

    return df

### F. Exponential Moving Average (EMA(12) & EMA(26))

In [16]:
def ema(df, column_name='adj_close', ema_short=12, ema_long=26):
    """
    Calculate Exponential Moving Averages (EMA) for a specified column in a DataFrame.

    Parameters:
    - df (pd.DataFrame): Input DataFrame.
    - column_name (str): Name of the column for which EMA is calculated. Default is 'Close'.
    - ema_short (int): Short-term EMA window size. Default is 12.
    - ema_long (int): Long-term EMA window size. Default is 26.

    Returns:
    - pd.DataFrame: DataFrame with added columns for EMA(12) and EMA(26).
    """
    # Calculate EMA(12)
    df['ema_12'] = df[column_name].ewm(span=ema_short, adjust=False).mean()

    # Calculate EMA(26)
    df['ema_26'] = df[column_name].ewm(span=ema_long, adjust=False).mean()

    return df

### G. Moving Average Convergence/Divergence (MACD)

In [17]:
def macd(df, column_name='adj_close', ema_short=12, ema_long=26, signal_period=9):
    """
    Calculate Moving Average Convergence Divergence (MACD) and its signal line for a specified column in a DataFrame.

    Parameters:
    - df (pd.DataFrame): Input DataFrame.
    - column_name (str): Name of the column for which MACD is calculated. Default is 'Close'.
    - ema_short (int): Short-term EMA window size. Default is 12.
    - ema_long (int): Long-term EMA window size. Default is 26.
    - signal_period (int): Signal line EMA window size. Default is 9.

    Returns:
    - pd.DataFrame: DataFrame with added columns for MACD, Signal Line, and MACD Histogram.
    """
    # Calculate short-term EMA
    df['ema_short'] = df[column_name].ewm(span=ema_short, adjust=False).mean()

    # Calculate long-term EMA
    df['ema_long'] = df[column_name].ewm(span=ema_long, adjust=False).mean()

    # Calculate MACD Line
    df['dif'] = df['ema_short'] - df['ema_long']

    # Calculate Signal Line
    df['signal_line'] = df['dif'].ewm(span=signal_period, adjust=False).mean()

    # Calculate MACD Histogram
    df['osc'] = df['dif'] - df['signal_line']

    # Drop intermediate columns
    df.drop(['ema_short', 'ema_long'], axis=1, inplace=True)

    return df

### H. Psychological Line (PSY(12) & PSY(24))

In [18]:
def psy(df, column_name='adj_close', psy_short=12, psy_long=24):
    """
    Calculate Psychological Line (PSY) for a specified column in a DataFrame.

    Parameters:
    - df (pd.DataFrame): Input DataFrame.
    - column_name (str): Name of the column for which PSY is calculated. Default is 'Close'.
    - psy_short (int): Short-term PSY window size. Default is 12.
    - psy_long (int): Long-term PSY window size. Default is 24.

    Returns:
    - pd.DataFrame: DataFrame with added columns for PSY(12) and PSY(24).
    """
    # Calculate the percentage of days where the closing price is higher than the previous day's closing price
    df['price_up'] = df[column_name].diff() > 0

    # Calculate PSY(12)
    df['psy_12'] = df['price_up'].rolling(window=psy_short).mean() * 100

    # Calculate PSY(24)
    df['psy_24'] = df['price_up'].rolling(window=psy_long).mean() * 100

    # Drop intermediate columns
    df.drop(['price_up'], axis=1, inplace=True)

    return df

### I. Williams %R (WMS%R)

In [19]:
def williams_percent_r(df, high_column='high', low_column='low', adj_close_column='adj_close', window=14):
    """
    Calculate Williams %R for a specified high, low, and close columns in a DataFrame.

    Parameters:
    - df (pd.DataFrame): Input DataFrame.
    - high_column (str): Name of the column containing high prices. Default is 'High'.
    - low_column (str): Name of the column containing low prices. Default is 'Low'.
    - adj_close_column (str): Name of the column containing close prices. Default is 'Close'.
    - window (int): Window size for Williams %R calculation. Default is 14.

    Returns:
    - pd.DataFrame: DataFrame with an added column for Williams %R.
    """
    # Calculate highest high and lowest low over the specified window
    df['hh'] = df[high_column].rolling(window=window).max()
    df['ll'] = df[low_column].rolling(window=window).min()

    # Calculate Williams %R
    df['williams_%r'] = (df['hh'] - df[adj_close_column]) / (df['hh'] - df['ll']) * -100

    # Drop intermediate columns
    df.drop(['hh', 'll'], axis=1, inplace=True)

    return df

### J. Stochastic Oscillator (Stochastic%K & Stochastic%D)

In [20]:
def stochastic_oscillator(df, high_column='high', low_column='low', adj_close_column='adj_close', k_window=14, d_window=3):
    """
    Calculate Stochastic Oscillator (%K and %D) for specified high, low, and close columns in a DataFrame.

    Parameters:
    - df (pd.DataFrame): Input DataFrame.
    - high_column (str): Name of the column containing high prices. Default is 'High'.
    - low_column (str): Name of the column containing low prices. Default is 'Low'.
    - close_column (str): Name of the column containing close prices. Default is 'Close'.
    - k_window (int): Window size for %K calculation. Default is 14.
    - d_window (int): Window size for %D calculation. Default is 3.

    Returns:
    - pd.DataFrame: DataFrame with added columns for Stochastic %K and %D.
    """
    # Calculate lowest low and highest high over the specified window
    df['ll'] = df[low_column].rolling(window=k_window).min()
    df['hh'] = df[high_column].rolling(window=k_window).max()

    # Calculate Stochastic %K
    df['stochastic_%k'] = ((df[adj_close_column] - df['ll']) / (df['hh'] - df['ll'])) * 100

    # Calculate Stochastic %D (3-day simple moving average of %K)
    df['stochastic_%d'] = df['stochastic_%k'].rolling(window=d_window).mean()

    # Drop intermediate columns
    df.drop(['ll', 'hh'], axis=1, inplace=True)

    return df

### K. Percentage of Price Change (PROC)

In [21]:
def proc(df, column_name='adj_close', window=1):
    """
    Calculate Percentage of Price Change (PROC) for a specified column in a DataFrame.

    Parameters:
    - df (pd.DataFrame): Input DataFrame.
    - column_name (str): Name of the column for which PROC is calculated. Default is 'Close'.
    - window (int): Window size for PROC calculation. Default is 1.

    Returns:
    - pd.DataFrame: DataFrame with an added column for PROC.
    """
    # Calculate the percentage change in price using rolling window
    df['proc'] = df[column_name].pct_change().rolling(window=window).mean() * 100

    return df

### L. Momentum (MO(1))

In [22]:
def momentum(df, column_name='adj_close', window=1):
    """
    Calculate Momentum (MO) for a specified column in a DataFrame.

    Parameters:
    - df (pd.DataFrame): Input DataFrame.
    - column_name (str): Name of the column for which Momentum is calculated. Default is 'Close'.
    - window (int): Window size for Momentum calculation. Default is 1.

    Returns:
    - pd.DataFrame: DataFrame with an added column for Momentum.
    """
    # Calculate the difference in price over the specified window
    df['momentum'] = df[column_name].diff(window)

    return df

### M. First-Order Lag (LAG(1))

In [23]:
def first_order_lag(df, column_name='adj_close', lag=1):
    """
    Calculate First-Order Lag (LAG(1)) for a specified column in a DataFrame.

    Parameters:
    - df (pd.DataFrame): Input DataFrame.
    - column_name (str): Name of the column for which the lag is calculated. Default is 'Close'.
    - lag (int): Number of periods to lag. Default is 1.

    Returns:
    - pd.DataFrame: DataFrame with an added column for the First-Order Lag.
    """
    # Calculate the First-Order Lag using the shift() method
    df[f'LAG_{lag}'] = df[column_name].shift(lag)

    return df

### N. Trading Volume (VOL)

In [24]:
def trading_volume(df, volume_column='volume'):
    """
    Calculate Trading Volume (VOL) for a specified column in a DataFrame.

    Parameters:
    - df (pd.DataFrame): Input DataFrame.
    - volume_column (str): Name of the column containing trading volume. Default is 'Volume'.

    Returns:
    - pd.DataFrame: DataFrame with an added column for Trading Volume.
    """
    df['vol'] = df[volume_column]

    return df

### O. Pipeline

In [25]:
def feature_engineering(df):

    moving_averages(df)
    bollinger_bands(df)
    rdp(df)
    bias(df)
    rsi(df)
    ema(df)
    macd(df)
    psy(df)
    williams_percent_r(df)
    stochastic_oscillator(df)
    proc(df)
    momentum(df)
    first_order_lag(df)
    trading_volume(df)
    
    return df

## 2. Preprocessing

### a. Removing columns and rows

In [26]:
def drop_columns(df, columns_to_drop=['open', 'high', 'low', 'adj_close', 'volume']):
    """
    Drop specified columns from a DataFrame.

    Parameters:
    - df (pd.DataFrame): Input DataFrame.
    - columns_to_drop (list): List of column names to drop. Default is ['Open', 'High', 'Low', 'Adj_Close', 'Volume'].

    Returns:
    - pd.DataFrame: DataFrame with specified columns dropped.
    """
    # Drop specified columns
    df = df.drop(columns=columns_to_drop, errors='ignore')

    return df

In [27]:
def drop_rows(df):
    """
    Drop all rows with NaN values from a DataFrame.

    Parameters:
    - df (pd.DataFrame): Input DataFrame.

    Returns:
    - pd.DataFrame: DataFrame with NaN rows dropped.
    """
    # Drop rows with NaN values
    cleaned_df = df.dropna()

    return cleaned_df

### b. Scaling

In [28]:
def scale_dataframe(df):
    """
    Scale a DataFrame using Standard scaling.

    Parameters:
    - df (pd.DataFrame): Input DataFrame.

    Returns:
    - pd.DataFrame: Scaled DataFrame.
    """
    # Scale the selected columns
    scaler = StandardScaler()

    index_column = df.index
    
    label_column = df['label']
    int_df = df.drop(columns=['label'])
    
    columns_to_scale = int_df.columns
    
    scaled_df = pd.DataFrame(scaler.fit_transform(int_df), columns=columns_to_scale)
    scaled_df.index = index_column
    scaled_df['label'] = label_column
    
    return scaled_df

### d. Train Test Split

In [29]:
def train_test_split(df, test_size=0.2):
    """
    Split a time series dataset into training and testing sets.

    Parameters:
    - df: the input time series dataset.
    - test_size (float): the proportion of the dataset to include in the test split.

    Returns:
    - df_train, df_test: Pandas arrays, representing features and target values for each set.
    """
    
    # Extract index number of splitting points
    len_df = len(df)
    index_1 = round(len_df*(1-(test_size)))
    index_2 = index_1 +1

    # Extract values at previously calculated splitting points
    date_1 = df.index[index_1]
    date_2 = df.index[index_2]

    # Construct train_df, val_df and test_df
    df_train = df[:date_1]
    df_test = df[date_2:]
    
    return df_train, df_test

### e. Reshape Matrix

In [30]:
def input_matrix_split_X_y(df, window_size=5):
    """
    Reshape a DataFrames into two 3D NumPy arrays 

    Parameters:
    - df: DataFrame with a list of time series data
    - window_size: the number of time steps to consider for each observation

    Returns:
    - X: (num_observations, window_size, num_features)
    - y: (num_observations, num_features_to_predict)
    """
    df_np = df.to_numpy()
    X = []
    y = []
    
    df_X = df.drop('label', axis=1)
    df_y = df['label']

    for i in range(len(df_np)-(window_size)):
        row = df_X[i:i+window_size]
        X.append(row)
        label = df_y[i+(window_size)]
        y.append(label)

    X = np.array(X)
    y = np.array(y)
    y = np.expand_dims(y, axis=-1)
    
    return X, y

### f. Pipeline

In [31]:
columns_to_drop = ['open']

In [32]:
def preprocessing(df):
    
    clean_merged_df = drop_columns(df)
    clean_merged_df = drop_rows(clean_merged_df)
    scaled_clean_merged_df = scale_dataframe(clean_merged_df)
    split_scaled_clean_merged_df = train_test_split(scaled_clean_merged_df)
    X_train, y_train = input_matrix_split_X_y(split_scaled_clean_merged_df[0])
    X_test, y_test = input_matrix_split_X_y(split_scaled_clean_merged_df[1])

    return X_train, y_train, X_test, y_test

In [33]:
def preprocessing_price(df, columns_to_drop):
    
    clean_merged_df = drop_columns(df, columns_to_drop)
    clean_merged_df = drop_rows(clean_merged_df)
    scaled_clean_merged_df = scale_dataframe(clean_merged_df)
    split_scaled_clean_merged_df = train_test_split(scaled_clean_merged_df)
    X_train, y_train = input_matrix_split_X_y(split_scaled_clean_merged_df[0])
    X_test, y_test = input_matrix_split_X_y(split_scaled_clean_merged_df[1])

    return X_train, y_train, X_test, y_test

## 3. Models Architecture

### Prework

#### Val & Pred

In [34]:
def model_validation(model, X_test, y_test, verbose=0):

    model_acc = model.evaluate(X_test, y_test, verbose=verbose)

    return model_acc

In [35]:
def model_prediction(model, X_test, threshold = 0.5):

    y_pred = model.predict(X_test)
    binary_predictions = (y_pred >= threshold).astype(int)
    binary_predictions = np.squeeze(binary_predictions)

    return binary_predictions

#### Visuals

In [None]:
def plot_loss_accuracy(history, title=None):
    fig, ax = plt.subplots(1,2, figsize=(20,7))

    # --- LOSS --- 

    ax[0].plot(history.history['loss'])
    ax[0].plot(history.history['val_loss'])

    ax[0].set_title('Model loss')
    ax[0].set_ylabel('Loss')
    ax[0].set_xlabel('Epoch')

    ax[0].legend(['Train', 'Val'], loc='best')

    ax[0].grid(axis="x",linewidth=0.5)
    ax[0].grid(axis="y",linewidth=0.5)

    # --- ACCURACY

    ax[1].plot(history.history['accuracy'])
    ax[1].plot(history.history['val_accuracy'])

    ax[1].set_title('Model Accuracy')
    ax[1].set_ylabel('Accuracy')
    ax[1].set_xlabel('Epoch')

    ax[1].legend(['Train', 'Val'], loc='best')

    ax[1].grid(axis="x",linewidth=0.5)
    ax[1].grid(axis="y",linewidth=0.5)

    if title:
        fig.suptitle(title)

### a. LSTM Model (all price features, 5D, 60min, (0,1))

In [37]:
path = "../../raw_data/pro_btc_60min_price_df_v2.csv"
columns = ['date', 'open', 'high', 'low', 'adj_close', 'volume']

labeled_df = features_basic_formating(path, columns)
feature_labeled_df = feature_engineering(labeled_df)
f_X_train, f_y_train, f_X_test, f_y_test = preprocessing(feature_labeled_df)

  label = df_y[i+(window_size)]
  label = df_y[i+(window_size)]


In [40]:
def lstm_model_initialization(X_train, window_size=5, loss_function='binary_crossentropy', metrics_list=['accuracy']):
    
    #############################
    #  1 - Model architecture   #
    ############################# 
    normalizer = Normalization()
    normalizer.adapt(X_train)
    
    model = Sequential()
    model.add(normalizer)
    model.add(layers.LSTM(units=10, activation='tanh', return_sequences=True, input_shape=(window_size, X_train.shape[-1]), kernel_regularizer=l2(0.5)))
    model.add(layers.LSTM(units=3, activation='tanh', return_sequences=False))
    model.add(layers.Dense(10, activation='relu'))
    model.add(layers.Dense(3, activation='relu'))
    model.add(layers.Dropout(0.3))
    model.add(layers.Dense(1, activation='sigmoid'))
    
    #############################
    #  2 - Optimization Method  #
    #############################
    model.compile(loss= loss_function,
                  optimizer = Adam(learning_rate=0.0001), 
                  metrics = metrics_list) 

    return model

In [41]:
def lstm_model_training(model, X_train, y_train, patience=50, validation_split=0.2, batch_size=64, epochs=100, verbose=0):

    es = EarlyStopping(patience=patience, restore_best_weights=True)
    reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.3, patience=5, min_lr=0.00001)
    
    history = model.fit(X_train, y_train,
                        validation_split=validation_split,
                        batch_size=batch_size,
                        epochs=epochs, 
                        callbacks=[es, reduce_lr],
                        verbose=verbose)

    return history

In [42]:
f_lstm_model = lstm_model_initialization(f_X_train)
f_lstm_history = lstm_model_training(f_lstm_model, f_X_train, f_y_train)
f_lstm_model_acc = model_validation(f_lstm_model, f_X_test, f_y_test)
f_lstm_y_pred = model_prediction(f_lstm_model, f_X_test)





In [43]:
f_lstm_model_acc

[0.6935366988182068, 0.497609943151474]

In [44]:
f_lstm_model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 normalization (Normalizati  (None, None, 23)          47        
 on)                                                             
                                                                 
 lstm (LSTM)                 (None, None, 10)          1360      
                                                                 
 lstm_1 (LSTM)               (None, 3)                 168       
                                                                 
 dense (Dense)               (None, 10)                40        
                                                                 
 dense_1 (Dense)             (None, 3)                 33        
                                                                 
 dropout (Dropout)           (None, 3)                 0         
                                                        

In [45]:
plot_loss_accuracy(f_lstm_history)
print(f'The accuracy on the test set is of {f_lstm_model_acc[1]:.2f}')

NameError: name 'plot_loss_accuracy' is not defined

## 4. Evaluate Model

In [None]:
def plot_roc_curve1(y_true_cls, y_pred_prob):
    
    fpr, tpr, _ = roc_curve(y_true_cls, y_pred_prob)
    roc_auc = auc(fpr, tpr)
    
    plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {roc_auc:.2f})')
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic (ROC) Curve')
    plt.legend()
    plt.show()

In [None]:
def plot_confusion_matrix(y_true, y_pred, title=None):
    """
    This function prints and plots the confusion matrix.
    Normalization can be applied by setting `normalize=True`.
    """

    squeezed_array = np.squeeze(y_true)
    
    if not title:
        title = 'Confusion Matrix'

    # Compute confusion matrix
    cm = confusion_matrix(squeezed_array, y_pred)
    
    # Get class labels
    classes = unique_labels(squeezed_array, y_pred)

    # Create a heatmap using seaborn
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt=".0f", cmap="Blues",
                xticklabels=classes, yticklabels=classes)
    
    plt.title(title)
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.show()

In [None]:
plot_confusion_matrix(f_y_test, f_lstm_y_pred)

In [None]:
plot_roc_curve1(f_y_test, f_lstm_y_pred)