Downloads information from the available resources and trains an RNN model

In [25]:
import nuclio 
import mlrun
import os

In [16]:
# Declare the V3IO credentials/endpoint env vars for the nuclio function; values are presumably expanded from the notebook's environment via ${...} — TODO confirm.
%nuclio env -c V3IO_ACCESS_KEY=${V3IO_ACCESS_KEY}
%nuclio env -c V3IO_USERNAME=${V3IO_USERNAME}
%nuclio env -c V3IO_API=${V3IO_API}

In [None]:
%%nuclio cmd -c
pip install tensorflow==2.2.0

In [None]:
%%nuclio config 
kind = "nuclio"
spec.build.baseImage = "mlrun/mlrun:0.6.5"

In [None]:
# nuclio: start-code

In [1]:
import mlrun.feature_store as fs
import mlrun
import datetime
from mlrun import MLClientCtx
import os
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import Dense, LSTM, Dropout
import nuclio

In [2]:
def get_data_from_vector(context, ticker):
    """Load the stocks feature vector and return the rows of one ticker.

    :param context: object exposing ``last_trained_times`` (dict-like),
        ``stocks_vec`` (feature vector name) and ``logger``.
    :param ticker: value matched against the ``symbol`` column.
    :returns: the matching rows, sorted by ``Datetime`` ascending.
    """
    # Last training time for this ticker, falling back to eight days ago.
    # NOTE(review): this value is not used yet — the whole vector is read and
    # filtered in memory below.
    since = context.last_trained_times.get(ticker) or (
        datetime.datetime.now() - datetime.timedelta(8)
    )

    # TODO: read only this ticker's rows newer than `since` directly from the
    # feature store, then drop the in-memory filtering below.
    vector_df = fs.get_offline_features(context.stocks_vec).to_dataframe()
    selected = vector_df.loc[vector_df["symbol"] == ticker].sort_values(
        by="Datetime", ascending=True
    )

    context.logger.info(
        f"timestamps : {selected['Datetime']}, and columns :{selected.columns}"
    )
    return selected

In [3]:
def get_model(X_train):
    """Build and compile a four-layer stacked-LSTM regressor.

    :param X_train: 3-D training array; only ``shape[1]`` (timesteps) and
        ``shape[2]`` (features per step) are used, to set the input shape.
    :returns: a compiled Keras ``Sequential`` model (Adam optimizer, MSE loss)
        with a single-unit ``Dense`` output.
    """
    timesteps, n_features = X_train.shape[1], X_train.shape[2]
    model = Sequential()

    # The first recurrent layer declares the input shape and returns sequences.
    model.add(LSTM(units=50, return_sequences=True, input_shape=(timesteps, n_features)))
    model.add(Dropout(0.2))

    # Two middle layers also return full sequences to feed the next LSTM.
    for _ in range(2):
        model.add(LSTM(units=50, return_sequences=True))
        model.add(Dropout(0.2))

    # The last LSTM emits only its final state, mapped to one output value.
    model.add(LSTM(units=50))
    model.add(Dropout(0.2))
    model.add(Dense(units=1))

    model.compile(optimizer='adam', loss='mean_squared_error')
    return model

