In [1]:
# Create the DBFS directory in which the trained models are stored
# (main() later saves one model per symbol under /dbfs/fit_models/).
dbutils.fs.mkdirs("/fit_models/")

In [2]:
# Create the DBFS directory that holds the fitted min-max scalers
# (normalize_data() writes one "<symbol>.save" file per symbol under /dbfs/scalers/).
dbutils.fs.mkdirs("/scalers")

In [3]:
import os

import numpy as np
import pandas as pd
import requests
from elephas.spark_model import SparkModel
from elephas.utils.rdd_utils import to_simple_rdd
from keras import metrics
from keras.layers.core import Dense, Dropout, Activation
from keras.layers.recurrent import LSTM
from keras.models import Sequential
from keras.models import load_model
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, functions as f
from sklearn import preprocessing
from sklearn.externals import joblib

In [4]:
def read_data(spark, firm_list):
    """Load the price CSV through Spark and return the selected firms as pandas.

    Args:
        spark: Spark session used to read the CSV file.
        firm_list: Ticker symbols to keep; rows whose ``symbol`` column is
            not in this list are filtered out.

    Returns:
        A pandas DataFrame with the remaining rows; its ``date`` column has
        been passed through Spark's ``to_date``.
    """
    file_location = "/FileStore/tables/nyse_prices.csv"

    # Apply the CSV reader options one by one, then load the file.
    reader = spark.read.format("csv")
    for option_name, option_value in (
        ("inferSchema", "true"),
        ("header", "true"),
        ("sep", ","),
    ):
        reader = reader.option(option_name, option_value)
    prices = reader.load(file_location)

    prices = prices.filter(prices.symbol.isin(firm_list))
    prices = prices.withColumn("date", f.to_date(f.col("date")))
    return prices.toPandas()

In [5]:
def normalize_data(df, symbol, pred=False):
    """Min-max scale every column of ``df`` with a per-symbol scaler.

    In training mode (``pred`` falsy) a new ``MinMaxScaler`` is fitted on
    ``df`` and persisted to ``/dbfs/scalers/<symbol>.save``. In prediction
    mode (``pred`` truthy) the previously persisted scaler is loaded instead,
    so the data is scaled with the parameters learned during training.

    Args:
        df: DataFrame whose columns are all to be scaled.
        symbol: Ticker symbol; used to build the scaler file name.
        pred: When truthy, reuse the stored scaler rather than fitting one.

    Returns:
        A new DataFrame with the scaled values, the same column names as
        ``df`` and a fresh default (0..n-1) index.
    """
    matrix = df.to_numpy()
    scaler_filename = "/dbfs/scalers/" + symbol + ".save"

    if not pred:
        scaler = preprocessing.MinMaxScaler()
        scaler.fit(matrix)
        joblib.dump(scaler, scaler_filename)
    else:
        # NOTE: joblib files are pickles; only load scaler files this
        # notebook wrote itself.
        scaler = joblib.load(scaler_filename)

    # Transform the same ndarray representation the scaler was fitted on,
    # so fit and transform always see identically-typed input.
    scaled = scaler.transform(matrix)
    return pd.DataFrame(scaled, columns=df.columns)

In [6]:
def truncate_col(df):
    """Drop the identifier columns and move the target column to the end.

    The ``symbol`` and ``date`` columns are removed, the five remaining
    columns are renamed *by position* to ``open, target, low, high, volume``
    and then reordered so that ``target`` is the last column.

    The input frame is left untouched: the work is done on the copy returned
    by ``DataFrame.drop``, so passing a slice of a larger frame is safe and
    does not trigger ``SettingWithCopyWarning``.

    Args:
        df: DataFrame with ``symbol``, ``date`` and exactly five other columns.

    Returns:
        A new DataFrame with columns ``open, low, high, volume, target``.

    Raises:
        KeyError: If ``symbol`` or ``date`` is missing.
        ValueError: If the number of remaining columns is not five.
    """
    trimmed = df.drop(columns=['symbol', 'date'])
    # Positional rename: the second remaining column becomes the target.
    trimmed.columns = ['open', 'target', 'low', 'high', 'volume']
    return trimmed[['open', 'low', 'high', 'volume', 'target']]

