In [None]:
import pandas as pd
import numpy as np
import seaborn as sns
import os, itertools
os.environ['MPLCONFIGDIR'] = os.getcwd() + "/configs/"
import matplotlib.pyplot as plt
%matplotlib inline
from collections import Counter


In [None]:
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from keras.layers import Conv2D, MaxPooling2D, MaxPool2D
from keras.layers import Dense, Dropout, Activation, Flatten
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import sparse_categorical_crossentropy

In [None]:
from tensorflow.config.experimental import list_physical_devices
# Ask TensorFlow which GPU devices it can see and report the device type in use.
gpu_devices = list_physical_devices('GPU')
print('Using GPU' if gpu_devices else 'Using CPU')


In [None]:
# Image dimensions: used below to flatten each sample for scikit-learn and to
# restore the (height, width, 1) layout for the CNN.
IMAGE_WIDTH = 50
IMAGE_HEIGHT = 50

In [None]:
# Load the feature and label arrays from the working directory.
# NOTE(review): allow_pickle=True lets np.load unpickle arbitrary objects, so
# only load .npy files that come from a trusted source.
Xs_data, ys_data = (
    np.load(npy_file, allow_pickle=True) for npy_file in ('Xs.npy', 'ys.npy')
)
print("Xs:", Xs_data.shape)
print("ys:", ys_data.shape)
# Per-class sample counts, ordered by label value.
label_counts = Counter(ys_data)
print("Class label count:", sorted(label_counts.items()))

In [None]:
# Flatten every image into one row of IMAGE_HEIGHT*IMAGE_WIDTH values so the
# data can be fed to MinMaxScaler and RandomUnderSampler; labels are untouched.
Xs_data = Xs_data.reshape(len(Xs_data), IMAGE_HEIGHT * IMAGE_WIDTH)
print("Xs:", Xs_data.shape)
print("ys:", ys_data.shape)

In [None]:
''' RandomUnderSampler for classification imbalance of Hold labels'''
# Optional step, currently disabled: the commented lines below would install
# imbalanced-learn and undersample the classes with RandomUnderSampler,
# producing resampled Xs / ys from Xs_data / ys_data.
# NOTE(review): the following cell assigns Xs / ys straight from
# Xs_data / ys_data, so it must be adjusted too if this step is re-enabled.
# !pip install -U imbalanced-learn
# from imblearn.under_sampling import RandomUnderSampler

# rus = RandomUnderSampler()
# Xs, ys = rus.fit_resample(Xs_data, ys_data)
# print("Xs:",Xs.shape)
# print("ys:",ys.shape)
# print("Class label count:",sorted(Counter(ys).items()))

In [None]:
# No resampling is applied here: the split works directly on the loaded arrays.
Xs, ys = Xs_data, ys_data
''' Reshape again if needed before train test split'''
# Xs = Xs.reshape(-1,(IMAGE_HEIGHT),(IMAGE_WIDTH),1)
# ys = ys.reshape(-1,1)
# print("Xs:",Xs.shape)
# print("ys:",ys.shape)
# Hold out 20% of the samples for testing; the split is shuffled with a fixed
# seed and stratified on the labels so both parts keep the class proportions.
X_train, X_test, y_train, y_test = train_test_split(
    Xs,
    ys,
    test_size=0.20,
    random_state=0,
    shuffle=True,
    stratify=ys,
)


In [None]:
from sklearn.preprocessing import MinMaxScaler

# Learn the per-feature min/max from the training split only, then apply that
# same scaling to both splits so the test data does not influence the fit.
scaler = MinMaxScaler().fit(X_train)
X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)
print("X_train:", X_train.shape)
print("X_test:", X_test.shape)

# Logistic Regression Baseline

In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.svm import SVC

# Baseline: one-vs-rest logistic regression trained on the flattened, scaled data.
# NOTE(review): the multi_class argument is deprecated in newer scikit-learn
# releases — confirm against the installed version.
logisticregression = LogisticRegression(C=1, multi_class='ovr', max_iter=1000)
logisticregression.fit(X_train, y_train)

# Evaluate Logistic Regression model: mean accuracy on each split.
train_score = logisticregression.score(X_train, y_train)
print("training set score: %f" % train_score)
test_score = logisticregression.score(X_test, y_test)
print("test set score: %f" % test_score)

