## The data file is sourced from quandl. It is already sorted by date descending.

Contains the following attributes

    Date
    Open 
    High
    Low
    Last
    Close
    Total Trade Quantity
    Turnover (Lacs)

### Create Spark context and Load the file from HDFS

In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

In [2]:
sc= SparkContext()
sc.setLogLevel('INFO')
spark = SQLContext(sc)

In [177]:
raw_data=spark.read.csv('hdfs://ip-172-31-53-48.ec2.internal:8020/user/anupkma7125/NSE-HINDUNILVR.csv',header=True,inferSchema=True)

In [178]:
raw_data.show(5)

+-------------------+------+-------+------+-------+-------+--------------------+---------------+
|               Date|  Open|   High|   Low|   Last|  Close|Total Trade Quantity|Turnover (Lacs)|
+-------------------+------+-------+------+-------+-------+--------------------+---------------+
|2017-12-26 00:00:00|1350.4| 1353.9|1340.0| 1348.0| 1348.1|            913529.0|       12307.33|
|2017-12-22 00:00:00|1341.3| 1359.5|1341.3| 1350.4| 1356.5|            561400.0|        7604.01|
|2017-12-21 00:00:00|1365.0|1365.15|1345.1|1346.45|1348.45|            557871.0|        7532.59|
|2017-12-20 00:00:00|1346.2|1367.95|1333.5|1367.15|1362.65|           1198943.0|       16226.24|
|2017-12-19 00:00:00|1333.1| 1352.0|1326.5| 1352.0| 1349.7|           1227087.0|       16453.06|
+-------------------+------+-------+------+-------+-------+--------------------+---------------+
only showing top 5 rows



### Using spark SQL window functions create the lag columns for 5 days

In [179]:
from pyspark.sql import Window

In [180]:
from pyspark.sql.functions import lag

In [181]:
windowSpec = Window.orderBy(raw_data['Date'].desc()) 

In [182]:
raw_data_lag=raw_data.withColumn('OpenLag_1',lag(raw_data['Open'],1).over(windowSpec))

In [183]:
raw_data_lag.show(5)

+-------------------+------+-------+------+-------+-------+--------------------+---------------+---------+
|               Date|  Open|   High|   Low|   Last|  Close|Total Trade Quantity|Turnover (Lacs)|OpenLag_1|
+-------------------+------+-------+------+-------+-------+--------------------+---------------+---------+
|2017-12-26 00:00:00|1350.4| 1353.9|1340.0| 1348.0| 1348.1|            913529.0|       12307.33|     null|
|2017-12-22 00:00:00|1341.3| 1359.5|1341.3| 1350.4| 1356.5|            561400.0|        7604.01|   1350.4|
|2017-12-21 00:00:00|1365.0|1365.15|1345.1|1346.45|1348.45|            557871.0|        7532.59|   1341.3|
|2017-12-20 00:00:00|1346.2|1367.95|1333.5|1367.15|1362.65|           1198943.0|       16226.24|   1365.0|
|2017-12-19 00:00:00|1333.1| 1352.0|1326.5| 1352.0| 1349.7|           1227087.0|       16453.06|   1346.2|
+-------------------+------+-------+------+-------+-------+--------------------+---------------+---------+
only showing top 5 rows



In [184]:
column_names =['Open','High','Low','Last','Close','Total Trade Quantity','Turnover (Lacs)']

In [185]:
trans_data=raw_data
window_spec=Window.orderBy(trans_data['Date'].desc()) 
for column in column_names:
    for i in range(5):
        new_column_name=column+"Lag_"+str(i+1)
        value_1=lag(trans_data[column],i-1).over(window_spec)
        value_2=lag(trans_data[column],i-2).over(window_spec)
        diffpercentage = ((value_1 - value_2)/value_2)*100
        trans_data=trans_data.withColumn(new_column_name,diffpercentage)
