In [None]:
# Load (or reload, if already loaded) IPython's autoreload extension.
%reload_ext autoreload
# Render matplotlib figures inline in the notebook output.
%matplotlib inline
# Mode 2: re-import edited modules before each cell runs, so changes to the
# project's .py files are picked up without restarting the kernel.
%autoreload 2

# Notebook explaining how to train a model on market data

As explained in the README, first start up Kafka and the related containers with `docker-compose up`, and then run the services for gathering data. Once that is done, you can download the data from Kafka and perform offline analysis (i.e. model training). As the project evolves, the goal is to have the model trained continuously and re-deployed automatically.

## Download data from Kafka
For now, I'll use market data only

In [2]:
from kryptoflow.models.streamer_base import AvroAsync

# Local directory that receives the persisted copy of the topic.
KAFKA_DUMP_DIR = '/media/carlo/HDD/kafka_local/'

# Reader bound to the 'gdax' topic.
a = AvroAsync(topic='gdax')

# NOTE(review): presumably replays the topic from its first message and writes
# the records under KAFKA_DUMP_DIR -- confirm against AvroAsync.read_from_start.
a.read_from_start(
    path=KAFKA_DUMP_DIR,
    persist=True,
)

## Transform data into suitable format and train with Keras 

In [8]:
from kryptoflow.analysis.dataset import get_data, ForecastTransformer, TimeEmbedder
import pandas
import numpy
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler
from kryptoflow.analysis.model import KerasModel
from kryptoflow.analysis.export import ModelExporter


# Record fields requested from the 'gdax' topic; used for both loads below.
GDAX_KEYS = ['ts', 'price', 'volume_24h', 'spread', 'side']

# Same topic and fields, loaded once with remote=False and once with
# remote=True. Each call receives its own copy of the key list.
local_df = get_data('gdax', remote=False, keep_keys=list(GDAX_KEYS))
remote_df = get_data('gdax', remote=True, keep_keys=list(GDAX_KEYS))

# Preprocessing chain: forecast transform -> min-max scaling -> time embedding.
# 'inital_dims' (sic) is the keyword name TimeEmbedder actually exposes.
preprocessing_steps = [
    ('tr', ForecastTransformer(prediction_steps=1)),
    ('sc', MinMaxScaler()),
    ('time', TimeEmbedder(inital_dims=len(local_df.columns))),
]
pipe = Pipeline(preprocessing_steps)

# Fit every step on the local frame; the final step yields the (X, y) pair.
X, y = pipe.fit_transform(local_df)

# To persist the fitted pipeline, create an exporter and store it:
#   exp = ModelExporter()
#   exp.store(pipe, 'pipeline', model_type='sklearn')