In [None]:
''' Reshape back to Image format for CNN '''
# Restore the (samples, height, width, channels) layout used as the CNN input
# and turn the label vectors into single-column arrays.
cnn_layout = (-1, IMAGE_HEIGHT, IMAGE_WIDTH, 1)
X_train, X_test = X_train.reshape(cnn_layout), X_test.reshape(cnn_layout)
y_train, y_test = y_train.reshape(-1, 1), y_test.reshape(-1, 1)
print("X_train:", X_train.shape)
print("y_train:", y_train.shape)
print("X_test:", X_test.shape)
print("y_test:", y_test.shape)

# CNN Model

In [None]:
''' CNN model that is trained and tested on entire dataset '''
# Two 5x5 convolution stages with dropout and one 2x2 max-pool, followed by a
# dense head that emits softmax scores for the 3 classes.
model = Sequential([
    Conv2D(32, (5, 5), activation='relu', input_shape=(IMAGE_HEIGHT, IMAGE_WIDTH, 1)),
    Dropout(0.20),
    Conv2D(64, (5, 5), activation='relu'),
    MaxPooling2D((2, 2)),
    Dropout(0.25),
    Flatten(),
    Dense(128, activation='relu'),
    Dropout(0.50),
    Dense(3, activation='softmax'),
])
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)
# Train the model. The held-out test split is passed as validation data, so the
# val_* entries recorded in `history` are test-set metrics.
history = model.fit(
    X_train,
    y_train,
    validation_data=(X_test, y_test),
    batch_size=100,
    epochs=100,
    verbose=1,
)


In [None]:
from sklearn.metrics import average_precision_score,precision_score,recall_score, f1_score
from sklearn import metrics
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.metrics import classification_report

# Loss and accuracy of the trained CNN on the held-out test split.
results = model.evaluate(X_test, y_test, verbose=0)
loss, accuracy = results
print("\nTest Data Results\n------------------")
# print("Test Loss: %.2f%%" % (loss * 100))
print("Test Accuracy: %.2f%%" % (accuracy * 100))

# Turn the per-class softmax scores into one predicted class index per sample.
predictions = model.predict(X_test, verbose=0)
classes = predictions.argmax(axis=1)

# Support-weighted averages of the per-class scores.
f1 = f1_score(y_true=y_test, y_pred=classes, average='weighted')
print(f"\nF1 score: {f1}")
rc = recall_score(y_true=y_test, y_pred=classes, average='weighted')
print(f"\nRecall score: {rc}")
pr = precision_score(y_true=y_test, y_pred=classes, average='weighted')
print(f"\nPrecision score: {pr}")

# Per-class precision / recall / F1 table.
cr = classification_report(
    y_true=y_test,
    y_pred=classes,
    target_names=['Sell','Hold','Buy'],
)
print(f"\n{cr}")

In [None]:
''' Plot confusion matrix '''
# The same class names label both axes.
# NOTE(review): assumes the sorted label values correspond to Sell, Hold, Buy
# in that order — confirm against the label encoding.
axlabels = aylabels = ['Sell','Hold','Buy']

# normalize='all' already scales the matrix so that all cells sum to 1, so the
# values can be rendered as percentages directly; no second division is needed.
cf_matrix = confusion_matrix(y_test, classes, normalize='all')
s = sns.heatmap(
    cf_matrix,
    annot=True,
    fmt='.2%',
    cmap='Blues',
    xticklabels=axlabels,
    yticklabels=aylabels,
)
s.set_xlabel('Predicted Label', fontsize=14)
s.set_ylabel('True Label', fontsize=14)
# Render the figure explicitly so the cell does not echo the Text repr of the
# last set_ylabel call.
plt.show()


In [None]:
''' Plot train and test accuracy vs epoch chart '''
sns.set(style="ticks", context="talk")
plt.style.use("dark_background")

# One curve per split; 'val_accuracy' was recorded on the test split that was
# passed as validation data during training.
# NOTE(review): the y-axis label says '%' but the test-accuracy print above
# multiplies by 100, which suggests these values are fractions — confirm.
fig, ax = plt.subplots()
ax.plot(history.history['accuracy'], color="yellow", label='train accuracy')
ax.plot(history.history['val_accuracy'], color="green", label='test accuracy')
ax.set_title('Model accuracy')
ax.set_ylabel('Accuracy %')
ax.set_xlabel('Epoch')
ax.legend(loc='upper left')
plt.show()