value_1=lag(trans_data['Close'],-1).over(window_spec)
diffpercentage = ((trans_data['Close'] - value_1)/value_1)*100
trans_data_r=trans_data.withColumn('percent_change',diffpercentage)

In [186]:
trans_data_r.select('Date','Open','OpenLag_1','Close','percent_change').show()

+-------------------+-------+--------------------+-------+--------------------+
|               Date|   Open|           OpenLag_1|  Close|      percent_change|
+-------------------+-------+--------------------+-------+--------------------+
|2017-12-26 00:00:00| 1350.4| -1.7362637362637396| 1348.1| -0.6192406929598298|
|2017-12-22 00:00:00| 1341.3|  1.3965235477640732| 1356.5|  0.5969817197523047|
|2017-12-21 00:00:00| 1365.0|  0.9826719675943393|1348.45| -1.0420871096760023|
|2017-12-20 00:00:00| 1346.2|  0.9924242424242354|1362.65|  0.9594724753648993|
|2017-12-19 00:00:00| 1333.1| -0.6248588421290338| 1349.7|  1.3478505725549124|
|2017-12-18 00:00:00| 1320.0|  1.0882800608827972|1331.75|  0.5435808387754366|
|2017-12-15 00:00:00| 1328.3|-0.40927694406549114|1324.55| 0.22321428571428917|
|2017-12-14 00:00:00| 1314.0|0.022742779167628072| 1321.6|  0.4980799209155511|
|2017-12-13 00:00:00| 1319.4| -0.7449209932279978|1315.05|0.030426349218412776|
|2017-12-12 00:00:00| 1319.1|   2.514655

In [187]:
trans_data_required=trans_data_r.select([c for c in trans_data_r.columns if c not in column_names])

In [188]:
trans_data_required.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- OpenLag_1: double (nullable = true)
 |-- OpenLag_2: double (nullable = true)
 |-- OpenLag_3: double (nullable = true)
 |-- OpenLag_4: double (nullable = true)
 |-- OpenLag_5: double (nullable = true)
 |-- HighLag_1: double (nullable = true)
 |-- HighLag_2: double (nullable = true)
 |-- HighLag_3: double (nullable = true)
 |-- HighLag_4: double (nullable = true)
 |-- HighLag_5: double (nullable = true)
 |-- LowLag_1: double (nullable = true)
 |-- LowLag_2: double (nullable = true)
 |-- LowLag_3: double (nullable = true)
 |-- LowLag_4: double (nullable = true)
 |-- LowLag_5: double (nullable = true)
 |-- LastLag_1: double (nullable = true)
 |-- LastLag_2: double (nullable = true)
 |-- LastLag_3: double (nullable = true)
 |-- LastLag_4: double (nullable = true)
 |-- LastLag_5: double (nullable = true)
 |-- CloseLag_1: double (nullable = true)
 |-- CloseLag_2: double (nullable = true)
 |-- CloseLag_3: double (nullable = true)
 |-- CloseLag_4

### Drop rows with null and create categorical column for trend

In [189]:
trans_data_required=trans_data_required.dropna()

In [190]:
bucketizer2 = Bucketizer(splits=[-100,0,100 ],inputCol="percent_change", outputCol="trend")

In [191]:
df_buck = bucketizer2.setHandleInvalid("keep").transform(trans_data_required)
t = {0.0:"Down", 1.0: "Up"}
udf_foo = udf(lambda x: t[x], StringType())
df_buck=df_buck.withColumn("trend_bucket", udf_foo("trend"))

In [192]:
df_buck.select('Date','percent_change','trend_bucket').show()

