In [12]:
import boto3
import pandas as pd

client=boto3.client('s3')

In [13]:
with open('aws.txt')as f:
    lines=f.read().splitlines()

In [14]:
from boto.s3.connection import S3Connection
conn = S3Connection(lines[0],lines[1], host='s3.ap-south-1.amazonaws.com')

In [15]:
path='s3://historicaldata03jun2020/HistoricalQuotes.csv'
data=pd.read_csv(path)

In [16]:
data.head(2)

Unnamed: 0,Date,Close/Last,Volume,Open,High,Low
0,06-01-20,"$1,431.82",1217140,"$1,418.39","$1,437.96","$1,418"
1,05/29/2020,"$1,428.92",1838059,"$1,416.94","$1,432.57","$1,413.35"


In [17]:
data.rename(columns={c:c.strip() for c in data.columns.values.tolist()},inplace=True)

In [18]:
data.rename(columns={"Close/Last":"Close"},inplace=True)
data.columns.tolist()

['Date', 'Close', 'Volume', 'Open', 'High', 'Low']

In [19]:
import numpy as np
data['Close']=data['Close'].str.replace(',','').str.replace('$','').astype(float)
data['Open']=data['Open'].str.replace(',','').str.replace('$','').astype(float)
data['High']=data['High'].str.replace(',','').str.replace('$','').astype(float)
data['Low']=data['Low'].str.replace(',','').str.replace('$','').astype(float)
print(data.head(2))

         Date    Close   Volume     Open     High      Low
0    06-01-20  1431.82  1217140  1418.39  1437.96  1418.00
1  05/29/2020  1428.92  1838059  1416.94  1432.57  1413.35


In [20]:
print(data.dtypes)

Date       object
Close     float64
Volume      int64
Open      float64
High      float64
Low       float64
dtype: object


In [21]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession

In [25]:
sc.stop()
sc = SparkContext(master="local", appName="SparkDemo")
spark = SparkSession(sc)
stockData = spark.read.csv('Newstockdata.csv', inferSchema="true",header='true')
print(stockData.cache())

DataFrame[Date: string, Close: double, Volume: int, Open: double, High: double, Low: double]


In [26]:
stockData.take(5)

[Row(Date='06/01/2020', Close=1431.82, Volume=1217140, Open=1418.39, High=1437.96, Low=1418.0),
 Row(Date='05/29/2020', Close=1428.92, Volume=1838059, Open=1416.94, High=1432.57, Low=1413.35),
 Row(Date='05/28/2020', Close=1416.73, Volume=1693976, Open=1396.86, High=1440.84, Low=1396.0),
 Row(Date='05/27/2020', Close=1417.84, Volume=1686142, Open=1417.25, High=1421.74, Low=1391.29),
 Row(Date='05/26/2020', Close=1417.02, Volume=2060643, Open=1437.27, High=1441.0, Low=1412.13)]

In [27]:
stockData=stockData.withColumn('Date',stockData['Date'].cast('string'))
stockData.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
Date,1259,,,01/02/2018,12/31/2019
Close,1259,979.4841620333605,231.36446929839673,516.83,1526.69
Volume,1259,1716753.3145353454,860209.0458188738,347518,11153500
Open,1259,978.9586576648138,230.8020233980533,519.5,1525.07
High,1259,988.0056632247807,233.56059290531786,522.73,1532.11
Low,1259,970.1199285146953,228.6832797341399,515.18,1521.4


In [28]:
trainData=stockData.select(['Open','High','Low','Close'])
trainData.show(5)

+-------+-------+-------+-------+
|   Open|   High|    Low|  Close|
+-------+-------+-------+-------+
|1418.39|1437.96| 1418.0|1431.82|
|1416.94|1432.57|1413.35|1428.92|
|1396.86|1440.84| 1396.0|1416.73|
|1417.25|1421.74|1391.29|1417.84|
|1437.27| 1441.0|1412.13|1417.02|
+-------+-------+-------+-------+
only showing top 5 rows



# Transformation

In [29]:
from pyspark.ml.feature import VectorAssembler
VectorAssembler = VectorAssembler(inputCols=['Open','High','Low'],outputCol='features')

In [30]:
VectorDataFrame=VectorAssembler.transform(trainData)
VectorDataFrame=VectorDataFrame.select(['features','Close'])
VectorDataFrame.show()

