# LSTM Trade Signal

This notebook seeks to use LSTM neural networks to give a trading signal based on a variant of the triple barrier labelling method. We will consider our trade horizon to be 5 timesteps (hours) into the future, with take profit and stop loss values being set as a factor of the Average True Range away from the current price.

In [1]:
# Installing technical indicator library for the Average True Range
!pip install pandas_ta

import pandas as pd
import os
from collections import deque
import random
import numpy as np
import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import *
from tensorflow.keras.callbacks import TensorBoard, ModelCheckpoint, EarlyStopping
from tensorflow.keras import regularizers
import pandas_ta as ta







In [None]:
SEQ_LEN = 20

PREDICT_HORIZON1 = 1
PREDICT_HORIZON2 = 2
PREDICT_HORIZON3 = 3
PREDICT_HORIZON4 = 4
PREDICT_HORIZON5 = 5


# ATR factors
TP_ATR = 1
SL_ATR = 1

In [None]:
# Classifying for 5 candles in the future
def classify(current_close, current_atr, future_high_1, future_low_1, future_high_2, future_low_2, 
                    future_high_3, future_low_3, future_high_4, future_low_4, future_high_5, future_low_5):
    
  
    min_price = min(future_low_1, future_low_2, future_low_3, future_low_4, future_low_5)
    max_price = max(future_high_1, future_high_2, future_high_3, future_high_4, future_high_5)


    TP_BUY = current_close + TP_ATR*current_atr
    SL_BUY = current_close - SL_ATR*current_atr
    TP_SELL = current_close - TP_ATR*current_atr
    SL_SELL = current_close + SL_ATR*current_atr

    # Uptrend
    if float(max_price) >= TP_BUY and float(min_price) > SL_BUY and float(min_diff) > 0:
        return 2

    # Downtrend
    elif float(min_price) <= TP_SELL and float(max_price) < SL_SELL and float(max_diff) < 0:
        return 0 

    # Flat trend
    else:
        return 1



def preprocess(df):

    for col in df.columns: 
        col_end = col.rsplit('_', -1)[-1] # get the last word of the string
        if col != 'target':
            if col_end == 'volume':
                df[col] = (df[col].values - df[col].mean())/df[col].std() # standardizing volume (mean 0 , std 1)

            elif col_end == 'value':
                df[col] = (df[col].values - df[col].mean())/df[col].std() # standardizing indicator values (mean 0 , std 1)

            elif col_end == 'open' or col_end == 'high' or col_end == 'low' or col_end == 'close': 
                df[col] = (df[col].values - df[col].mean())/df[col].std() # standardizing price returns

    df.dropna(inplace=True)  


    sequential_data = []  
    prev_days = deque(maxlen=SEQ_LEN)  

    for i in df.values:  # iterate over the values
        prev_days.append([n for n in i[:-1]])  # store each row excluding target
        if len(prev_days) == SEQ_LEN:  # 20 sequences 
            sequential_data.append([np.array(prev_days), i[-1]])  

    random.shuffle(sequential_data)  

    buys = []  
    sells = []  
    flats = []

    # ---------------------------------------------------------------------------
    for seq, target in sequential_data:  
        if target == 0:  # sell
            sells.append([seq, target])  
        if target == 1:  # flat
            flats.append([seq, target])  
        elif target == 2:  # buy
            buys.append([seq, target])  

    
    random.shuffle(buys)  
    random.shuffle(sells)  
    random.shuffle(flats)  
    
    # Balancing our dataset
    lower = min(len(buys), len(sells), len(flats))  

    buys = buys[:lower]  
    flats = flats[:lower]
    sells = sells[:lower] 

    sequential_data = buys+flats+sells  
    random.shuffle(sequential_data)  

    X = []
    y = []

    for seq, target in sequential_data: 
        X.append(seq)  
        y.append(target)  

    return np.array(X), y  



def preprocess_stats_df(main_df):
    
    df = main_df.copy()
    stats_dict = {}
                      
    for col in df.columns:
        col_end = col.rsplit('_', -1)[-1] # get the last word of the string
        if col_end == 'volume':
            mean = df[col].mean()
            std = df[col].std()
            stats_dict[f'{col}'] = {'mean': mean, 'std': std}

        elif col_end == 'value':
            mean = df[col].mean()
            std = df[col].std()
            stats_dict[f'{col}'] = {'mean': mean, 'std': std}

        elif col_end == 'open' or col_end == 'high' or col_end == 'low' or col_end == 'close':
            mean = df[col].mean()
            std = df[col].std()
            stats_dict[f'{col}'] = {'mean': mean, 'std': std}

    return stats_dict

In [None]:
# Crypto hourly data
path = ''
main_df = pd.read_csv(path)
main_df.set_index('time', inplace=True)

main_df.sort_index() 

In [None]:
# Time transformations

day = 60*60*24 # seconds in the day
week = 60*60*24*7 # seconds in week

main_df['Day_sin'] = np.sin(main_df.index * (2 * np.pi / day))
main_df['Day_cos'] = np.cos(main_df.index * (2 * np.pi / day))
main_df['Week_sin'] = np.sin(main_df.index * (2 * np.pi / week))
main_df['Week_cos'] = np.cos(main_df.index * (2 * np.pi / week))

In [None]:
# Features

main_df['atr_value'] = ta.atr(high=main_df['high'], low=main_df['low'], 
                              close=main_df['close'])


