# Stock purchase recommendations with Machine Learning - Model Training

In [13]:
import pandas as pd
import numpy as np
#import talib as ta
import matplotlib.pyplot as plt
from tqdm import tqdm, tqdm_notebook # progress bar
import fastparquet
import pickle

from sklearn.pipeline import Pipeline
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
from sklearn.metrics import accuracy_score
from sklearn.metrics import fbeta_score
from sklearn.metrics import make_scorer
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import TimeSeriesSplit
from sklearn.model_selection import RandomizedSearchCV
from sklearn.preprocessing import StandardScaler

from sklearn.ensemble import RandomForestClassifier

from dask.distributed import Client, progress
import dask_ml.model_selection as dcv

from sklearn.externals import joblib

In [None]:
pd.set_option('display.max_columns', 1500)

## 1) Load Training and Test Data

In [14]:
# load the training and test datae from feature engineering step:
X_train = fastparquet.ParquetFile('../data/processed/X_train.parq').to_pandas()
X_test = fastparquet.ParquetFile('../data/processed/X_test.parq').to_pandas()
y_train = pickle.load(open('../data/processed/y_train.pkl', 'rb'))
y_test = pickle.load(open('../data/processed/y_test.pkl', 'rb'))

X_train.shape, X_test.shape, y_train.shape, y_test.shape

((7024, 693), (1756, 693), (7024,), (1756,))

In [3]:
# quick inspection
X_train.tail()

Unnamed: 0_level_0,AdjVolume_-19,AdjVolume_-18,AdjVolume_-17,AdjVolume_-16,AdjVolume_-15,AdjVolume_-14,AdjVolume_-13,AdjVolume_-12,AdjVolume_-11,AdjVolume_-10,...,weekday,day,AAPL.US,ABBV.US,AMZN.US,CSCO.US,GE.US,INTC.US,MSFT.US,NFLX.US
Index,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2017-05-10,1.193791,0.759857,0.81696,0.927417,0.791144,1.113186,0.810204,1.3886,0.931141,0.965937,...,2,10,0,1,0,0,0,0,0,0
2017-05-10,1.479636,0.94369,0.695273,0.775977,0.951766,0.693312,0.806673,0.893978,0.861651,0.904349,...,2,10,0,0,0,1,0,0,0,0
2017-05-10,0.79274,0.694295,0.64596,0.572547,0.675032,0.90842,0.674742,0.666782,0.709628,0.76408,...,2,10,1,0,0,0,0,0,0,0
2017-05-10,1.178886,1.523721,1.370172,1.43956,1.377645,1.350862,1.161777,1.496622,1.616429,1.229982,...,2,10,0,0,1,0,0,0,0,0
2017-05-10,0.958888,1.003052,0.93539,0.849433,1.512875,1.249828,1.82281,1.665799,1.680479,1.431698,...,2,10,0,0,0,0,0,0,1,0


In [4]:
X_test.head()

Unnamed: 0_level_0,AdjVolume_-19,AdjVolume_-18,AdjVolume_-17,AdjVolume_-16,AdjVolume_-15,AdjVolume_-14,AdjVolume_-13,AdjVolume_-12,AdjVolume_-11,AdjVolume_-10,...,weekday,day,AAPL.US,ABBV.US,AMZN.US,CSCO.US,GE.US,INTC.US,MSFT.US,NFLX.US
Index,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2017-05-11,0.639493,0.596356,0.541554,0.96453,0.796825,1.162128,1.062026,1.071385,0.912776,1.151286,...,3,11,0,0,0,0,0,0,1,0
2017-05-11,0.741068,0.590482,0.579214,0.780116,1.079741,0.958561,1.222818,1.029151,1.260323,1.71299,...,3,11,0,0,0,0,0,1,0,0
2017-05-11,0.665995,2.95699,3.554402,2.239698,1.309693,1.233189,0.866047,3.718421,1.661812,1.381076,...,3,11,0,0,0,0,0,0,0,1
2017-05-11,0.991471,0.730477,0.815267,0.999956,0.728416,0.847517,0.939242,0.905279,0.950139,1.596598,...,3,11,0,0,0,1,0,0,0,0
2017-05-11,0.696296,0.647822,0.574197,0.676977,0.911038,0.676686,0.668704,0.711673,0.766282,0.544953,...,3,11,1,0,0,0,0,0,0,0


