In [None]:
import random 
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras.layers import Dense, Dropout, Flatten, Conv2D, MaxPool2D, Input
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.callbacks import ModelCheckpoint
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import accuracy_score, f1_score, mean_absolute_error
import h5py

Declare constants

In [None]:
# Date boundary between model-fitting data and test data: `datagen` samples only
# rows whose index is strictly before this date, `testgen` evaluates rows whose
# index is strictly after it (a row dated exactly on the cutoff is used by neither).
TRAIN_TEST_CUTOFF = '2020-01-31'
# Fraction of the pre-cutoff rows (earliest first) that `datagen` treats as the
# training range; the remaining pre-cutoff rows form the validation range.
TRAIN_VALID_RATIO = 0.75

Define metric functions

In [None]:
def _recall(y_true, y_pred):
    """Batch recall: correctly predicted positives over all actual positives.

    Both the element-wise product and the labels are clipped to [0, 1] and
    rounded before counting, so soft predictions are hardened to 0/1.
    `K.epsilon()` in the denominator avoids division by zero when the batch
    contains no positives.
    """
    hit_count = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    actual_positive_count = K.sum(K.round(K.clip(y_true, 0, 1)))
    return hit_count / (actual_positive_count + K.epsilon())
 
def _precision(y_true, y_pred):
    """Batch precision: correctly predicted positives over all predicted positives.

    The element-wise product and the predictions are clipped to [0, 1] and
    rounded before counting, so soft predictions are hardened to 0/1.
    `K.epsilon()` in the denominator avoids division by zero when nothing is
    predicted positive.
    """
    hit_count = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    predicted_positive_count = K.sum(K.round(K.clip(y_pred, 0, 1)))
    return hit_count / (predicted_positive_count + K.epsilon())
 
def _f1(y_true, y_pred):
    """F1 score of the positive class: harmonic mean of precision and recall.

    `K.epsilon()` keeps the ratio finite when precision and recall are both 0.
    """
    p = _precision(y_true, y_pred)
    r = _recall(y_true, y_pred)
    half_f1 = (p * r) / (p + r + K.epsilon())
    return 2 * half_f1
 
def f1macro(y_true, y_pred):
    """Macro-averaged F1 for a binary problem.

    Averages the F1 of the positive class with the F1 of the negative class;
    the latter is obtained by flipping both labels and (clipped) predictions
    via `1 - value`.
    """
    positive_f1 = _f1(y_true, y_pred)
    # Flip labels and predictions so that class 0 plays the "positive" role
    flipped_true = 1 - y_true
    flipped_pred = 1 - K.clip(y_pred, 0, 1)
    negative_f1 = _f1(flipped_true, flipped_pred)
    return (positive_f1 + negative_f1) / 2

In [None]:
def cnnpred_2d(seq_len=60, n_features=74, n_filters=(8,8,8), droprate=0.1):
    """Build the (uncompiled) 2D-CNNpred model according to the paper.

    Parameters
    ----------
    seq_len : int
        Number of time steps in each input window.
    n_features : int
        Number of feature columns in each time step.
    n_filters : sequence of 3 ints
        Filter counts for the three convolution layers, in order.
    droprate : float
        Dropout rate applied to the flattened features before the output unit.

    Returns
    -------
    A Keras `Sequential` model taking input of shape
    `(seq_len, n_features, 1)` and producing a single sigmoid output.
    """
    filters_1, filters_2, filters_3 = n_filters[0], n_filters[1], n_filters[2]

    stack = [
        Input(shape=(seq_len, n_features, 1)),
        # The kernel spans the full feature width, mixing all features of one time step
        Conv2D(filters_1, kernel_size=(1, n_features), activation="relu"),
        # The remaining convolutions and pools act along the time axis only
        Conv2D(filters_2, kernel_size=(3, 1), activation="relu"),
        MaxPool2D(pool_size=(2, 1)),
        Conv2D(filters_3, kernel_size=(3, 1), activation="relu"),
        MaxPool2D(pool_size=(2, 1)),
        Flatten(),
        Dropout(droprate),
        Dense(1, activation="sigmoid"),
    ]

    return Sequential(stack)

# Sanity check: build the model with its default arguments and print its layer summary
cnnpred_2d().summary()