+--------------------+-------+
|            features|  Close|
+--------------------+-------+
|[1418.39,1437.96,...|1431.82|
|[1416.94,1432.57,...|1428.92|
|[1396.86,1440.84,...|1416.73|
|[1417.25,1421.74,...|1417.84|
|[1437.27,1441.0,1...|1417.02|
|[1396.71,1412.76,...|1410.42|
|[1408.0,1415.49,1...| 1402.8|
|[1389.58,1410.42,...|1406.72|
|[1387.0,1392.0,13...|1373.49|
|[1361.75,1392.33,...|1383.94|
|[1350.0,1374.48,1...|1373.19|
|[1335.02,1357.42,...|1356.13|
|[1377.05,1385.48,...|1349.33|
|[1407.12,1415.0,1...|1375.74|
|[1378.28,1416.53,...|1403.26|
|[1383.13,1398.76,...|1388.37|
|[1365.94,1377.6,1...|1372.56|
|[1361.69,1371.12,...| 1347.3|
|[1337.92,1373.94,...|1351.11|
|[1308.23,1327.66,...| 1326.8|
+--------------------+-------+
only showing top 20 rows



In [31]:
from pyspark.ml.regression import LinearRegression
LinearRegression=LinearRegression(featuresCol='features',labelCol='Close',maxIter=10,regParam=0.3,elasticNetParam=0.8)

In [32]:
model=LinearRegression.fit(VectorDataFrame)

In [33]:
SplitsTraingTesting=VectorDataFrame.randomSplit([0.7,0.3])
testDataframe=SplitsTraingTesting[1]

In [34]:
Prediction=model.transform(testDataframe)

In [35]:
Prediction.select("Prediction","Close","features").show()

+-----------------+------+--------------------+
|       Prediction| Close|            features|
+-----------------+------+--------------------+
|529.5935009738305|530.13|[526.29,532.56,52...|
|537.6348681602104|539.18|[532.93,543.0,531...|
|535.1029912621764|531.69|[537.26,537.76,53...|
|538.9257569883774|540.48|[539.64,541.5,535...|
|540.8538634422493|540.31|[539.91,543.5,537...|
| 556.640460017739| 561.1|[546.76,565.85,54...|
|598.0395589098897|594.97|[597.28,605.0,590...|
|599.8874897626912| 600.7|[600.0,603.47,595...|
|607.4577342838642|614.34|[605.59,614.34,59...|
|615.5448882449194|628.62|[610.35,631.71,59...|
|618.2571231874382|621.35|[613.1,624.16,611...|
|627.8471372802679|627.26| [621.0,634.3,620.5]|
|618.4597109107655|612.72|[621.22,626.52,60...|
|627.1226954013817|618.25|[627.54,635.8,617...|
|629.7475403109542|625.61|[631.38,632.91,62...|
|640.9450346647053|643.78|[634.33,647.86,63...|
|633.9699668839604|629.25|[636.79,640.0,627...|
|654.6183853183434|651.16|[653.21,659.39

In [36]:
from pyspark.ml.evaluation import RegressionEvaluator
modelEvaluator=RegressionEvaluator(predictionCol='Prediction',labelCol='Close',metricName='r2')
print(Prediction)

DataFrame[features: vector, Close: double, prediction: double]


In [37]:
print("coefficients : "+ str(model.coefficients))
print("Intercept  : "+ str(model.intercept))

trainingSummary=model.summary
print("RMSE: %f"%trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

coefficients : [0.0,0.4851480789075352,0.5149517720735619]
Intercept  : 0.5901362575730915
RMSE: 7.319078
r2: 0.998998


In [38]:
import sys
model.save('Model')

Py4JJavaError: An error occurred while calling o244.save.
: java.io.IOException: Path Model already exists. To overwrite it, please use write.overwrite().save(path) for Scala and use write().overwrite().save(path) for Java and Python.
	at org.apache.spark.ml.util.FileSystemOverwrite.handleOverwrite(ReadWrite.scala:702)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:179)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [41]:
import pickle
PickelFile="PickleLinearModel"
with open(PickelFile,'wb') as f:
    pickle.dump(model,f)

Py4JError: An error occurred while calling o132.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
	at py4j.Gateway.invoke(Gateway.java:274)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

