# Loading Data

In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

Configuring the Spark context so the AWS SDK uses Signature Version 4 when talking to S3

In [2]:
# JVM option that sets the AWS SDK system property
# `com.amazonaws.services.s3.enableV4` to true.
v4_java_option = "-Dcom.amazonaws.services.s3.enableV4=true"

# Pass the same option to the executor JVMs and to the driver JVM.
conf = SparkConf()
for jvm_options_key in ("spark.executor.extraJavaOptions",
                        "spark.driver.extraJavaOptions"):
    conf = conf.set(jvm_options_key, v4_java_option)

In [5]:
# Start the Spark context with the configuration built above.
sc = SparkContext(conf=conf)
# Also set the V4 flag as a Java system property; setSystemProperty is a
# classmethod, so it is invoked on the class rather than on the instance.
SparkContext.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")

Loading AWS credentials

In [6]:
# Read the AWS key pair from a local file so that no secret is hard-coded in
# the notebook. Layout: first line = access key, second line = secret key
# (the Hadoop configuration cell reads them as lines[0] and lines[1]).
with open('FileToS3/aws_credentials') as f:
    # Strip surrounding whitespace and drop blank lines: a stray space or a
    # trailing empty line would otherwise be sent to S3 as part of a key.
    lines = [entry.strip() for entry in f.read().splitlines() if entry.strip()]

# Fail here with a clear message instead of an IndexError further down.
if len(lines) < 2:
    raise ValueError(
        "FileToS3/aws_credentials must contain the access key on the first "
        "line and the secret key on the second line"
    )

Setting the Hadoop configuration with the AWS credentials and S3A options

