## Overview

CNNPred is an implementation based on the paper [CNNPred: CNN-based stock market prediction using several data sources](https://arxiv.org/pdf/1810.08923.pdf). The code implements two versions, CNNPred2 and CNNPred3, which take 2D and 3D input datasets respectively. The 2D data holds the time history and features of a single time series. The 3D data holds the time history and features of multiple time series. Both versions produce a prediction for one particular time series, which could be a market index or the price of an equity. This [blog post](https://machinelearningmastery.com/using-cnn-for-financial-time-series-prediction/) discusses the implementation.
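
To make the difference between the two input layouts concrete, here is a minimal sketch of the array shapes one sample would have in each version. The sizes are illustrative assumptions, not values taken from the paper's datasets.

```python
import numpy as np

# Illustrative sizes only (assumptions for this sketch)
seq_len, n_features, n_stocks = 60, 17, 5

# 2D input: one window of history for a single series,
# with a trailing channel axis of size 1
x_2d = np.zeros((seq_len, n_features, 1))

# 3D input: the same kind of window for several series,
# stacked along the first axis
x_3d = np.zeros((n_stocks, seq_len, n_features))

print(x_2d.shape)  # (60, 17, 1)
print(x_3d.shape)  # (5, 60, 17)
```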




### Features

The following table shows the features used in CNNPred2 and CNNPred3. Note that many of them are economic variables from FRED, which we could also use. [TA-Lib](https://ta-lib.org/) is the technical indicators library they use; it is one of the more popular ones and offers perhaps the largest number of indicators. [pandas-ta](https://github.com/twopirllc/pandas-ta) is a newer alternative designed around pandas.


![Features used in CNNPred, part 1 of 2](./img/features-1of2.jpg "Features used in CNNPred 1 of 2")

![Features used in CNNPred, part 2 of 2](./img/features-2of2.jpg "Features used in CNNPred 2 of 2")
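
As a rough illustration of what such technical features look like, the sketch below computes a simple moving average and a momentum column with plain pandas rather than TA-Lib or pandas-ta. The helper `add_basic_indicators` is hypothetical, the prices are invented, and momentum is assumed here to be the n-period price difference.

```python
import pandas as pd

def add_basic_indicators(close: pd.Series) -> pd.DataFrame:
    """Hypothetical helper: build a small feature table from closing prices."""
    out = pd.DataFrame({"Close": close})
    # Simple moving average over a 3-row window (NaN until the window is full)
    out["SMA_3"] = close.rolling(window=3).mean()
    # Momentum, assumed here to be the price change over 2 rows
    out["MOM_2"] = close.diff(periods=2)
    return out

close = pd.Series([10.0, 11.0, 12.0, 11.0, 13.0])  # invented prices
features = add_basic_indicators(close)
print(features)
```

The leading rows contain NaN because the rolling window and the difference need earlier prices, which is one reason the notebook calls `dropna()` after loading each file.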

## Setup

In [1]:
# If you are running on a Mac, make sure that you have the latest versions of macOS, Xcode, and pip.
# You will probably need to install a Fortran compiler for the compilation of scipy, plus utilities
# like cmake.  Install these using Homebrew.

# %pip install tensorflow-macos
# %pip install scikit-learn

In [2]:
import os
import random

import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras.layers import Dense, Dropout, Flatten, Conv2D, MaxPool2D, Input
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.callbacks import ModelCheckpoint
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, f1_score, mean_absolute_error
import warnings


In [3]:
DATADIR = "./data-2d"
TRAIN_TEST_CUTOFF = '2022-05-10'
TRAIN_VALID_RATIO = 0.75

# DATADIR = "./Dataset"
# TRAIN_TEST_CUTOFF = '2023-05-21'
# TRAIN_VALID_RATIO = 0.75


## Evaluation Metrics

These evaluation metrics are written with Keras backend operations so that they can be computed on each batch of records during training and validation.

In [4]:
# https://datascience.stackexchange.com/questions/45165/how-to-get-accuracy-f1-precision-and-recall-for-a-keras-model
# to implement F1 score for validation in a batch
def recall_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
    recall = true_positives / (possible_positives + K.epsilon())
    return recall

def precision_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
    precision = true_positives / (predicted_positives + K.epsilon())
    return precision

def f1_m(y_true, y_pred):
    precision = precision_m(y_true, y_pred)
    recall = recall_m(y_true, y_pred)
    return 2*((precision*recall)/(precision+recall+K.epsilon()))

def f1macro(y_true, y_pred):
    f_pos = f1_m(y_true, y_pred)
    # negative version of the data and prediction
    f_neg = f1_m(1-y_true, 1-K.clip(y_pred,0,1))
    return (f_pos + f_neg)/2
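
To check the logic of the batch metrics by hand, the same computation can be sketched in NumPy. This is an illustrative re-implementation under the assumption that labels are 0 or 1 and predictions are probabilities; the names `f1_binary`, `f1_macro` and `EPS` are local to this sketch, and the data is invented.

```python
import numpy as np

EPS = 1e-7  # small constant to avoid division by zero, like K.epsilon()

def f1_binary(y_true, y_pred):
    """F1 of the positive class, with predictions rounded to 0 or 1."""
    y_true = np.asarray(y_true, dtype=float)
    y_hat = np.round(np.clip(np.asarray(y_pred, dtype=float), 0, 1))
    tp = np.sum(y_true * y_hat)
    precision = tp / (np.sum(y_hat) + EPS)
    recall = tp / (np.sum(y_true) + EPS)
    return 2 * precision * recall / (precision + recall + EPS)

def f1_macro(y_true, y_pred):
    """Average of the F1 for the positive class and for the negative class."""
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.clip(np.asarray(y_pred, dtype=float), 0, 1)
    # Flipping labels and predictions turns the negative class into the positive one
    return (f1_binary(y_true, y_pred) + f1_binary(1 - y_true, 1 - y_pred)) / 2

y_true = [1, 1, 1, 0, 0, 0, 0, 0]
y_pred = [0.9, 0.8, 0.3, 0.7, 0.2, 0.1, 0.4, 0.3]
score = f1_macro(y_true, y_pred)
print(round(score, 4))  # 0.7333
```

In this toy batch the positive class has precision and recall of 2/3 and the negative class has 4/5, so the macro score is their average.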


## CNNPred3

This is the implementation of CNNPred3, which takes as input the features of multiple equities, each as a time series.

In [5]:
def cnnpred_3d(seq_len=60, n_stocks=5, n_features=82, n_filters=(8,8,8), droprate=0.1):
    "3D-CNNpred model according to the paper"
    model = Sequential([
        Input(shape=(n_stocks, seq_len, n_features)),
        Conv2D(n_filters[0], kernel_size=(1,1), activation="relu", data_format="channels_last"),
        Conv2D(n_filters[1], kernel_size=(n_stocks,3), activation="relu"),
        MaxPool2D(pool_size=(1,2)),
        Conv2D(n_filters[2], kernel_size=(1,3), activation="relu"),
        MaxPool2D(pool_size=(1,2)),
        Flatten(),
        Dropout(droprate),
        Dense(1, activation="sigmoid")
    ])
    return model
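
The layer shapes and parameter counts of this model can be worked out by hand. The sketch below does the arithmetic in plain Python, assuming stride 1 and "valid" padding (the Keras defaults used above); the helpers `conv_valid` and `pool` are local to this sketch.

```python
def conv_valid(shape, kernel, filters):
    """Output shape and parameter count of a stride-1 'valid' 2D convolution."""
    h, w, c = shape
    kh, kw = kernel
    out = (h - kh + 1, w - kw + 1, filters)
    params = kh * kw * c * filters + filters  # weights plus one bias per filter
    return out, params

def pool(shape, size):
    """Output shape of non-overlapping max pooling (floor division)."""
    h, w, c = shape
    return (h // size[0], w // size[1], c)

n_stocks, seq_len, n_features = 5, 60, 82
shape = (n_stocks, seq_len, n_features)

shape, p1 = conv_valid(shape, (1, 1), 8)         # mixes features per stock and day
shape, p2 = conv_valid(shape, (n_stocks, 3), 8)  # collapses the stock axis
shape = pool(shape, (1, 2))
shape, p3 = conv_valid(shape, (1, 3), 8)
shape = pool(shape, (1, 2))

flat = shape[0] * shape[1] * shape[2]
p_dense = flat * 1 + 1  # one output unit plus its bias

print(p1, p2, p3, flat, p_dense)  # 664 968 200 104 105
```

These numbers agree with the model summary printed further down in this notebook (664, 968 and 200 parameters for the three convolutions, and 104 flattened values).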

In [6]:

def datagen(data, seq_len, batch_size, target_index, targetcol, kind):
    "As a generator to produce samples for Keras model"
    # Learn about the data's features and time axis
    input_cols = [c for c in data.columns if c[0] != targetcol]
    tickers = sorted(set(c for _,c in input_cols))
    n_features = len(input_cols) // len(tickers)
    index = data.index[data.index < TRAIN_TEST_CUTOFF]
    split = int(len(index) * TRAIN_VALID_RATIO)
    assert split > seq_len, "Training data too small for sequence length {}".format(seq_len)
    if kind == "train":
        index = index[:split]   # range for the training set
    elif kind == 'valid':
        index = index[split:]   # range for the validation set
    else:
        raise NotImplementedError
    # Infinite loop to generate a batch
    batch = []
    while True:
        # Pick one position, then clip a sequence length
        while True:
            t = random.choice(index)
            n = (data.index == t).argmax()
            if n-seq_len+1 < 0:
                continue # this sample is not enough for one sequence length
            frame = data.iloc[n-seq_len+1:n+1][input_cols]
            # convert frame with two level of indices into 3D array
            shape = (len(tickers), len(frame), n_features)
            X = np.full(shape, np.nan)
            for i,ticker in enumerate(tickers):
                X[i] = frame.xs(ticker, axis=1, level=1).values
            batch.append([X, data[targetcol][target_index][t]])
            break
        # if we get enough for a batch, dispatch
        if len(batch) == batch_size:
            X, y = zip(*batch)
            yield np.array(X), np.array(y)
            batch = []

def testgen(data, seq_len, target_index, targetcol):
    "Return array of all test samples"
    input_cols = [c for c in data.columns if c[0] != targetcol]
    tickers = sorted(set(c for _,c in input_cols))
    n_features = len(input_cols) // len(tickers)
    t = data.index[data.index >= TRAIN_TEST_CUTOFF][0]
    n = (data.index == t).argmax()
    batch = []
    for i in range(n+1, len(data)+1):
        # Clip a window of seq_len that ends at row position i-1
        frame = data.iloc[i-seq_len:i]
        # use .iloc for positional access; plain [-1] on a date-indexed Series is deprecated
        target = frame[targetcol][target_index].iloc[-1]
        frame = frame[input_cols]
        # convert frame with two levels of column indices into a 3D array
        shape = (len(tickers), len(frame), n_features)
        X = np.full(shape, np.nan)
        # use j here so that the outer loop variable i is not shadowed
        for j,ticker in enumerate(tickers):
            X[j] = frame.xs(ticker, axis=1, level=1).values
        batch.append([X, target])
    X, y = zip(*batch)
    return np.array(X), np.array(y)

# Read data into pandas DataFrames
with warnings.catch_warnings():
    warnings.filterwarnings("ignore", category=FutureWarning)

    data = {}
    for filename in os.listdir(DATADIR):
        if not filename.lower().endswith(".csv"):
            continue # read only the CSV files
        filepath = os.path.join(DATADIR, filename)
        X = pd.read_csv(filepath, index_col="Date", parse_dates=True)
        # basic preprocessing: get the name, the classification
        # Save the target variable as a column in dataframe for easier dropna()
        name = X["Name"].iloc[0]  # positional access; X["Name"][0] is deprecated on a date index
        del X["Name"]
        cols = X.columns
        X["Target"] = (X["Close"].pct_change().shift(-1) > 0).astype(int)
        X.dropna(inplace=True)
        # Fit the standard scaler using the training dataset
        index = X.index[X.index < TRAIN_TEST_CUTOFF]
        index = index[:int(len(index) * TRAIN_VALID_RATIO)]
        scaler = StandardScaler().fit(X.loc[index, cols])
        # Save scale transformed dataframe
        X[cols] = scaler.transform(X[cols])
        data[name] = X

    # Transform data into 3D dataframe (multilevel columns)
    for key, df in data.items():
        df.columns = pd.MultiIndex.from_product([df.columns, [key]])
    data = pd.concat(data.values(), axis=1)
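
The conversion from a DataFrame with two-level columns to a 3D array, as done in `datagen` and `testgen`, can be illustrated on toy data. This is a minimal sketch; the tickers `AAA` and `BBB` and all values are invented.

```python
import numpy as np
import pandas as pd

dates = pd.date_range("2024-01-01", periods=4, freq="D")
frames = {}
for k, ticker in enumerate(["AAA", "BBB"]):
    frames[ticker] = pd.DataFrame(
        {"Close": np.arange(4.0) + 10 * k, "Volume": np.arange(4.0) + 100 * k},
        index=dates,
    )

# Add the ticker as a second column level, then join side by side
for ticker, df in frames.items():
    df.columns = pd.MultiIndex.from_product([df.columns, [ticker]])
wide = pd.concat(list(frames.values()), axis=1)

# Slice one ticker at a time out of the column level and stack into 3D
tickers = sorted(set(t for _, t in wide.columns))
n_features = len(wide.columns) // len(tickers)
X = np.full((len(tickers), len(wide), n_features), np.nan)
for i, ticker in enumerate(tickers):
    X[i] = wide.xs(ticker, axis=1, level=1).values

print(X.shape)  # (2, 4, 2) = (tickers, time steps, features)
```

Because the dates form a shared row index, `pd.concat(..., axis=1)` aligns the tickers by date, so every slice along the first axis of `X` covers the same time steps.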


In [7]:
data.head()

Output (first five rows; the second column level is the ticker `NVDA` for every column):

| Date | Open | High | Low | Close | Adj Close | Volume | MOM_2 | MOM_4 | MOM_6 | SMA_30 | SMA_60 | SMA_90 | rsi_14 | cmf_20 | ad | %K_14_3_3 | %D_14_3_3 | Target |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2014-06-04 | -1.182193 | -1.182052 | -1.179806 | -1.181625 | -1.183027 | -1.04182 | -0.074538 | -0.097093 | -0.102152 | -1.17581 | -1.163081 | -1.154835 | 0.039569 | 0.581578 | -1.422279 | 0.610526 | 0.772993 | 1 |
| 2014-06-05 | -1.181076 | -1.182052 | -1.179806 | -1.180731 | -1.182172 | -0.948756 | -0.048438 | -0.093727 | -0.109434 | -1.175859 | -1.162949 | -1.154382 | 0.160668 | 0.663325 | -1.417175 | 0.622303 | 0.682274 | 1 |
| 2014-06-06 | -1.179512 | -1.180074 | -1.17776 | -1.179949 | -1.181425 | -1.042444 | -0.040282 | -0.07914 | -0.100331 | -1.175947 | -1.162813 | -1.153939 | 0.267063 | 0.505318 | -1.422402 | 0.724238 | 0.671269 | 1 |
| 2014-06-09 | -1.179959 | -1.179744 | -1.178896 | -1.179726 | -1.181211 | -0.749613 | -0.050069 | -0.06792 | -0.103062 | -1.175825 | -1.162562 | -1.153474 | 0.298405 | -0.030103 | -1.422823 | 0.792657 | 0.733761 | 1 |
| 2014-06-10 | -1.180294 | -1.179634 | -1.178669 | -1.178609 | -1.180143 | -0.850771 | -0.045176 | -0.058943 | -0.088498 | -1.175636 | -1.162307 | -1.15303 | 0.456965 | -0.264233 | -1.41673 | 0.895476 | 0.827491 | 1 |
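
The `Target` column in the output above marks whether the next day's close is higher than the current one. A small sketch with invented prices shows how the expression used when loading the data behaves:

```python
import pandas as pd

close = pd.Series([10.0, 11.0, 10.5, 10.5, 12.0])  # invented prices

# Return from each row to the next row, aligned with the current row
next_day_return = close.pct_change().shift(-1)

# 1 if the next close is strictly higher, otherwise 0
target = (next_day_return > 0).astype(int)
print(target.tolist())  # [1, 0, 0, 1, 0]
```

Note that the last row has no next-day return (NaN), and since the comparison `NaN > 0` is False it is labelled 0 rather than being dropped by `dropna()`. An unchanged close also counts as 0.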


In [8]:

seq_len = 60
batch_size = 128
n_epochs = 3 # 20
n_features = 82  # features per ticker; must match the loaded data
n_stocks = 5     # number of tickers; must match the loaded data

# Produce CNNpred as a binary classification problem
model = cnnpred_3d(seq_len, n_stocks, n_features)
model.compile(optimizer="adam", loss="mae", metrics=["acc", f1macro])
model.summary() # print model structure to console

# Set up callbacks and fit the model
# We use custom validation score f1macro() and hence monitor for "val_f1macro"
# Using the ".keras" extension since ".h5" causes an error.
# https://keras.io/api/callbacks/model_checkpoint/
checkpoint_path = "./cp3d-{epoch}-{val_f1macro:.2f}.keras"
callbacks = [
    ModelCheckpoint(checkpoint_path,
                    monitor='val_f1macro', mode="max",
                    verbose=0, save_best_only=True, save_weights_only=False, save_freq="epoch")
]

model.fit(datagen(data, seq_len, batch_size, "DJI", "Target", "train"),
          validation_data=datagen(data, seq_len, batch_size, "DJI", "Target", "valid"),
          epochs=n_epochs, steps_per_epoch=400, validation_steps=10, verbose=1, callbacks=callbacks)



Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 5, 60, 8)          664       
                                                                 
 conv2d_1 (Conv2D)           (None, 1, 58, 8)          968       
                                                                 
 max_pooling2d (MaxPooling2  (None, 1, 29, 8)          0         
 D)                                                              
                                                                 
 conv2d_2 (Conv2D)           (None, 1, 27, 8)          200       
                                                                 
 max_pooling2d_1 (MaxPoolin  (None, 1, 13, 8)          0         
 g2D)                                                            
                                                                 
 flatten (Flatten)           (None, 104)               0

KeyError: 'DJI'
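
The `KeyError` above is what `data["Target"]["DJI"]` raises when the loaded files do not include a ticker named `DJI`, which is likely the case for the directory configured in `DATADIR`. One way to fail earlier with a clearer message is a small guard before training. The helper `require_target` is hypothetical, and the toy frame only mimics the two-level column layout.

```python
import pandas as pd

def require_target(data: pd.DataFrame, targetcol: str, target_index: str) -> None:
    """Hypothetical guard: raise a readable error if the target column is missing."""
    if (targetcol, target_index) not in data.columns:
        available = sorted(set(t for _, t in data.columns))
        raise KeyError(f"{target_index!r} not found; available tickers: {available}")

# Toy frame with the same (feature, ticker) column layout
cols = pd.MultiIndex.from_product([["Close", "Target"], ["NVDA"]])
toy = pd.DataFrame([[1.0, 1], [2.0, 0]], columns=cols)

require_target(toy, "Target", "NVDA")  # passes silently

try:
    require_target(toy, "Target", "DJI")
except KeyError as err:
    print("missing:", err)
```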

In [None]:
with warnings.catch_warnings():
    warnings.filterwarnings("ignore", category=FutureWarning)

    # Prepare test data
    test_data, test_target = testgen(data, seq_len, "DJI", "Target")

    # Test the model
    test_out = model.predict(test_data)
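    test_pred = (test_out > 0.5).astype(int).ravel()  # flatten (N, 1) predictions to (N,)
    # scikit-learn metrics take the true labels first, then the predictions
    print("accuracy:", accuracy_score(test_target, test_pred))
    print("MAE:", mean_absolute_error(test_target, test_pred))
    print("F1:", f1_score(test_target, test_pred))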
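
The evaluation step thresholds the sigmoid output at 0.5 and compares the result with the true labels. The sketch below reproduces the three metrics in NumPy on invented numbers, so the arithmetic can be followed by hand.

```python
import numpy as np

# Invented model outputs with shape (N, 1), as returned by model.predict
test_out = np.array([[0.2], [0.7], [0.5], [0.9]])
test_target = np.array([0, 1, 1, 1])

# Threshold and flatten to shape (N,) so it lines up with the targets
test_pred = (test_out > 0.5).astype(int).ravel()

accuracy = np.mean(test_pred == test_target)
mae = np.mean(np.abs(test_pred - test_target))

tp = np.sum((test_pred == 1) & (test_target == 1))
fp = np.sum((test_pred == 1) & (test_target == 0))
fn = np.sum((test_pred == 0) & (test_target == 1))
f1 = 2 * tp / (2 * tp + fp + fn)

print(accuracy, mae, f1)  # 0.75 0.25 0.8
```

For 0/1 labels the MAE is simply one minus the accuracy. The `ravel()` matters in NumPy: comparing an `(N, 1)` array with an `(N,)` array broadcasts to `(N, N)` instead of comparing element by element.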


## CNNPred2

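This is the implementation of CNNPred2, which takes a single stock and its associated features as a time series as input to the model. The paper uses a 60/20/20 split for the training, validation and evaluation (test) data. This notebook instead holds out everything from `TRAIN_TEST_CUTOFF` onwards for testing and divides the earlier data into training and validation sets using `TRAIN_VALID_RATIO`.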

In [None]:
def cnnpred_2d(seq_len=60, n_features=82, n_filters=(8,8,8), droprate=0.1):
    "2D-CNNpred model according to the paper"
    model = Sequential([
        Input(shape=(seq_len, n_features, 1)),
        Conv2D(n_filters[0], kernel_size=(1, n_features), activation="relu"),
        Conv2D(n_filters[1], kernel_size=(3,1), activation="relu"),
        MaxPool2D(pool_size=(2,1)),
        Conv2D(n_filters[2], kernel_size=(3,1), activation="relu"),
        MaxPool2D(pool_size=(2,1)),
        Flatten(),
        Dropout(droprate),
        Dense(1, activation="sigmoid")
    ])
    return model
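
The first layer uses a kernel of size `(1, n_features)`, so each filter spans the whole feature axis of a single time step. For one sample this amounts to the same weighted sum of features applied at every time step, which can be sketched as a matrix product in NumPy. The weights here are random placeholders, not trained values.

```python
import numpy as np

rng = np.random.default_rng(0)
seq_len, n_features, n_filters = 60, 17, 8

x = rng.normal(size=(seq_len, n_features))    # one sample, channel axis dropped
w = rng.normal(size=(n_features, n_filters))  # placeholder filter weights
b = np.zeros(n_filters)                       # placeholder biases

# Each time step is mapped to n_filters values, followed by ReLU
h = np.maximum(x @ w + b, 0.0)
print(h.shape)  # (60, 8)
```

The later `(3, 1)` convolutions then operate only along the time axis of this `(seq_len, n_filters)` representation.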


### Dataset generation
Each training or validation sample is generated by randomly selecting a single stock and then a random window of `seq_len` rows from it. The evaluation, however, is performed across all the datasets.
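
The sampling idea can be sketched independently of pandas and Keras. The function `sample_batch` below is a simplified, hypothetical stand-in for the generator: it leaves out the train/validation split and works on invented arrays instead of DataFrames.

```python
import numpy as np

def sample_batch(pool, seq_len, batch_size, rng):
    """Draw random windows from randomly chosen series in `pool`.

    `pool` maps a name to (features, target) arrays with matching row counts.
    """
    names = sorted(pool)
    xs, ys = [], []
    while len(xs) < batch_size:
        features, target = pool[names[rng.integers(len(names))]]
        n = rng.integers(len(features))  # row where the window ends
        if n - seq_len + 1 < 0:
            continue  # not enough history before this row
        xs.append(features[n - seq_len + 1 : n + 1])
        ys.append(target[n])
    # Add a trailing channel axis, as the 2D model expects
    return np.expand_dims(np.array(xs), 3), np.array(ys)

rng = np.random.default_rng(42)
pool = {
    "AAA": (rng.normal(size=(100, 4)), rng.integers(0, 2, size=100)),
    "BBB": (rng.normal(size=(80, 4)), rng.integers(0, 2, size=80)),
}
X, y = sample_batch(pool, seq_len=10, batch_size=8, rng=rng)
print(X.shape, y.shape)  # (8, 10, 4, 1) (8,)
```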

In [None]:


def datagen(data, seq_len, batch_size, targetcol, kind):
    "As a generator to produce samples for Keras model"
    batch = []
    while True:
        # Pick one dataframe from the pool
        key = random.choice(list(data.keys()))
        df = data[key]
        input_cols = [c for c in df.columns if c != targetcol]
        index = df.index[df.index < TRAIN_TEST_CUTOFF]
        split = int(len(index) * TRAIN_VALID_RATIO)
        assert split > seq_len, "Training data too small for sequence length {}".format(seq_len)
        if kind == 'train':
            index = index[:split]   # range for the training set
        elif kind == 'valid':
            index = index[split:]   # range for the validation set
        else:
            raise NotImplementedError
        # Pick one position, then clip a sequence length
        while True:
            t = random.choice(index)     # pick one time step
            n = (df.index == t).argmax() # find its position in the dataframe
            if n-seq_len+1 < 0:
                continue # this sample is not enough for one sequence length
            frame = df.iloc[n-seq_len+1:n+1]
            batch.append([frame[input_cols].values, df.loc[t, targetcol]])
            break
        # if we get enough for a batch, dispatch
        if len(batch) == batch_size:
            X, y = zip(*batch)
            X, y = np.expand_dims(np.array(X), 3), np.array(y)
            yield X, y
            batch = []

def testgen(data, seq_len, targetcol):
    "Return array of all test samples"
    batch = []
    for key, df in data.items():
        input_cols = [c for c in df.columns if c != targetcol]
        # find the start of test sample
        t = df.index[df.index >= TRAIN_TEST_CUTOFF][0]
        print(t)
        n = (df.index == t).argmax()
        # extract sample using a sliding window
        for i in range(n+1, len(df)+1):
            frame = df.iloc[i-seq_len:i]
            batch.append([frame[input_cols].values, frame[targetcol].iloc[-1]])
    X, y = zip(*batch)
    return np.expand_dims(np.array(X),3), np.array(y)


with warnings.catch_warnings():
    warnings.filterwarnings("ignore", category=FutureWarning)

    # Read data into pandas DataFrames.  This creates a dictionary of dataframes, one for each ticker. 
    # Each ticker is represented by a file in the datadir. 
    data = {}
    for filename in os.listdir(DATADIR):
        if not filename.lower().endswith(".csv"):
            continue # read only the CSV files
        filepath = os.path.join(DATADIR, filename)
        X = pd.read_csv(filepath, index_col="Date", parse_dates=True)
        # basic preprocessing: get the name, the classification
        # Save the target variable as a column in dataframe for easier dropna()
        name = X["Name"].iloc[0]  # positional access; X["Name"][0] is deprecated on a date index
        del X["Name"]
        cols = X.columns
        X["Target"] = (X["Close"].pct_change().shift(-1) > 0).astype(int)
        X.dropna(inplace=True)
        # Fit the standard scaler using the training dataset
        index = X.index[X.index < TRAIN_TEST_CUTOFF]
        index = index[:int(len(index) * TRAIN_VALID_RATIO)]
        scaler = StandardScaler().fit(X.loc[index, cols])
        # Save scale transformed dataframe
        X[cols] = scaler.transform(X[cols])
        data[name] = X
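
The scaler is fitted on the training rows only and then applied to the whole frame, so that statistics from the validation and test periods do not leak into training. The sketch below shows the same idea with plain pandas on invented data; the cutoff date is made up, and the population standard deviation (`ddof=0`) is used to mirror what `StandardScaler` computes.

```python
import numpy as np
import pandas as pd

TRAIN_TEST_CUTOFF = "2024-01-09"  # made-up date for this toy example
TRAIN_VALID_RATIO = 0.75

dates = pd.date_range("2024-01-01", periods=12, freq="D")
df = pd.DataFrame({"Close": np.arange(12.0)}, index=dates)

# Rows before the cutoff, then the first 75% of those for training
before_cutoff = df.index[df.index < TRAIN_TEST_CUTOFF]
train_index = before_cutoff[: int(len(before_cutoff) * TRAIN_VALID_RATIO)]

# Statistics come from the training rows only
mean = df.loc[train_index].mean()
std = df.loc[train_index].std(ddof=0)

# The same transformation is applied to every row
scaled = (df - mean) / std
print(len(train_index), scaled["Close"].iloc[-1])
```

Only the training rows end up with zero mean and unit variance; later rows can fall well outside that range, as the strongly negative early values in the scaled data suggest for a series that trends over time.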


In [None]:
data['NVDA'].count()

In [None]:
data['NVDA'].info()

In [None]:
data['NVDA'].head()
#data['DJI'].tail()

In [None]:
seq_len = 60
batch_size = 128
n_epochs = 3
n_features = 17  # per dataset (previously 82)

# Produce CNNpred as a binary classification problem
model = cnnpred_2d(seq_len, n_features)
model.compile(optimizer="adam", loss="mae", metrics=["acc", f1macro])
model.summary()  # print model structure to console

# Set up callbacks and fit the model
# We use custom validation score f1macro() and hence monitor for "val_f1macro"
checkpoint_path = "./cp2d-{epoch}-{val_f1macro:.2f}.keras"
callbacks = [
    ModelCheckpoint(checkpoint_path,
                    monitor='val_f1macro', mode="max",
                    verbose=0, save_best_only=True, save_weights_only=False, save_freq="epoch")
]
model.fit(datagen(data, seq_len, batch_size, "Target", "train"),
          validation_data=datagen(data, seq_len, batch_size, "Target", "valid"),
          epochs=n_epochs, steps_per_epoch=400, validation_steps=10, verbose=1, callbacks=callbacks)
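
The checkpoint path contains placeholders that Keras fills in with the epoch number and the logged metric values when it saves a model. The formatting itself is ordinary Python string formatting, as this sketch with a made-up metric value shows:

```python
checkpoint_path = "./cp2d-{epoch}-{val_f1macro:.2f}.keras"

# Made-up values standing in for what the callback would receive
logs = {"val_f1macro": 0.5342}
filename = checkpoint_path.format(epoch=3, **logs)
print(filename)  # ./cp2d-3-0.53.keras
```

The `val_f1macro` placeholder only resolves if the metric is actually logged, which is why `f1macro` is passed to `model.compile` and validation data is supplied to `model.fit`.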



In [None]:
with warnings.catch_warnings():
    warnings.filterwarnings("ignore", category=FutureWarning)

    # Prepare test data
    test_data, test_target = testgen(data, seq_len, "Target")

    # Test the model
    test_out = model.predict(test_data)
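    test_pred = (test_out > 0.5).astype(int).ravel()  # flatten (N, 1) predictions to (N,)
    # scikit-learn metrics take the true labels first, then the predictions
    print("accuracy:", accuracy_score(test_target, test_pred))
    print("MAE:", mean_absolute_error(test_target, test_pred))
    print("F1:", f1_score(test_target, test_pred))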

In [None]:
with warnings.catch_warnings():
    warnings.filterwarnings("ignore", category=FutureWarning)
    test_data, test_target = testgen(data, seq_len, "Target")
    print(type(test_data))
    print(type(data))
    print(data['NVDA'].shape)
    print(data['NVDA'].head(1))