In [None]:
def save_model(file_name, model_to_save=None):
    ''' Save a model as an h5 file.

    Args:
        file_name: Destination path without extension; ".h5" is appended.
        model_to_save: Object exposing ``save(path, overwrite=...)``. When
            omitted, the notebook-level ``model`` is used, i.e. whatever that
            name is bound to at call time. The K-fold cell rebinds ``model``
            on every fold, so pass the intended model explicitly whenever a
            specific one must be saved.

    Any existing file at the destination is overwritten.
    '''
    # Resolve the global lazily so an explicit argument never touches `model`.
    target = model if model_to_save is None else model_to_save
    target.save(file_name + ".h5", overwrite=True)
    # model.save( os.path.join(path, "model.h5") )
    print("Saved model to disk")

# save_model('save_model')

# K-Fold Cross-Validation

In [None]:
from sklearn.model_selection import KFold

# Re-use the arrays prepared for the single-split CNN run.
input_train = X_train
input_test = X_test
target_train = y_train
target_test = y_test

# Model configuration
batch_size = 100
# Names follow the (height, width, channels) order the data was reshaped to.
img_height, img_width, img_num_channels = IMAGE_HEIGHT, IMAGE_WIDTH, 1
loss_function = 'sparse_categorical_crossentropy'
no_classes = 3
no_epochs = 100
optimizer = 'Adam'
verbosity = 1
num_folds = 5
# Fixed seed so the shuffled fold assignment is the same on every run.
fold_seed = 0

# Determine shape of the data
input_shape = (img_height, img_width, img_num_channels)

# Parse numbers as floats
input_train = input_train.astype('float32')
input_test = input_test.astype('float32')

# Define per-fold score containers
acc_per_fold = []
loss_per_fold = []

# Merge inputs and targets so every sample takes part in cross-validation.
# NOTE(review): the inputs were already min-max scaled with statistics fitted
# on the original training split, not re-fitted per fold.
inputs = np.concatenate((input_train, input_test), axis=0)
targets = np.concatenate((target_train, target_test), axis=0)

# Define the K-fold Cross Validator
kfold = KFold(n_splits=num_folds, shuffle=True, random_state=fold_seed)


def _build_cnn(shape, n_classes):
    '''Return a new, compiled CNN so no weights carry over between folds.'''
    cnn = Sequential()
    cnn.add(Conv2D(32, kernel_size=(5, 5), activation='relu', input_shape=shape))
    cnn.add(Dropout(0.20))
    cnn.add(Conv2D(64, kernel_size=(5, 5), activation='relu'))
    cnn.add(MaxPooling2D(pool_size=(2, 2)))
    cnn.add(Dropout(0.25))
    cnn.add(Flatten())
    cnn.add(Dense(128, activation='relu'))
    cnn.add(Dropout(0.50))
    cnn.add(Dense(n_classes, activation='softmax'))
    cnn.compile(loss=loss_function,
                optimizer=optimizer,
                metrics=['accuracy'])
    return cnn


# K-fold Cross Validation model evaluation
# NOTE: this loop rebinds the notebook-level `model` and `history`; after it
# finishes they refer to the last fold, not to the CNN trained earlier.
for fold_no, (train, test) in enumerate(kfold.split(inputs, targets), start=1):
    # Define and compile a fresh model for this fold
    model = _build_cnn(input_shape, no_classes)

    # Generate a print
    print('------------------------------------------------------------------------')
    print(f'Training for fold {fold_no} ...')

    # Fit data to model
    history = model.fit(inputs[train], targets[train],
                        batch_size=batch_size,
                        epochs=no_epochs,
                        verbose=verbosity)

    # Generate generalization metrics on the held-out part of this fold
    scores = model.evaluate(inputs[test], targets[test], verbose=0)
    print(f'Score for fold {fold_no}: {model.metrics_names[0]} of {scores[0]}; {model.metrics_names[1]} of {scores[1]*100}%')
    acc_per_fold.append(scores[1] * 100)
    loss_per_fold.append(scores[0])



In [None]:
# Print each fold's loss/accuracy, then the mean (and accuracy std) over folds.
separator = '------------------------------------------------------------------------'
print(separator)
print('Score per fold')
for fold_idx, (fold_loss, fold_acc) in enumerate(zip(loss_per_fold, acc_per_fold), start=1):
  print(separator)
  print(f'> Fold {fold_idx} - Loss: {fold_loss} - Accuracy: {fold_acc}%')
print(separator)
print('Average scores for all folds:')
print(f'> Accuracy: {np.mean(acc_per_fold)} (+- {np.std(acc_per_fold)})')
print(f'> Loss: {np.mean(loss_per_fold)}')

print(separator)