In [7]:
def encapsulate(df, len_seq):
    """Cut ``df`` into sliding windows and split them into train/test sets.

    Every window spans ``len_seq + 1`` consecutive rows: the first
    ``len_seq`` rows (all columns) form the model input and the last column
    of the final row is the value to predict. The first 90% of the windows
    (rounded) become the training set, the rest the test set.

    All ``len(df) - len_seq`` possible windows are produced, including the
    one that ends on the last row of ``df``.

    Args:
        df: DataFrame of features; the target must be the last column.
        len_seq: Number of time steps fed to the model per sample.

    Returns:
        Tuple ``(x_train, y_train, x_test, y_test)`` where the ``x`` arrays
        have shape ``(samples, len_seq, n_columns)`` and the ``y`` arrays
        have shape ``(samples,)``.

    Raises:
        ValueError: If ``df`` has fewer than ``len_seq + 1`` rows, i.e. not
            even a single window can be built.
    """
    data = df.to_numpy()
    window_len = len_seq + 1

    # The last valid window starts at len(data) - window_len, hence the +1.
    num_windows = len(data) - window_len + 1
    if num_windows < 1:
        raise ValueError(
            "need at least {0} rows to build a window, got {1}".format(
                window_len, len(data)
            )
        )

    windows = np.stack(
        [data[start:start + window_len] for start in range(num_windows)]
    )

    split = int(round(0.9 * num_windows))

    # Slicing already yields (samples, len_seq, n_columns); no reshape needed.
    x_train = windows[:split, :-1]
    y_train = windows[:split, -1, -1]
    x_test = windows[split:, :-1]
    y_test = windows[split:, -1, -1]

    return x_train, y_train, x_test, y_test

In [8]:
def establish_model(d=0.1, w=30, num_features=5):
    """Build and compile the LSTM regression network.

    The network is a single 256-unit LSTM layer, a dropout layer and one
    linear output unit, compiled with mean-squared-error loss and the Adam
    optimizer.

    Args:
        d: Dropout rate applied after the LSTM layer.
        w: Window length, i.e. number of time steps per input sample.
        num_features: Number of features per time step. Defaults to 5, the
            value that was previously hard-coded.

    Returns:
        The compiled Keras ``Sequential`` model.
    """
    model = Sequential()
    model.add(LSTM(256, input_shape=(w, num_features)))
    model.add(Dropout(d))
    model.add(Dense(1, kernel_initializer="uniform", activation='linear'))
    model.compile(loss='mse', optimizer='adam')
    return model

In [9]:
import matplotlib.pyplot as plt

def peek(y, y_hat):
    """Plot predicted values (red) against real values (blue) and show the figure.

    Args:
        y: Sequence of real target values.
        y_hat: Sequence of predicted target values.
    """
    # (values, line colour, legend label) -- predictions are drawn first.
    series_specs = (
        (y_hat, 'red', 'y_hat - predicted value'),
        (y, 'blue', 'y - real value'),
    )
    for values, colour, caption in series_specs:
        plt.plot(values, color=colour, label=caption)

    plt.legend(loc='best')
    plt.show()

def train(org_df, win_size, sc, org_symbol):
    """Train an LSTM for one symbol on Spark and plot its test predictions.

    The frame is windowed with ``encapsulate``, the training windows are
    wrapped in an RDD, and a Keras model is fitted through elephas'
    ``SparkModel`` (asynchronous mode, epoch-level update frequency,
    60 epochs, batch size 120, 10% validation split). Afterwards the
    held-out windows are predicted and plotted against the real values.

    Args:
        org_df: Normalized feature frame of one symbol, target column last.
        win_size: Number of time steps per input window; also used as the
            input length of the network so that both always agree.
        sc: SparkContext used to create the training RDD.
        org_symbol: Ticker symbol, used only in the progress messages.

    Returns:
        The fitted ``SparkModel``.
    """
    X, y, X_t, y_t = encapsulate(org_df, win_size)
    # Build the network for the window length actually used above instead of
    # relying on establish_model's default.
    model = establish_model(w=win_size)
    rdd = to_simple_rdd(sc, X, y)
    print("\n{0}: RDD wrapping across the training dataset and labels: OK!\n".format(org_symbol))
    print("\n{0}: Model training is ready to ignite!\n".format(org_symbol))
    spark_model = SparkModel(model, frequency='epoch', mode='asynchronous')
    print("\n{0}: Conversion from Keras-based LSTM into a Spark model: OK!\n".format(org_symbol))
    spark_model.fit(rdd, epochs=60, batch_size=120, verbose=1, validation_split=0.1)
    print("\n{0}: Status for fitting the model: Successful!\nTesting the model:\n".format(org_symbol))
    pred = spark_model.predict(X_t)
    # Each prediction is a one-element row; flatten to a list of scalars.
    y_hat = [row[0] for row in pred]
    peek(np.array(y_t), np.array(y_hat))
    return spark_model

