In [19]:
from __future__ import print_function
from pyspark.ml.feature import MinMaxScaler
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import percent_rank
from pyspark.ml import Pipeline
import matplotlib.pyplot as plt
from pyspark.ml.feature import VectorAssembler
import numpy as np
from sagemaker.predictor import csv_serializer 
from pyspark.sql.types import DoubleType
from sklearn.metrics import mean_squared_error
from sklearn.metrics import r2_score
from math import sqrt

In [20]:
if __name__ == "__main__":
    # Reuse an already-running Spark session if there is one, otherwise
    # start a new one registered under the application name "demo".
    spark = (
        SparkSession.builder
        .appName("demo")
        .getOrCreate()
    )

# Reading data from csv

In [21]:
# Read the CSV with its first line as the header. No schema is inferred,
# so every column is loaded as a string.
csv_reader = spark.read.format("csv").option("header", "true")
df = csv_reader.load("stocks_data.csv")

In [22]:
# One un-partitioned window ordered by timestamp, shared by the rank and
# the lagged-close computation below.
# NOTE(review): `timestamp` is still a string here, so the ordering is
# lexicographic -- presumably the values are ISO-formatted; confirm.
w = Window.partitionBy().orderBy("timestamp")
n = 1 #Horizon

# Intraday spreads. The price columns are strings at this point; Spark
# casts them implicitly for the subtraction.
df = df.withColumn('diffOpenClose',df.open-df.close)
df = df.withColumn('diffHighLow',df.high-df.low)

# Relative position of each row in time (0.0 .. 1.0); used later to split
# the data chronologically into train and test sets.
df = df.withColumn("rank", percent_rank().over(w))

# target = 1 when the close is higher than the close `n` rows earlier,
# otherwise 0 (the first `n` rows have no lagged value and therefore get 0).
# The horizon is passed as the lag *offset*; the previous code added it to
# the price instead (`F.lag(df.close + n)`). The explicit cast keeps the
# comparison numeric, because `close` has not been cast to double yet and a
# string-to-string comparison would be lexicographic.
close_price = df.close.cast("double")
df = df.withColumn('target', F.when(F.lag(close_price, n).over(w) < close_price, 1).otherwise(0))

# NOTE(review): not referenced anywhere in the visible notebook.
categoricalColumns = ['high','low', 'open','close']

df = df.drop('timestamp')

# Converting string type to double

In [23]:
# The CSV reader produced string columns; convert the raw market fields to
# doubles, one column at a time and in the same order as before.
numeric_columns = ["volume", "open", "high", "low", "close"]
for column_name in numeric_columns:
    df = df.withColumn(column_name, df[column_name].cast(DoubleType()))

# Log transformation

###### Since the data is skewed, we clip each feature to its approximate 1st and 99th percentiles and then apply a log transformation

In [24]:
# Approximate 1st / 99th percentile of every column except the first one and
# the last two.
# NOTE(review): the slice depends on the column order of the CSV -- confirm
# it selects exactly the intended feature columns.
# The relative error was 0.25, which allows the "1st percentile" to land
# anywhere up to roughly the 26th; use a tight tolerance so the clipping
# bounds really are tail quantiles.
QUANTILE_REL_ERROR = 0.001
d = {}
for col in df.columns[1:-2]:
    d[col] = df.approxQuantile(col, [0.01, 0.99], QUANTILE_REL_ERROR)

# Clip each column to its [1%, 99%] range and take log(x + 1).
# The transformed columns are accumulated on `df_new`; previously every
# iteration restarted from `df`, so only the last column ended up transformed.
# NOTE(review): log(x + 1) is undefined for x <= -1, so columns that can be
# negative (e.g. a price difference) may yield nulls -- confirm.
df_new = df
for col, bounds in d.items():
    if len(bounds) != 2:
        # approxQuantile returned no quantiles for this column (for example
        # when it holds no numeric values); leave it untouched.
        continue
    lower, upper = bounds
    clipped = (
        F.when(F.col(col) < lower, lower)
        .when(F.col(col) > upper, upper)
        .otherwise(F.col(col))
    )
    df_new = df_new.withColumn(col, F.log(clipped + 1))
          