for i in [1, 2, 3, 5, 10, 20]:    
    main_df[f'ret_{i}_close'] = main_df['close'].pct_change(i)
    main_df[f'ret_{i}_high'] = main_df['high'].pct_change(i)
    main_df[f'ret_{i}_low'] = main_df['low'].pct_change(i)
    main_df[f'ret_{i}_open'] = main_df['open'].pct_change(i)
    main_df[f'ret_{i}_volume'] = main_df['volume'].pct_change(i)
    

main_df.replace([np.inf, -np.inf], np.nan, inplace=True)
main_df.dropna(inplace=True)

In [None]:
# Stats dictionary for deployment (best practice to keep commented when not needing it)
# stats = preprocess_stats_df(main_df)
# stats

In [None]:
# Creating columns for future high and low prices by shifting values from future
main_df['future_high_1'] = main_df['high'].shift(-PREDICT_HORIZON1)
main_df['future_low_1'] = main_df['low'].shift(-PREDICT_HORIZON1)

main_df['future_high_2'] = main_df['high'].shift(-PREDICT_HORIZON2)
main_df['future_low_2'] = main_df['low'].shift(-PREDICT_HORIZON2)

main_df['future_high_3'] = main_df['high'].shift(-PREDICT_HORIZON3)
main_df['future_low_3'] = main_df['low'].shift(-PREDICT_HORIZON3)

main_df['future_high_4'] = main_df['high'].shift(-PREDICT_HORIZON4)
main_df['future_low_4'] = main_df['low'].shift(-PREDICT_HORIZON4)

main_df['future_high_5'] = main_df['high'].shift(-PREDICT_HORIZON5)
main_df['future_low_5'] = main_df['low'].shift(-PREDICT_HORIZON5)

main_df.dropna(inplace=True)
main_df

In [None]:
# Create target column (y value)
main_df['target'] = list(map(classify, main_df['close'], main_df['atr_value'], 
                              main_df['future_high_1'], main_df['future_low_1'], main_df['future_high_2'], main_df['future_low_2'], 
                              main_df['future_high_3'], main_df['future_low_3'], main_df['future_high_4'], main_df['future_low_4'], 
                              main_df['future_high_5'], main_df['future_low_5']))

# Dropping future columns to prevent any look ahead bias
for i in [1, 2, 3, 4, 5]:
    main_df = main_df.drop(f'future_high_{i}', axis=1)
    main_df = main_df.drop(f'future_low_{i}', axis=1)

main_df

In [None]:
# Getting list of sorted times to split into train and validation sets
times = sorted(main_df.index.values)

main_df.replace([np.inf, -np.inf], np.nan, inplace=True)
main_df.dropna(inplace=True)
main_df

In [None]:
# Calculating the last 20% threshold
last_20pct = times[-int(0.20*len(times))]

# Splitting into train and validation
validation_main_df = main_df[(main_df.index >= last_20pct)]
main_df = main_df[(main_df.index < last_20pct)]

# Preprocessing, X and y
train_x, train_y = preprocess(main_df)
validation_x, validation_y = preprocess(validation_main_df)

# Checking the outcome of the above
print(f'train data: {len(train_x)} validation: {len(validation_x)}')
print(f'sells: {train_y.count(0)}, flats: {train_y.count(1)}, buys: {train_y.count(2)}')
print(f'VALIDATION sells: {validation_y.count(0)}, flats: {validation_y.count(1)}, buys: {validation_y.count(2)}')

In [None]:
# Creating arrays for our neural network
train_x = np.asarray(train_x)
train_y = np.asarray(train_y)
validation_x = np.asarray(validation_x)
validation_y = np.asarray(validation_y)

In [None]:
# Creating our LSTM
model = Sequential()

model.add(Bidirectional(LSTM(16, 
          kernel_regularizer=regularizers.L1L2(l1=1e-5, l2=1e-5),
          bias_regularizer=regularizers.L2(1e-5),
          activity_regularizer=regularizers.L2(1e-5),
          activation='tanh', input_shape=(train_x.shape[1:]), return_sequences=True)))
model.add(BatchNormalization())
model.add(Dropout(0.1))

model.add(Bidirectional(LSTM(16, 
          kernel_regularizer=regularizers.L1L2(l1=1e-5, l2=1e-5),
          bias_regularizer=regularizers.L2(1e-5),
          activity_regularizer=regularizers.L2(1e-5),
          activation='tanh', input_shape=(train_x.shape[1:]))))
model.add(BatchNormalization())
model.add(Dropout(0.1))

model.add(Dense(8, activation='relu'))
model.add(Dropout(0.1))

model.add(Dense(6, activation='relu'))
model.add(Dropout(0.1))

model.add(Dense(3, activation='softmax'))


opt = tf.keras.optimizers.Adam(learning_rate=0.001, decay=1e-6, amsgrad=True)

# Compile model
model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer=opt,
    metrics=['accuracy']
    )


filepath = 'NETWORK_NAME_{epoch:02d}'
checkpoint = ModelCheckpoint('save_path/{}.model'.format(save_path, filepath, monitor='val_loss', 
                                                  verbose=1, save_best_only=True, mode='min')) 

In [None]:
history = model.fit(
    train_x, train_y,
    batch_size=64,
    epochs=100,
    validation_data=(validation_x, validation_y),
    callbacks=[checkpoint]
    )