In [5]:
y_train[:5]

Index
2013-11-13    False
2013-11-13    False
2013-11-13    False
2013-11-13     True
2013-11-13     True
Name: setup_for_profitable_trade, dtype: bool

## 2) Build Initial Model - RandomForestClassifier

Start the ML process with a simple out of the box RandomForestClassifier to get a base line and validate that the training data is functioning with sklearn and can generate some predictions.

In [6]:
# define a simple pipeline
pipeline = Pipeline([
    ('standardScaler', StandardScaler()),
    ('randomForest', RandomForestClassifier())
])

# inspect parameters
pipeline.get_params()

{'memory': None,
 'randomForest': RandomForestClassifier(bootstrap=True, class_weight=None, criterion='gini',
             max_depth=None, max_features='auto', max_leaf_nodes=None,
             min_impurity_decrease=0.0, min_impurity_split=None,
             min_samples_leaf=1, min_samples_split=2,
             min_weight_fraction_leaf=0.0, n_estimators='warn', n_jobs=None,
             oob_score=False, random_state=None, verbose=0,
             warm_start=False),
 'randomForest__bootstrap': True,
 'randomForest__class_weight': None,
 'randomForest__criterion': 'gini',
 'randomForest__max_depth': None,
 'randomForest__max_features': 'auto',
 'randomForest__max_leaf_nodes': None,
 'randomForest__min_impurity_decrease': 0.0,
 'randomForest__min_impurity_split': None,
 'randomForest__min_samples_leaf': 1,
 'randomForest__min_samples_split': 2,
 'randomForest__min_weight_fraction_leaf': 0.0,
 'randomForest__n_estimators': 'warn',
 'randomForest__n_jobs': None,
 'randomForest__oob_score': F

In [None]:
%%timeit -r1 -n1

# fit the pipeline with all default parameters
pipeline.fit(X_train, y_train.reset_index().setup_for_profitable_trade)

In [None]:
# create prediction
y_pred_firstRF = pipeline.predict(X_test)

# save for backtesting in separate notebook
pickle.dump(y_pred_firstRF, open('../data/model_predictions/y_pred_firstRF.pkl', 'wb'))

We now have a first prediction on the dataset - let's look into the performance of the default settings. To understand the result, we will look at a few different metrics: the accuracy, precision, recall, f1-score, and the confusion matrix.

It is important to note that our data is imbalanced (not the same number of days with trade signal as with no signal). Therefore, F scores are a better metric than ROC-AUC or accuracy. We can not easily fix the imbalance since we are looking at time series data and resampling might introduce lookahead bias.

The ultimate test of the quality of the prediction is to backtest the results (ie simulate financial performance based on the predictions). This will be done in the next workbook. For now let's inspect some of the basic metrics.

In [None]:
print(classification_report(y_test, y_pred_firstRF))

In [None]:
accuracy_score(y_test, y_pred_firstRF)

In [None]:
confusion_matrix(y_test, y_pred_firstRF, labels=[False, True])

The confusion matrix indicates a large number of False Positives (bad because if we used those to trade, we would enter a trade that turns out not to be as profitable as desired) and False Negatives (bad because they mean missed opportunities for us to enter profitable trades).

The good news is that our out-of-the-box model was able to predict some True Positives but the results in terms of financial performance are most likely very bad (we saved off the prediction and will look at this later).

Even though the model seems to perform poorly, we might be able to learn something from it:

Let's look into the relative feature importances to see if anything stands out

In [7]:
def print_feature_importances(estimator):

    importances = estimator.feature_importances_
    indices = np.argsort(importances)[::-1]

    # Print the feature ranking
    print("Feature ranking:")

    for f in range(X_train.shape[1]):
        print(str(f + 1) + " importance: " + str(importances[indices[f]]) + ". feature name: " + X_train.columns[indices[f]])

In [None]:
      print_feature_importances(pipeline.named_steps['randomForest'])

At the very bottom of the list are the RSI_above_80 and RSI_below_20 indicators. RSI is an indicator of extremes so having these features not play an important role means that the model did not notice many extremes and having predictive power.
The top section contains many of the SMA50 to SMA200 ratios. The SMA 50 and 200 crossovers or distances are classic momentum indicators (SMA 50 over SMA 200 = uptrend, below = downtrend).
Since the time period in the training data is that of a general market uptrend, it sounds intuitively reasonable that momentum indicators are important.

## 3) Improve RandomForest model with GridSearch

Given the imbalance of our data and the desire to avoid False Positives, we will use a modified F1 score that places stronger emphasis on precision than on recall (they have the same weight in the F1 score). We will skew it towards precision by setting beta to 0.5 and towards recall bysetting beta to 2.0. The combinations were used in separate runs of GridSearchCV and results were saved.

In [8]:
# place higher focus on precision (ie getting most TP and minimize FP) than on recall (ie minimize FN)
# since placing a trade that was based on a FP will be costly and hurts more than missing a trade due to a FN
fhalf_scorer = make_scorer(fbeta_score, beta=0.5) # low beta favors precision over recall
ftwo_scorer = make_scorer(fbeta_score, beta=2) # high beta favors recall over precision

In [48]:
import socket

def get_ip():
    '''
    Get IP address of main interface
    from https://stackoverflow.com/a/28950776
    '''
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # doesn't even have to be reachable
        s.connect(('10.255.255.255', 1))
        IP = s.getsockname()[0]
    except:
        IP = '127.0.0.1'
    finally:
        s.close()
    return IP

print(get_ip())

10.151.151.124


In [76]:
# establish cluster with scheduler and local workers

from dask.distributed import Client, LocalCluster, progress, Worker, Scheduler
cluster = LocalCluster(ip=get_ip(), # uses main IP address of machine instead of LocalHost - needed if we want external workers
                       scheduler_port=8786,
                       diagnostics_port=8787,
                       asynchronous=False)
print(cluster.scheduler.address)

OSError: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted

In [74]:
cluster.scheduler.address

'tcp://10.151.151.124:8786'

In [80]:
client = Client(cluster.scheduler.address)
client

0,1
Client  Scheduler: tcp://10.151.151.124:8786  Dashboard: http://10.151.151.124:8787/status,Cluster  Workers: 18  Cores: 18  Memory: 51.35 GB


In [69]:
extraworker = cluster.start_worker(ncores=1)
extraworker

<Nanny: tcp://10.151.151.124:57369, threads: 1>

In [71]:
cluster.stop_worker(extraworker)
client

0,1
Client  Scheduler: tcp://10.151.151.124:8786  Dashboard: http://10.151.151.124:8787/status,Cluster  Workers: 0  Cores: 0  Memory: 0 B


### Docker:
run a worker in Docker:

docker run -dit --rm --network host -P --name workerToHost daskdev/dask dask-worker host.docker.internal:8786

(need to give correct port for scheduler).

connects succesfully, but doesn't seem to communicate properly due to windows networking limitations in docker ( see https://docs.docker.com/docker-for-windows/networking/)

### Jupyter on foreign machine:
in Jupyter notebook that imports same neede libraries, execute this as many times as workers are needed:

w = Worker('tcp://10.151.151.124:8786', ncores=1)
w.start()

In [19]:
client.get_versions(check=True)

ValueError: Mismatched versions found

dask_ml
+---------------------------+---------+
|                           | version |
+---------------------------+---------+
| client                    | 0.11.0  |
| tcp://10.151.151.59:57847 | 0.9.0   |
| tcp://10.151.151.59:57850 | 0.9.0   |
| tcp://10.151.151.59:57857 | 0.9.0   |
| tcp://10.151.151.59:57862 | 0.9.0   |
| tcp://10.151.151.59:57865 | 0.9.0   |
| tcp://10.151.151.59:57874 | 0.9.0   |
| tcp://10.151.151.59:58107 | 0.9.0   |
| tcp://10.151.151.59:58111 | 0.9.0   |
| tcp://10.151.151.59:58114 | 0.9.0   |
| tcp://10.151.151.59:58117 | 0.9.0   |
| tcp://10.151.151.59:58119 | 0.9.0   |
| tcp://10.151.151.59:58122 | 0.9.0   |
| tcp://10.151.151.59:58126 | 0.9.0   |
| tcp://10.151.151.59:58128 | 0.9.0   |
| tcp://10.151.151.59:58131 | 0.9.0   |
+---------------------------+---------+

In [72]:
client.scheduler_info()

{'address': 'tcp://10.151.151.124:8786',
 'id': 'Scheduler-38b29877-56a3-4dbe-b60f-dfb174427b54',
 'services': {'bokeh': 8787},
 'type': 'Scheduler',
 'workers': {}}

For a GridSearch, we will define a wide range of parameters to be used in the pipeline. The new fhalf_scorer will be used for optimization.

In [81]:

parameters = {
    'randomForest__min_samples_leaf': [2, 5, 10],
    'randomForest__n_estimators' : [10, 20, 50, 100],
    'randomForest__max_features': [5, 'sqrt', 'log2'], # log2 of 690 = 9, sqrt of 690 = 20
    'randomForest__max_depth' : [4, 5, 6, 7, 8],
    'randomForest__criterion' :['gini', 'entropy']
}
'''
parameters = {
    'randomForest__min_samples_leaf': [2, 5, 10],
    'randomForest__n_estimators' : [10, 20, 50, 100]
    }
'''
my_cv = TimeSeriesSplit(n_splits=3)
cv = dcv.GridSearchCV(pipeline, param_grid=parameters, cv=my_cv, scoring='f1', n_jobs=-1) # uses fhalf_scorer

#with joblib.parallel_backend('dask'):
cv.fit(X_train, y_train)

GridSearchCV(cache_cv=True,
       cv=TimeSeriesSplit(max_train_size=None, n_splits=3),
       error_score='raise',
       estimator=Pipeline(memory=None,
     steps=[('standardScaler', StandardScaler(copy=True, with_mean=True, with_std=True)), ('randomForest', RandomForestClassifier(bootstrap=True, class_weight=None, criterion='gini',
            max_depth=None, max_features='auto', max_leaf_nodes=None,
            min_impurity_decrease=0.0, min_impurity_...obs=None,
            oob_score=False, random_state=None, verbose=0,
            warm_start=False))]),
       iid=True, n_jobs=-1,
       param_grid={'randomForest__min_samples_leaf': [2, 5, 10], 'randomForest__max_depth': [4, 5, 6, 7, 8], 'randomForest__max_features': [5, 'sqrt', 'log2'], 'randomForest__criterion': ['gini', 'entropy'], 'randomForest__n_estimators': [10, 20, 50, 100]},
       refit=True, return_train_score='warn', scheduler=None, scoring='f1')

In [None]:
print(cv.best_params_)
pickle.dump(cv.best_params_, open('../models/GridSearch_vbetaone.pkl', 'wb'))

In [None]:
print_feature_importances(cv.best_estimator_.named_steps['randomForest'])

In [None]:
y_pred_GridSearch = cv.predict(X_test)

# save for backtesting in separate notebook
pickle.dump(y_pred_GridSearch, open('../data/model_predictions/y_pred_GridSearch_vbetaone.pkl', 'wb'))

In [None]:
print(classification_report(y_test, y_pred_GridSearch))

In [None]:
accuracy_score(y_test, y_pred_GridSearch)

In [None]:
confusion_matrix(y_test, y_pred_GridSearch, labels=[False, True])

## next: resume in backtesting notebook to understand and measure true performance of the model predictions