+-------------------+--------------------+------------+
|               Date|      percent_change|trend_bucket|
+-------------------+--------------------+------------+
|2017-12-20 00:00:00|  0.9594724753648993|          Up|
|2017-12-19 00:00:00|  1.3478505725549124|          Up|
|2017-12-18 00:00:00|  0.5435808387754366|          Up|
|2017-12-15 00:00:00| 0.22321428571428917|          Up|
|2017-12-14 00:00:00|  0.4980799209155511|          Up|
|2017-12-13 00:00:00|0.030426349218412776|          Up|
|2017-12-12 00:00:00| -0.7099429779842048|        Down|
|2017-12-11 00:00:00| -0.1959823615874674|        Down|
|2017-12-08 00:00:00|   2.685862455977402|          Up|
|2017-12-07 00:00:00|  1.2817497648165677|          Up|
|2017-12-06 00:00:00|  1.1578112609040372|          Up|
|2017-12-05 00:00:00| -0.5990856061800338|        Down|
|2017-12-04 00:00:00|   1.447421031587358|          Up|
|2017-12-01 00:00:00| -1.7250186647805448|        Down|
|2017-11-30 00:00:00| -0.2899345688202833|      

In [193]:
trend_df=df_buck.select([c for c in df_buck.columns if c not in {'trend','percent_change'}])

In [194]:
trend_df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- OpenLag_1: double (nullable = true)
 |-- OpenLag_2: double (nullable = true)
 |-- OpenLag_3: double (nullable = true)
 |-- OpenLag_4: double (nullable = true)
 |-- OpenLag_5: double (nullable = true)
 |-- HighLag_1: double (nullable = true)
 |-- HighLag_2: double (nullable = true)
 |-- HighLag_3: double (nullable = true)
 |-- HighLag_4: double (nullable = true)
 |-- HighLag_5: double (nullable = true)
 |-- LowLag_1: double (nullable = true)
 |-- LowLag_2: double (nullable = true)
 |-- LowLag_3: double (nullable = true)
 |-- LowLag_4: double (nullable = true)
 |-- LowLag_5: double (nullable = true)
 |-- LastLag_1: double (nullable = true)
 |-- LastLag_2: double (nullable = true)
 |-- LastLag_3: double (nullable = true)
 |-- LastLag_4: double (nullable = true)
 |-- LastLag_5: double (nullable = true)
 |-- CloseLag_1: double (nullable = true)
 |-- CloseLag_2: double (nullable = true)
 |-- CloseLag_3: double (nullable = true)
 |-- CloseLag_4

In [195]:
trend_df.persist()

DataFrame[Date: timestamp, OpenLag_1: double, OpenLag_2: double, OpenLag_3: double, OpenLag_4: double, OpenLag_5: double, HighLag_1: double, HighLag_2: double, HighLag_3: double, HighLag_4: double, HighLag_5: double, LowLag_1: double, LowLag_2: double, LowLag_3: double, LowLag_4: double, LowLag_5: double, LastLag_1: double, LastLag_2: double, LastLag_3: double, LastLag_4: double, LastLag_5: double, CloseLag_1: double, CloseLag_2: double, CloseLag_3: double, CloseLag_4: double, CloseLag_5: double, Total Trade QuantityLag_1: double, Total Trade QuantityLag_2: double, Total Trade QuantityLag_3: double, Total Trade QuantityLag_4: double, Total Trade QuantityLag_5: double, Turnover (Lacs)Lag_1: double, Turnover (Lacs)Lag_2: double, Turnover (Lacs)Lag_3: double, Turnover (Lacs)Lag_4: double, Turnover (Lacs)Lag_5: double, trend_bucket: string]

## Train Test split, Scaling, Different Classification models, Score

In [197]:
from pyspark.ml.classification import LogisticRegression

In [198]:
feature_Columns=trend_df.columns
feature_Columns.remove('trend_bucket')
feature_Columns.remove('Date')

In [199]:
feature_Columns