# Scaling features using min-max normalization

In [25]:
# Pack every column of df_new into a single vector column, as required by
# the ML feature transformers.
assembler = VectorAssembler().setInputCols(df_new.columns).setOutputCol("features")
transformed = assembler.transform(df_new)

# Rescale each feature with MinMaxScaler. The output column is spelled
# "scaledFeatures" to match the `row.scaledFeatures` / select("scaledFeatures")
# lookups in the following cells: Row attribute access is case-sensitive, so
# the previous "scaledfeatures" spelling only worked through Spark's
# case-insensitive column resolution.
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
scalerModel = scaler.fit(transformed.select("features"))
scaledData = scalerModel.transform(transformed)

In [26]:
def extract(row):
    """Turn the `scaledFeatures` vector of a row into a plain tuple.

    The vector is converted with `toArray().tolist()`, so the result holds
    one Python value per vector element, in order.
    """
    scaled_vector = row.scaledFeatures
    values = scaled_vector.toArray().tolist()
    return tuple(values)


In [27]:
# Unpack the scaled vector of every row into separate values and rebuild a
# DataFrame that carries the same column names as df_new.
scaled_rows = scaledData.select("scaledFeatures").rdd.map(extract)
final_data = scaled_rows.toDF(df_new.columns)

# Re-arranging dataframe to fit the model needs

In [28]:
# Put the label first, followed by the features; `rank` is kept last because
# it is still needed for the train/test split.
# NOTE(review): this selects from `df`, not from `df_new` / `final_data`, so
# the log transform and min-max scaling computed above do not reach the
# training data -- confirm whether that is intended.
model_columns = [
    'target',
    'open',
    'high',
    'low',
    'close',
    'volume',
    'diffOpenClose',
    'diffHighLow',
    'rank',
]
df = df.select(*model_columns)

# Splitting dataframe into train and test

In [29]:
# Chronological split on the percent rank: rows with rank up to 0.8 are used
# for training and the rest for testing. `rank` is dropped afterwards since
# it is only a splitting aid.
train_condition = "rank <= .8"
test_condition = "rank > .8"
trainingData = df.filter(train_condition).drop("rank")
testData = df.filter(test_condition).drop("rank")

# Exponential Smoothing

In [None]:
def get_exp_preprocessing(df, alpha=0.9):
    """Return the exponentially weighted moving average of `df`.

    Parameters
    ----------
    df : object exposing a pandas-style ``ewm`` method (e.g. a DataFrame)
    alpha : float, smoothing factor forwarded to ``ewm`` (default 0.9)

    Returns
    -------
    The result of ``df.ewm(alpha=alpha).mean()``.
    """
    return df.ewm(alpha=alpha).mean()

# Saving training and testing data as CSV files for upload to the S3 bucket

In [33]:
def _smooth_features(pdf, label_col="target"):
    """Exponentially smooth every column of `pdf` except the label.

    The label column is re-inserted unchanged as the first column, so the
    0/1 class labels are not turned into fractional values by the smoothing.
    """
    smoothed = get_exp_preprocessing(pdf.drop(label_col, axis=1))
    smoothed.insert(0, label_col, pdf[label_col])
    return smoothed


# Collect the Spark frames to pandas, smooth the feature columns and write
# header-less, index-less CSV files with the label in the first column.
# Previously the smoothing was applied to the whole frame, label included.
Xtr = _smooth_features(trainingData.toPandas())
Xtr.to_csv(path_or_buf="training3.csv", header=False, index=False)

# NOTE: `testData` is rebound from a Spark DataFrame to a pandas DataFrame
# here, so this cell cannot be re-run without re-running the split cell.
testData = _smooth_features(testData.toPandas())
testData.to_csv(path_or_buf='testdata2.csv', header=False, index=False)

# Prediction model

In [13]:
import boto3, re, sys, math, json, os, sagemaker, urllib.request