In [10]:
# Program phase 2: this cell contains only the prediction-related functions
def retrieve(company):
    """Fetch the daily price series of ``company`` from Alpha Vantage.

    Args:
        company: Ticker symbol to query.

    Returns:
        Tuple ``(status_success, df)``. On success ``status_success`` is
        True and ``df`` is a DataFrame built from the "Time Series (Daily)"
        section of the response (one column per date, one row per field).
        If the response carries no such section -- an "Error Message", a
        rate-limit notice, or an empty payload -- the result is
        ``(False, None)``.
    """
    # NOTE(security): prefer supplying the key through the
    # ALPHAVANTAGE_API_KEY environment variable; the literal is kept only as
    # a fallback so existing runs keep working.
    api_key = os.environ.get("ALPHAVANTAGE_API_KEY", "IKLAQKSR0NS6E9L8")

    # Let requests build and encode the query string, and never wait forever.
    response = requests.get(
        "https://www.alphavantage.co/query",
        params={
            "function": "TIME_SERIES_DAILY",
            "datatype": "json",
            "symbol": company,
            "apikey": api_key,
        },
        timeout=30,
    )
    content = response.json()

    # Success is defined by the presence of the data itself, not by the
    # absence of one particular error key.
    series = content.get("Time Series (Daily)") if isinstance(content, dict) else None
    if not series:
        return False, None
    return True, pd.DataFrame(series)

def predict(company):
    """Predict whether the price of ``company`` is expected to rise.

    The 31 most recent daily records are downloaded, scaled with the scaler
    stored for the symbol, and turned into two 30-step windows: one ending
    on the second-latest day and one ending on the latest day. The stored
    model predicts a value for each window and the two are compared.

    Args:
        company: Ticker symbol; also selects the stored scaler and model.

    Returns:
        True if the prediction for the later window is strictly greater
        than the prediction for the earlier one, False otherwise, and None
        if the price data could not be retrieved.
    """
    # API call
    status, df = retrieve(company)
    if not status:
        return None

    # Data preprocessing: one row per date, columns renamed by position.
    df = df.transpose()
    df.columns = ["open", "high", "low", "target", "volume"]
    # Sort chronologically first, then keep the latest 31 days, so the
    # selection does not depend on the order in which the API lists dates.
    df = df.sort_index().iloc[-31:]
    # The JSON payload carries numbers as strings; make them numeric.
    df = df[["open", "low", "high", "volume", "target"]].astype(float)
    df = normalize_data(df, company, pred=True)

    # Positional slicing: rows 0..29 and rows 1..30.
    up_to_yday = df.iloc[0:30]
    up_to_tday = df.iloc[1:31]

    windows = np.array([up_to_yday.to_numpy(), up_to_tday.to_numpy()])
    windows = windows.reshape(2, 30, 5)

    # Loading the model
    model = load_model("/dbfs/fit_models/" + company)
    pred = model.predict(windows)

    # Only a strictly positive difference counts as an upward trend; testing
    # the difference for truthiness would also accept a decrease.
    return bool(pred[1][0] - pred[0][0] > 0)

In [11]:
def main(phase_option, firm_list):
    """Run the training phase or the prediction phase for a list of symbols.

    ``'train'``: read the price data, then for every symbol prepare its
    frame, train a model and save it under ``/dbfs/fit_models/<symbol>``.
    This branch uses the notebook-level ``spark`` session.

    ``'predict'``: call ``predict`` for every symbol and print a BUY or SELL
    line; symbols for which no prediction could be made are reported as such
    instead of being shown as SELL.

    Args:
        phase_option: Either ``'train'`` or ``'predict'``.
        firm_list: Ticker symbols to process.

    Raises:
        ValueError: If ``phase_option`` is neither ``'train'`` nor
            ``'predict'``.
    """
    if phase_option == 'train':

        WIN_SIZE = 30
        df = read_data(spark, firm_list)
        for symbol in firm_list:
            # Work on an independent copy so later column edits never touch
            # (or warn about) the shared frame.
            org_df = df.loc[df.symbol == symbol].copy()
            org_df = truncate_col(org_df)
            org_df = normalize_data(org_df, symbol)
            model = train(org_df, WIN_SIZE, spark.sparkContext, symbol)
            model.save("/dbfs/fit_models/" + symbol)
            print("{0}: The trained model has been saved!".format(symbol))

    elif phase_option == 'predict':

        for symbol in firm_list:
            outcome = predict(symbol)
            if outcome is None:
                # predict() returns None when the price data is unavailable;
                # that must not be presented as a trading signal.
                print("Company:", symbol, " - no prediction (price data could not be retrieved)")
            elif outcome:
                print("Company: ", symbol, " - BUY!")
            else:
                print("Company:", symbol, " - SELL!")

    else:
        raise ValueError(
            "phase_option must be 'train' or 'predict', got {0!r}".format(phase_option)
        )

In [12]:
# PROGRAM PHASE 1: train one model per listed symbol and save it under
# /dbfs/fit_models/ (see the 'train' branch of main()).
main(phase_option = 'train', firm_list = ['AGN', 'RIG', 'MU'])

In [13]:
# PROGRAM PHASE 2: apply the saved models and print a BUY/SELL line per
# listed symbol (see the 'predict' branch of main()).
main(phase_option='predict', firm_list=['AGN', 'RIG', 'MU'])

In [14]:
%fs ls dbfs:/fit_models

path,name,size
dbfs:/fit_models/AGN,AGN,1095104
dbfs:/fit_models/MU,MU,1095104
dbfs:/fit_models/RIG,RIG,1095104