['OpenLag_1',
 'OpenLag_2',
 'OpenLag_3',
 'OpenLag_4',
 'OpenLag_5',
 'HighLag_1',
 'HighLag_2',
 'HighLag_3',
 'HighLag_4',
 'HighLag_5',
 'LowLag_1',
 'LowLag_2',
 'LowLag_3',
 'LowLag_4',
 'LowLag_5',
 'LastLag_1',
 'LastLag_2',
 'LastLag_3',
 'LastLag_4',
 'LastLag_5',
 'CloseLag_1',
 'CloseLag_2',
 'CloseLag_3',
 'CloseLag_4',
 'CloseLag_5',
 'Total Trade QuantityLag_1',
 'Total Trade QuantityLag_2',
 'Total Trade QuantityLag_3',
 'Total Trade QuantityLag_4',
 'Total Trade QuantityLag_5',
 'Turnover (Lacs)Lag_1',
 'Turnover (Lacs)Lag_2',
 'Turnover (Lacs)Lag_3',
 'Turnover (Lacs)Lag_4',
 'Turnover (Lacs)Lag_5']

In [200]:
from pyspark.ml.feature import *

In [201]:
assembler = VectorAssembler().setInputCols(feature_Columns).setOutputCol("features")

In [202]:
labelIndexer = StringIndexer().setInputCol("trend_bucket").setOutputCol("label")

In [203]:
lr = LogisticRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)

In [204]:
from pyspark.ml import Pipeline

In [205]:
pipeline = Pipeline(stages=[assembler, labelIndexer, lr])

In [206]:
splitSeed = 5043
trainingData, testData = trend_df.randomSplit([0.8, 0.2], splitSeed)

In [207]:
trainingData.count()

2082

In [208]:
testData.count()

500

In [209]:
model = pipeline.fit(trainingData)

In [251]:
predictions = model.transform(testData)
selected = predictions.select("Date","label","probability", "prediction")
for row in selected.collect():
    Date,label,prob, prediction = row
    print("(%s,%s) --> prob=%s, prediction=%f" % (str(Date),label, str(prob), prediction))

(2007-07-31 00:00:00,1.0) --> prob=[0.443667334667,0.556332665333], prediction=1.000000
(2007-08-27 00:00:00,0.0) --> prob=[0.546079671191,0.453920328809], prediction=0.000000
(2007-10-05 00:00:00,1.0) --> prob=[0.469315817957,0.530684182043], prediction=1.000000
(2007-10-08 00:00:00,1.0) --> prob=[0.470676190589,0.529323809411], prediction=1.000000
(2007-10-09 00:00:00,0.0) --> prob=[0.602736936474,0.397263063526], prediction=0.000000
(2007-10-12 00:00:00,1.0) --> prob=[0.374796689432,0.625203310568], prediction=1.000000
(2007-10-15 00:00:00,1.0) --> prob=[0.464061639186,0.535938360814], prediction=1.000000
(2007-10-30 00:00:00,1.0) --> prob=[0.508933565861,0.491066434139], prediction=0.000000
(2007-10-31 00:00:00,1.0) --> prob=[0.19929668381,0.80070331619], prediction=1.000000
(2007-11-01 00:00:00,1.0) --> prob=[0.17342480387,0.82657519613], prediction=1.000000
(2007-11-05 00:00:00,1.0) --> prob=[0.437324177313,0.562675822687], prediction=1.000000
(2007-12-10 00:00:00,1.0) --> prob=[

In [252]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [253]:
evaluator=BinaryClassificationEvaluator().setLabelCol("label").setRawPredictionCol("rawPrediction").setMetricName("areaUnderROC")

In [254]:
measure=evaluator.evaluate(predictions)

In [255]:
print(measure)

0.997215599046


In [256]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [257]:
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [10, 15, 20])
             .build())

In [258]:
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

In [260]:
cvModel = cv.fit(trainingData) ## This will take some time

In [261]:
predictions2 = cvModel.transform(testData)

In [262]:
measure2=evaluator.evaluate(predictions2)

In [263]:
print(measure2)

1.0


### TODO- RandomForest,  Multilayer perceptron classifier