{'price': 7890.0, 'volume_24h': 9903.30859375, 'spread': 0.009999999776482582, 'ts': '2018-03-29 02:31:31', 'side': 'sell'}
{'price': 7890.0, 'volume_24h': 9903.3671875, 'spread': 0.009999999776482582, 'ts': '2018-03-29 02:31:33', 'side': 'sell'}
{'price': 7890.0, 'volume_24h': 9903.3681640625, 'spread': 0.009999999776482582, 'ts': '2018-03-29 02:31:33', 'side': 'sell'}
{'price': 7890.0, 'volume_24h': 9903.3681640625, 'spread': 1.5099999904632568, 'ts': '2018-03-29 02:31:34', 'side': 'sell'}
{'price': 7888.08984375, 'volume_24h': 9903.369140625, 'spread': 2.009999990463257, 'ts': '2018-03-29 02:31:34', 'side': 'sell'}
{'price': 7885.0, 'volume_24h': 9904.306640625, 'spread': 5.010000228881836, 'ts': '2018-03-29 02:31:34', 'side': 'sell'}
{'price': 7885.0, 'volume_24h': 9904.3076171875, 'spread': 0.009999999776482582, 'ts': '2018-03-29 02:31:39', 'side': 'sell'}
{'price': 7885.0, 'volume_24h': 9904.537109375, 'spread': 1.9900000095367432, 'ts': '2018-03-29 02:31:40', 'side': 'sell'}
{'p

In [2]:
# Run only the 'tr' (ForecastTransformer) step of the pipeline on the
# remotely loaded frame and display the array it produces.
forecast_step = pipe.named_steps['tr']
forecast_step.transform(remote_df)

array([[7.89339990e+03, 4.36000013e+00, 9.94000977e+03, 0.00000000e+00,
        0.00000000e+00, 1.00000000e+00, 7.89304004e+03, 6.28999996e+00,
        9.94001074e+03, 0.00000000e+00, 0.00000000e+00, 1.00000000e+00,
        7.89110986e+03, 6.28999996e+00, 9.94001465e+03, 0.00000000e+00,
        0.00000000e+00, 1.00000000e+00, 7.89058008e+03, 9.99999978e-03,
        9.94001660e+03, 2.00000000e+00, 0.00000000e+00, 1.00000000e+00,
        7.89058008e+03, 9.99999978e-03, 9.94002051e+03, 0.00000000e+00,
        0.00000000e+00, 1.00000000e+00, 7.89727002e+03, 6.67000008e+00,
        9.94003516e+03, 1.00000000e+00, 1.00000000e+00, 0.00000000e+00,
        7.89004004e+03, 9.99999978e-03, 9.94004785e+03, 6.00000000e+00,
        0.00000000e+00, 1.00000000e+00, 7.89004980e+03, 9.99999978e-03,
        9.94006738e+03, 6.00000000e+00, 1.00000000e+00, 0.00000000e+00,
        1.00000000e+00]])

In [8]:
from sklearn.externals import joblib
% time joblib.load('/home/carlo/projects/kryptoflow/stored_models/8/sklearn/pipeline.mdl')

CPU times: user 2.75 ms, sys: 0 ns, total: 2.75 ms
Wall time: 2.2 ms


Pipeline(memory=None,
     steps=[('tr', ForecastTransformer(n_time_steps=None, prediction_steps=None)), ('sc', MinMaxScaler(copy=True, feature_range=(0, 1))), ('time', TimeEmbedder(inital_dims=None))])

In [9]:
# Display the first element of the target array y.
# NOTE(review): presumably still in min-max-scaled units, since the pipeline
# that produced y contains a MinMaxScaler step -- confirm.
y[0]

0.9881294722414049

In [10]:
def inv_transf(x, scaler=None):
    """Undo min-max scaling for value(s) taken from the scaler's last column.

    Computes ``(x - min_[-1]) / scale_[-1]`` using the fitted parameters of
    the scaler's last feature (index ``-1``).
    NOTE(review): presumably that last feature is the one the target ``y``
    is derived from -- confirm against the pipeline steps.

    Parameters
    ----------
    x : scalar or numpy.ndarray
        Scaled value(s). The argument is never modified.
    scaler : object exposing ``min_`` and ``scale_``, optional
        Fitted scaler to invert. When omitted, the ``'sc'`` step of the
        notebook-level ``pipe`` is used.

    Returns
    -------
    scalar or numpy.ndarray
        ``x`` with the scaling undone; same shape as the input.
    """
    if scaler is None:
        scaler = pipe.named_steps['sc']
    # Build a new value rather than using ``-=`` / ``/=``: the in-place
    # operators would overwrite the caller's array (e.g. ``y``) when ``x`` is
    # a NumPy array.
    return (x - scaler.min_[-1]) / scaler.scale_[-1]
# Map the first target value back through the inverse of the 'sc' scaling
# step and display the result.
inv_transf(y[0])

9153.8896484375

In [6]:
# Fixed seed so the train/validation split is the same on every run.
SEED = 42

# Split into new names instead of overwriting X / y: re-running this cell then
# always splits the full dataset rather than an already-split subset.
# NOTE(review): train_test_split shuffles by default; for time-series windows
# this can leak neighbouring samples into the validation set -- consider
# shuffle=False.
X_train, x_test, y_train, y_test = train_test_split(X, y, random_state=SEED)

# Input dimensions are the per-sample shape (everything after the batch axis).
model = KerasModel(dims=X_train.shape[1:])
model.fit(X_train, y_train, x_test, y_test, epochs=2)

# The exporter is only created in a commented-out line earlier in the
# notebook, so instantiate it here to keep the cell runnable on a fresh kernel.
exp = ModelExporter()
exp.store(model.model, 'lstm', model_type='keras')

Train on 19377 samples, validate on 6459 samples
Epoch 1/2
 - 5s - loss: 0.0118 - val_loss: 3.1284e-05
Epoch 2/2
 - 5s - loss: 2.3012e-05 - val_loss: 2.1331e-05
Saved Keras model to disk
INFO:tensorflow:No assets to save.
INFO:tensorflow:No assets to write.
INFO:tensorflow:SavedModel written to: b'/home/carlo/projects/kryptoflow/kryptoflow/../stored_models/8/tf/saved_model.pb'
Saved tf model to disk