In [4]:
def modify_data(context, ticker_data, size_of_stamps=10):
    """Scale the feature columns and cut them into sliding-window samples.

    The ``symbol`` column is mapped to an integer code; unknown tickers become
    NaN, which is later replaced by 0 in the inputs. Columns are scaled by name:
    - any column containing "Open", "Close", "High" or "Low" uses
      ``context.priceMMS``;
    - "Volume" columns use ``context.volumeMMS``;
    - "Sentiment" columns use ``context.sentimentMMS``.
    Each scaler is re-fitted for every column it scales, so afterwards it only
    holds the parameters of the last such column.

    Windows are built separately for each ticker, so no sample mixes rows of
    different symbols. Within a ticker, rows are used in the order given; the
    caller is expected to sort them chronologically.

    :param context: object exposing the three scalers above.
    :param ticker_data: DataFrame with ``symbol`` and ``Close`` columns. All
        columns must be convertible to float32 once ``symbol`` is encoded.
        The frame is not modified.
    :param size_of_stamps: number of consecutive rows per input window.
    :returns: ``(X_train, y_train)`` as float32 arrays of shapes
        ``(n_samples, size_of_stamps, n_columns)`` and ``(n_samples,)``.
        ``y_train[k]`` is the scaled ``Close`` of the row right after window
        ``k``. Samples whose target is NaN are dropped, and NaN inputs are
        set to 0.
    """
    ticker_to_int = {'GOOGL': 0, 'MSFT': 1, 'AMZN': 2, 'AAPL': 3, 'INTC': 4}

    # Work on a copy so the caller's frame is left untouched. This also avoids
    # chained-assignment warnings when a slice is passed in.
    ticker_data = ticker_data.copy()
    # Keep the raw symbols so windows can be split per ticker after encoding.
    symbols = ticker_data["symbol"].to_numpy()
    ticker_data["symbol"] = ticker_data["symbol"].map(ticker_to_int)

    for col in [x for x in ticker_data.columns if "Open" in x or "Close" in x or "High" in x or "Low" in x]:
        ticker_data[[col]] = context.priceMMS.fit_transform(ticker_data[[col]])
    for col in [x for x in ticker_data.columns if "Volume" in x]:
        ticker_data[[col]] = context.volumeMMS.fit_transform(ticker_data[[col]])
    for col in [x for x in ticker_data.columns if "Sentiment" in x]:
        ticker_data[[col]] = context.sentimentMMS.fit_transform(ticker_data[[col]])

    X_train = []
    y_train = []
    # Build windows per ticker. A single window spanning two symbols would
    # teach the model a meaningless transition.
    for _, group in ticker_data.groupby(symbols, sort=False, dropna=False):
        values = group.to_numpy()
        closing = group["Close"].to_numpy()
        for i in range(size_of_stamps, len(group)):
            X_train.append(values[i - size_of_stamps:i])
            y_train.append(closing[i])

    if X_train:
        X_train = np.asarray(X_train).astype('float32')
    else:
        # Keep a 3-D shape, so callers reading shape[1]/shape[2] still work.
        X_train = np.empty((0, size_of_stamps, ticker_data.shape[1]), dtype='float32')
    y_train = np.asarray(y_train).astype('float32')

    # A NaN target makes the MSE loss NaN, so drop those samples entirely.
    has_target = ~np.isnan(y_train)
    X_train = X_train[has_target]
    y_train = y_train[has_target]

    X_train[np.isnan(X_train)] = 0
    return X_train, y_train

In [6]:
def handler(context,event):
    """Train (or continue training) the LSTM model on the stocks feature vector.

    Steps:
    1. Read the whole ``context.stocks_vec`` feature vector.
    2. Record the latest ``Datetime`` of every ticker in ``context.sym_to_url``
       into ``context.last_trained_times``.
    3. Build sliding-window samples with ``modify_data``.
    4. Fit the model for two epochs and save it to the ``model_path`` env var
       (default ``mymodel.h5``).

    An existing model file at that path is loaded and trained further;
    otherwise a new model is created with ``get_model``.

    :param context: nuclio/mlrun context prepared by ``init_context``.
    :param event: triggering event (unused).
    :returns: ``context.PROJECT_NAME``.
    """
    model_path = os.getenv('model_path', 'mymodel.h5')
    context.logger.info("getting data from FS")
    all_data = fs.get_offline_features(context.stocks_vec).to_dataframe()
    context.logger.info("updating times ...")
    for ticker in context.sym_to_url:
        ticker_dates = all_data.loc[all_data["symbol"] == ticker, "Datetime"]
        if ticker_dates.empty:
            # max() of an empty selection would raise; skip tickers without rows.
            context.logger.info(f"ticker {ticker} has no data, skipping")
            continue
        max_date = ticker_dates.max()
        context.last_trained_times[ticker] = max_date
        context.logger.info(f"ticker {ticker} max date is : {max_date}")

    # Order rows per ticker in time before Datetime is dropped. The sliding
    # windows built by modify_data then follow chronological order.
    all_data = all_data.sort_values(by=["symbol", "Datetime"], kind="mergesort")
    all_data = all_data.drop(["Datetime","Content","Link"], axis=1)
    context.logger.info("modifieing data")
    X_train,y_train = modify_data(context,all_data)
    context.logger.info("Finished modifieing data")

    if len(X_train) == 0:
        # Nothing to learn from; building or fitting a model would fail.
        context.logger.info("not enough data to build training samples, skipping training")
        return context.PROJECT_NAME

    if os.path.exists(model_path):
        context.logger.info("Previously trained model loaded")
        model = load_model(model_path)
    else:
        context.logger.info("New model created")
        model = get_model(X_train)

    model.fit(X_train, y_train, epochs = 2, batch_size = 512)
    context.logger.info("Done training")
    model.save(model_path)
    context.logger.info(f"model saved at {model_path}")
    return context.PROJECT_NAME