In [7]:
# S3A settings, applied in the order listed. The two credentials come from the
# `lines` list read from the credentials file.
s3a_settings = [
    ("fs.s3a.access.key", lines[0]),
    ("fs.s3a.secret.key", lines[1]),
    ("fs.s3a.endpoint", "s3-ap-south-1.amazonaws.com"),
    ("fs.s3a.path.style.access", "true"),
    # NOTE(review): this key does not look like a documented S3A option;
    # it is kept exactly as in the original - confirm whether it is needed.
    ("com.amazonaws.services.s3a.enableV4", "true"),
    ("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
]

# Hadoop configuration object of the running Spark context.
hadoopConf = sc._jsc.hadoopConfiguration()
for option_name, option_value in s3a_settings:
    hadoopConf.set(option_name, option_value)

In [8]:
# Wrap the existing SparkContext in a SparkSession to get the DataFrame API.
sparkSession = SparkSession(sparkContext=sc)

Loading the CSV file from AWS S3

In [9]:
# Location of the historical quotes CSV, addressed through the s3a:// scheme.
stock_csv_path = "s3a://historicalstockpricesbucket/GoogleHistoricalQuotes.csv"
# header='true': the first line of the file supplies the column names.
stock_history = sparkSession.read.option("header", 'true').csv(stock_csv_path)

In [10]:
# Peek at the first three rows to check that the file was parsed as expected.
stock_history.head(3)

[Row(Date='05/11/2020', Close/Last='1403.26', Volume='1412116', Open='1378.28', High='1416.53', Low='1377.152'),
 Row(Date='05/08/2020', Close/Last='1388.37', Volume='1388068', Open='1383.13', High='1398.76', Low='1375.48'),
 Row(Date='05/07/2020', Close/Last='1372.56', Volume='1399759', Open='1365.94', High='1377.6', Low='1355.27')]

In [11]:
# Mark the DataFrame for caching so that later actions can reuse the loaded
# data. As the last expression of the cell, the returned DataFrame is shown.
stock_history.cache()

DataFrame[Date: string, Close/Last: string, Volume: string, Open: string, High: string, Low: string]

Converting column types

In [12]:
# Target Spark SQL type for each column. The CSV reader produced strings for
# every column (see the DataFrame repr printed after the cache() call).
column_types = {
    "Date": "string",
    "Close/Last": "double",
    "Volume": "double",
    "Open": "double",
    "High": "double",
    "Low": "double",
}

# Replace each column in place with its cast version; withColumn keeps the
# column's position, so the column order is unchanged.
for column_name, target_type in column_types.items():
    stock_history = stock_history.withColumn(
        column_name, stock_history[column_name].cast(target_type)
    )

In [13]:
# Print the schema to confirm that the casts in the previous cell took effect.
stock_history.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Close/Last: double (nullable = true)
 |-- Volume: double (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)



In [14]:
# Summary statistics (count, mean, stddev, min, max) for every column,
# collected to pandas and transposed so each data column becomes one row.
summary_stats = stock_history.describe().toPandas()
summary_stats.T

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
Date,1542,,,01/02/2015,12/31/2019
Close/Last,1542,892.6793313229597,265.5001569983217,492.55,1526.69
Volume,1542,1750053.9889753566,878982.4552627216,7932.0,1.11535E7
Open,1542,892.3791491569398,264.8867229772171,494.65,1525.07
High,1542,900.3975564202326,267.9636915346692,495.976,1532.1063
Low,1542,884.2809206225694,262.58149893525564,487.56,1521.4


In [15]:
# Keep only the price columns: the three inputs (Open, High, Low) and the
# value to predict (Close/Last).
price_columns = ['Open', 'High', 'Low', 'Close/Last']
required_dataframe = stock_history.select(*price_columns)

In [16]:
# Display the first 20 rows (show()'s default) of the selected price columns.
required_dataframe.show(n=20)

+-------+---------+--------+----------+
|   Open|     High|     Low|Close/Last|
+-------+---------+--------+----------+
|1378.28|  1416.53|1377.152|   1403.26|
|1383.13|  1398.76| 1375.48|   1388.37|
|1365.94|   1377.6| 1355.27|   1372.56|
|1361.69|1371.1199| 1347.29|    1347.3|
|1337.92|  1373.94| 1337.46|   1351.11|
|1308.23|  1327.66|  1299.0|    1326.8|
| 1328.5|1352.0695|  1311.0|   1320.61|
|1324.88|  1352.82| 1322.49|   1348.66|
|1341.46|  1359.99| 1325.34|   1341.48|
|1287.93|  1288.05|  1232.2|   1233.67|
| 1296.0|  1296.15|  1269.0|   1275.88|
|1261.17|   1280.4| 1249.45|   1279.31|
|1271.55|  1293.31| 1265.67|   1276.31|
|1245.54|1285.6133|  1242.0|   1263.21|
| 1247.0|  1254.27| 1209.71|   1216.34|
| 1271.0|   1281.6| 1261.37|   1266.61|
|1284.85|  1294.43| 1271.23|   1283.25|
| 1274.1|   1279.0| 1242.62|   1263.47|
|1245.61|  1280.46|  1240.4|   1262.47|
|1245.09|  1282.07| 1236.93|   1269.23|
+-------+---------+--------+----------+
only showing top 20 rows



# Transformation

In [17]:
from pyspark.ml.feature import VectorAssembler

Assembling the feature columns (Open, High, Low) into a single `features` vector column

In [18]:
# Input columns that are packed, in this order, into one vector column named
# 'features'.
feature_columns = ['Open', 'High', 'Low']
vectorAssembler = VectorAssembler(outputCol='features', inputCols=feature_columns)

In [19]:
# Add the assembled 'features' vector, then keep only the model input
# ('features') and the label ('Close/Last').
vectored_dataframe = (
    vectorAssembler
    .transform(required_dataframe)
    .select('features', 'Close/Last')
)
vectored_dataframe.show()

+--------------------+----------+
|            features|Close/Last|
+--------------------+----------+
|[1378.28,1416.53,...|   1403.26|
|[1383.13,1398.76,...|   1388.37|
|[1365.94,1377.6,1...|   1372.56|
|[1361.69,1371.119...|    1347.3|
|[1337.92,1373.94,...|   1351.11|
|[1308.23,1327.66,...|    1326.8|
|[1328.5,1352.0695...|   1320.61|
|[1324.88,1352.82,...|   1348.66|
|[1341.46,1359.99,...|   1341.48|
|[1287.93,1288.05,...|   1233.67|
|[1296.0,1296.15,1...|   1275.88|
|[1261.17,1280.4,1...|   1279.31|
|[1271.55,1293.31,...|   1276.31|
|[1245.54,1285.613...|   1263.21|
|[1247.0,1254.27,1...|   1216.34|
|[1271.0,1281.6,12...|   1266.61|
|[1284.85,1294.43,...|   1283.25|
|[1274.1,1279.0,12...|   1263.47|
|[1245.61,1280.46,...|   1262.47|
|[1245.09,1282.07,...|   1269.23|
+--------------------+----------+
only showing top 20 rows



# Building model

In [20]:
from pyspark.ml.regression import LinearRegression

In [21]:
# Estimator settings, collected in one place so they are easy to tune.
regression_params = {
    'featuresCol': 'features',    # assembled input vector column
    'labelCol': 'Close/Last',     # value to predict
    'maxIter': 10,                # cap on optimizer iterations
    'regParam': 0.3,              # regularization strength
    'elasticNetParam': 0.8,       # elastic-net mixing parameter
}
regressor = LinearRegression(**regression_params)

In [22]:
model = regressor.fit(vectored_dataframe)

# Testing

In [23]:
splits = vectored_dataframe.randomSplit([0.7, 0.3])
test_df = splits[1]

In [24]:
# Apply the fitted model to the test rows. The result carries a 'prediction'
# column, which the following cells select and evaluate.
predictions = model.transform(test_df)

In [25]:
# Show predicted vs. actual closing price next to the input features.
columns_to_show = ["prediction", 'Close/Last', 'features']
predictions.select(columns_to_show).show()

+------------------+----------+--------------------+
|        prediction|Close/Last|            features|
+------------------+----------+--------------------+
| 498.5295543328183|    500.87|[494.65,503.23,49...|
| 498.0918183915951|    496.18|[498.84,502.98,49...|
| 500.2719133588521|    496.17|[504.76,504.92,49...|
| 512.3496653709844|    518.73|[510.75,519.9,504.2]|
|501.14509816805764|    495.39|[511.56,513.05,48...|
| 523.2456857063205|     527.7|[516.9,529.46,516...|
|510.99735692010915|    517.15|[517.18,518.6,502.8]|
| 524.0574061498271|    528.86|[519.7,529.78,517...|
| 528.2968952484052|    534.39|[521.48,536.33,51...|
| 520.4976672967623|    516.18|[522.51,524.7,515...|
| 518.6106905403567|     513.8|[522.74,523.1,513...|
| 527.9918045241925|     530.7|[523.99,533.46,52...|
| 522.1005799184527|    519.98|[525.7,525.87,517...|
| 530.8855585973517|    530.59|[527.0,534.56,526...|
| 527.8777795272779|    526.98|[527.13,531.0,523...|
|  519.911661200769|    511.17|[527.25,530.98,

# Evaluation

In [26]:
from pyspark.ml.evaluation import RegressionEvaluator

In [27]:
# Evaluator that scores the 'prediction' column against the 'Close/Last'
# label using the R^2 metric.
model_evaluator = RegressionEvaluator(
    metricName='r2',
    labelCol='Close/Last',
    predictionCol='prediction',
)

In [28]:
# Compute R^2 on the prediction frame once, then report it.
r2_on_test = model_evaluator.evaluate(predictions)
print('R Squared on test data = %g' % r2_on_test)

R Squared on test data = 0.999227


# Saving Model

In [29]:
from pyspark.ml.regression import LinearRegressionModel

In [30]:
# Persist the fitted model to the 'model' directory. A plain save() fails when
# the path already exists, which breaks every run after the first one, so the
# writer is asked to overwrite any previous copy.
model.write().overwrite().save('model')