In [None]:
def datagen(df, seq_len, batch_size, target_col, kind, cutoff=None, train_ratio=None):
    """Endlessly yield random `(X, y)` batches for a Keras model.

    Samples are windows of `seq_len` consecutive rows of `df` whose last row
    lies strictly before `cutoff`. The label of a window is the value of
    `target_col` on its last row.

    Parameters
    ----------
    df : pandas.DataFrame
        Feature table including `target_col`; every other column is an input.
        The index must be comparable with `cutoff`.
    seq_len : int
        Number of rows in each window.
    batch_size : int
        Number of windows per yielded batch (must be >= 1).
    target_col : str
        Name of the label column.
    kind : str
        'train' samples window ends from the first `train_ratio` fraction of
        the pre-cutoff rows, 'valid' from the remainder. Any other value
        samples from all pre-cutoff rows.
    cutoff : optional
        Train/test boundary; defaults to the notebook's `TRAIN_TEST_CUTOFF`.
    train_ratio : float, optional
        Train/validation split; defaults to the notebook's `TRAIN_VALID_RATIO`.

    Yields
    ------
    X : numpy.ndarray of shape (batch_size, seq_len, n_inputs, 1)
    y : numpy.ndarray of shape (batch_size,)

    Raises
    ------
    ValueError
        On the first `next()` if `batch_size` < 1 or if no eligible row has
        `seq_len` rows of history (previously this case looped forever).
    """
    if cutoff is None:
        cutoff = TRAIN_TEST_CUTOFF
    if train_ratio is None:
        train_ratio = TRAIN_VALID_RATIO
    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1, got {batch_size!r}")

    # Everything up to the sampling loop is invariant across batches, so it is
    # computed once instead of once per sample.
    input_cols = [c for c in df.columns if c != target_col]
    features = df[input_cols].to_numpy()
    targets = df[target_col].to_numpy()

    # Row positions (not labels) of the pre-cutoff rows, split train/valid
    positions = np.flatnonzero(df.index < cutoff)
    split = int(len(positions) * train_ratio)
    if kind == 'train':
        positions = positions[:split]
    elif kind == 'valid':
        positions = positions[split:]

    # A window ending at position n covers rows n-seq_len+1 .. n, so only ends
    # with enough history are eligible. Filtering up front replaces the
    # retry-until-valid loop, which never terminated when nothing was eligible.
    window_ends = positions[positions >= seq_len - 1].tolist()
    if not window_ends:
        raise ValueError(
            f"no {kind!r} row before {cutoff!r} has {seq_len} rows of history"
        )

    while True:
        ends = [random.choice(window_ends) for _ in range(batch_size)]
        X = np.stack([features[n - seq_len + 1:n + 1] for n in ends])
        y = targets[ends]
        # Add the trailing channel axis expected by Conv2D
        yield np.expand_dims(X, 3), y

In [None]:
def testgen(df, seq_len, target_col):
    "Return array of all test samples"

    batch = []

    input_cols = [c for c in df.columns if c != target_col]

    # find the start of test sample
    t = df.index[df.index > TRAIN_TEST_CUTOFF][0]
    n = (df.index == t).argmax()

    for i in range(n+1, len(df)+1):

        frame = df.iloc[i-seq_len:i]
        batch.append([frame[input_cols].values, frame[target_col][-1]])

    X, y = zip(*batch)

    return np.expand_dims(np.array(X),3), np.array(y)

In [None]:
# Load the feature table with the 'date' column parsed into the index.
# `infer_datetime_format` is omitted: it is deprecated since pandas 2.0, where
# consistent-format inference is already the default behaviour.
data = pd.read_csv('../csv/initial_variables.csv', index_col='date', parse_dates=True)

In [None]:
cols = data.columns

# Label each row by what happens next: target = 1 if the NEXT row's close is
# higher than this row's close, else 0.
next_return = data['close'].pct_change().shift(-1)
# The final row has no following row, so its label is unknown. Keep it as NaN
# (instead of letting `NaN > 0` silently become 0) so dropna() removes that row.
data['target'] = (next_return > 0).astype(float).where(next_return.notna())

data.dropna(inplace=True)
data['target'] = data['target'].astype(int)

# Fit the min-max scaler on the training range only: rows BEFORE the cutoff,
# first TRAIN_VALID_RATIO fraction - the same rows datagen uses for 'train'.
# (Fitting on rows after the cutoff would leak test-set statistics.)
index = data.index[data.index < TRAIN_TEST_CUTOFF]
index = index[:int(len(index) * TRAIN_VALID_RATIO)]
scaler = MinMaxScaler().fit(data.loc[index, cols])

# Save scale transformed dataframe
data[cols] = scaler.transform(data[cols])
data

In [None]:
seq_len = 60
batch_size = 64
n_epochs = 20
# Every non-target column is fed to the model by datagen/testgen, so the input
# width is derived from the data instead of being hard-coded.
n_features = len(data.columns.drop('target'))

model = cnnpred_2d(seq_len, n_features)
model.compile(optimizer='adam', loss='mae', metrics=['acc', f1macro])
model.summary()

In [None]:
# Checkpoint file name template; Keras fills in the epoch number and the
# validation value of the `f1macro` metric.
checkpoint_path = './models/cp2d-{epoch}-{val_f1macro:.2f}.h5'
callbacks = [
    # Once per epoch, save the full model whenever validation macro-F1 improves
    ModelCheckpoint(
        filepath=checkpoint_path,
        monitor='val_f1macro',
        mode='max',
        save_best_only=True,
        save_weights_only=False,
        save_freq='epoch',
        verbose=0,
    ),
]

In [None]:
# Endless random-batch generators over the training and validation date ranges
training_gen = datagen(data, seq_len, batch_size, 'target', 'train')
validation_gen = datagen(data, seq_len, batch_size, 'target', 'valid')

# The generators never terminate, so the number of batches per epoch and per
# validation pass is fixed explicitly.
model.fit(
    training_gen,
    steps_per_epoch=400,
    epochs=n_epochs,
    validation_data=validation_gen,
    validation_steps=10,
    callbacks=callbacks,
    verbose=1,
)

In [None]:
# Prepare test data
test_data, test_target = testgen(data, seq_len, "target")

# Test the model: threshold the sigmoid outputs at 0.5 to get class labels
test_out = model.predict(test_data)
test_pred = (test_out > 0.5).astype(int)
# scikit-learn metrics take the ground truth first: metric(y_true, y_pred)
print("accuracy:", accuracy_score(test_target, test_pred))
print("MAE:", mean_absolute_error(test_target, test_pred))
print("F1:", f1_score(test_target, test_pred))