In [7]:
def init_context(context):
    """Initialize the nuclio/mlrun context before ``handler`` runs.

    Sets on ``context``:
      * ``PROJECT_NAME``: the ``PROJECT_NAME`` env var, otherwise
        ``"stocks-" + V3IO_USERNAME``;
      * ``last_trained_times``: an empty dict, filled per ticker by ``handler``;
      * ``sym_to_url``: a ticker -> URL-slug dict, overridable by a JSON object
        in the ``sym_to_url`` env var;
      * ``stocks_vec``: the feature vector name (``stocks_vec`` env var,
        default ``"stocks-vec"``);
      * ``priceMMS`` / ``volumeMMS`` / ``sentimentMMS``: ``MinMaxScaler``
        instances with range (0, 1).

    It also calls ``mlrun.set_environment`` with the project name.

    :raises KeyError: if neither ``PROJECT_NAME`` nor ``V3IO_USERNAME`` is set.
    :raises json.JSONDecodeError: if ``sym_to_url`` is set but is not valid JSON.
    """
    import json

    context.logger.info("Initalizing context")
    project_name = os.getenv('PROJECT_NAME')
    if project_name is None:
        # Only require V3IO_USERNAME when no explicit project name is given.
        project_name = 'stocks-' + os.environ['V3IO_USERNAME']
    setattr(context, 'PROJECT_NAME', project_name)
    mlrun.set_environment(project=context.PROJECT_NAME)

    setattr(context, 'last_trained_times', {})

    default_sym_to_url = {'GOOGL': 'google-inc', 'MSFT': 'microsoft-corp', 'AMZN': 'amazon-com-inc',
                          'AAPL': 'apple-computer-inc', 'INTC': 'intel-corp'}
    # Env vars are plain strings, but handler iterates this as a mapping. An
    # override must therefore be a JSON object and is decoded here.
    raw_sym_to_url = os.getenv('sym_to_url')
    setattr(context, 'sym_to_url',
            json.loads(raw_sym_to_url) if raw_sym_to_url else default_sym_to_url)

    setattr(context,'stocks_vec', os.getenv('stocks_vec',"stocks-vec"))

    setattr(context,"priceMMS",MinMaxScaler(feature_range = (0, 1)))
    setattr(context,"volumeMMS",MinMaxScaler(feature_range = (0, 1)))
    setattr(context,"sentimentMMS",MinMaxScaler(feature_range = (0, 1)))
    context.logger.info("Finished Initalizing context")

In [8]:
# nuclio: end-code

In [None]:
# Select the per-user stocks project ("stocks-<V3IO_USERNAME>") for the interactive cells below.
mlrun.set_environment(project="stocks-" + os.getenv('V3IO_USERNAME'))

In [None]:
# Local smoke test: initialize the context and run the handler once with an empty event.
# NOTE(review): `context` is not defined in this notebook; presumably it is provided by the nuclio notebook environment — TODO confirm.
init_context(context)
event = ""
s = handler(context,event)

In [11]:
# test remote deployment
import os
from mlrun import auto_mount, code_to_function

# Package this notebook's code as a function whose entry point is `handler`,
# with the default storage mount applied.
fn = code_to_function('rnn_model_training', handler='handler')
fn.apply(auto_mount())

# Set parameters for current deployment
deployment_envs = {
    'PROJECT_NAME': "stocks-" + os.getenv('V3IO_USERNAME'),
    # make sure proper path is set
    'model_path': '/User/test/demos/stock-analysis/models/mymodel.h5',
}
fn.set_envs(deployment_envs)
fn.spec.max_replicas = 2

In [None]:
# Deploy the function into the per-user stocks project; `addr` is presumably the deployed HTTP endpoint (it is curl'ed below).
addr = fn.deploy(project="stocks-" + os.getenv('V3IO_USERNAME'))

In [None]:
# Trigger one training run on the deployed function over HTTP.
!curl {addr}