# Each region has its own XGBoost container image; map region name -> image URI.
containers = dict([
    ('us-west-2', '433757028032.dkr.ecr.us-west-2.amazonaws.com/xgboost:latest'),
    ('us-east-1', '811284229777.dkr.ecr.us-east-1.amazonaws.com/xgboost:latest'),
    ('us-east-2', '825641698319.dkr.ecr.us-east-2.amazonaws.com/xgboost:latest'),
    ('eu-west-1', '685385470294.dkr.ecr.eu-west-1.amazonaws.com/xgboost:latest'),
])


In [14]:
# S3 bucket and key prefix for this project, and the AWS region whose
# XGBoost image is used for training.
bucket_name = 'bigdataprojnyu'
prefix = 'sagemaker/DEMO-xgboost-dm'
myregion = "us-east-1"

# Region name -> XGBoost container image URI.
# NOTE(review): this repeats the `containers` mapping defined in the
# previous cell.
containers = {}
containers['us-west-2'] = '433757028032.dkr.ecr.us-west-2.amazonaws.com/xgboost:latest'
containers['us-east-1'] = '811284229777.dkr.ecr.us-east-1.amazonaws.com/xgboost:latest'
containers['us-east-2'] = '825641698319.dkr.ecr.us-east-2.amazonaws.com/xgboost:latest'
containers['eu-west-1'] = '685385470294.dkr.ecr.eu-west-1.amazonaws.com/xgboost:latest'

In [15]:
# IAM role passed to the estimator for the training job.
# NOTE(review): the role ARN is hard-coded -- consider reading it from
# configuration or the environment instead.
role = "arn:aws:iam::566468225241:role/BDA_Sagemaker"

sess = sagemaker.Session()

# Estimator for the XGBoost image of the selected region. The output path is
# now derived from `bucket_name`; the previous literal had no `{}`
# placeholders, so its `.format(bucket_name, prefix)` call silently ignored
# both arguments. For the current bucket the resulting path is unchanged.
xgb = sagemaker.estimator.Estimator(
    containers[myregion],
    role=role,
    train_instance_count=1,
    train_instance_type='ml.m4.xlarge',
    output_path='s3://{}/output'.format(bucket_name),
    sagemaker_session=sess,
)

# Binary classification (logistic objective) trained for 100 boosting rounds.
xgb.set_hyperparameters(
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.8,
    silent=0,
    objective='binary:logistic',
    num_round=100,
)


In [16]:
# Training channel: CSV data stored under the `train` key of the project bucket.
train_s3_uri = 's3://{}/train'.format(bucket_name)
s3_input_train = sagemaker.s3_input(s3_data=train_s3_uri, content_type='csv')

In [42]:
# Launch the training job with a single 'train' channel.
# NOTE(review): presumably fit() returns None, in which case `xgboost_model`
# holds no model object -- verify against the SDK version in use.
training_channels = {'train': s3_input_train}
xgboost_model = xgb.fit(training_channels)

2019-05-16 05:51:34 Starting - Starting the training job...
2019-05-16 05:51:35 Starting - Launching requested ML instances......
2019-05-16 05:52:47 Starting - Preparing the instances for training......
2019-05-16 05:53:55 Downloading - Downloading input data...
2019-05-16 05:54:30 Training - Training image download completed. Training in progress.
2019-05-16 05:54:30 Uploading - Uploading generated training model.
Arguments: train
[2019-05-16:05:54:28:INFO] Running standalone xgboost training.
[2019-05-16:05:54:28:INFO] Path /opt/ml/input/data/validation does not exist!
[2019-05-16:05:54:28:INFO] File size need to be processed in the node: 0.3mb. Available memory size in the node: 8425.14mb
[2019-05-16:05:54:28:INFO] Determined delimiter of CSV input is ','
[05:54:28] S3DistributionType set as FullyReplicated
[05:54:28] 4300x7 matrix with 30100 entries loaded from /opt/ml/input/data/train?format=csv&label_column=0&delimiter=,

# Deploying model on sagemaker

In [43]:
# Deploy the trained estimator to an endpoint backed by one ml.m4.xlarge
# instance and keep the returned predictor handle.
xgb_predictor = xgb.deploy(
    instance_type='ml.m4.xlarge',
    initial_instance_count=1,
)

----------------------------------------------------------------------------------------!