In [None]:
'''
@Author: Ayur Ninawe
@Date: 06-09-2021
@Last Modified by: Ayur Ninawe
@Last Modified time: 06-09-2021
@Title : Program to clean data and to create ML model for stock price prediction.
'''

In [2]:
import pandas as pd
import numpy as np

In [21]:
'''Getting data from S3 bucket and saving it to dataframe.'''

path ='s3://ayurnaws/google_stock_data.csv'
df = pd.read_csv(path)
df.head()

Unnamed: 0,date,1. open,2. high,3. low,4. close,5. volume
0,2021-09-03 16:42:00,2874.5,2874.5,2874.5,2874.5,286.0
1,2021-09-03 16:31:00,2875.0,2875.0,2875.0,2875.0,103.0
2,2021-09-03 16:28:00,2874.79,2874.79,2874.79,2874.79,613.0
3,2021-09-03 16:15:00,2875.0,2875.0,2874.5,2874.5,279.0
4,2021-09-03 16:11:00,2874.79,2874.79,2874.79,2874.79,203.0


In [22]:
'''
Using Pandas to read data from S3 and return clean dataframe
'''
df = pd.read_csv(path)

        # Renaming columns
df = df.rename(columns={df.columns[1]: 'Open'})
df = df.rename(columns={df.columns[3]: 'Low'})
df = df.rename(columns={df.columns[4]: 'Close'})
df = df.rename(columns={df.columns[5]: 'Volume'})
df = df.rename(columns={df.columns[2]: 'High'})

df.dtypes
df.isnull().sum()

date      0
Open      0
High      0
Low       0
Close     0
Volume    0
dtype: int64

In [None]:
# '''convert string Date into Datetime format '''

# df['date'] = pd.to_datetime(df.date)
# df['date'] = df ["date"].dt.strftime('%m/%d/%y')
# df['date']

In [23]:
df.head()

Unnamed: 0,date,Open,High,Low,Close,Volume
0,2021-09-03 16:42:00,2874.5,2874.5,2874.5,2874.5,286.0
1,2021-09-03 16:31:00,2875.0,2875.0,2875.0,2875.0,103.0
2,2021-09-03 16:28:00,2874.79,2874.79,2874.79,2874.79,613.0
3,2021-09-03 16:15:00,2875.0,2875.0,2874.5,2874.5,279.0
4,2021-09-03 16:11:00,2874.79,2874.79,2874.79,2874.79,203.0


In [25]:
'''Here we start Data Modeling by pyspark our clean data available on 'df'''

'''First import all important libraries of pyspark for modeling process'''

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession

'''Convert pandas Dataframe to pyspark Dataframe'''
# sc = SparkContext()
# sparkSession = SparkSession(sc)
stock_price_data = sparkSession.createDataFrame(df)

In [12]:
stock_price_data.printSchema()

root
 |-- date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: double (nullable = true)
 |-- Date: string (nullable = true)



In [26]:
stock_price_data.describe().toPandas().transpose()



Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
date,3743,,,2021-08-23 04:16:00,2021-09-03 16:42:00
Open,3743,2864.4487212396475,35.45578737179685,2753.14,2924.93
High,3743,2864.952777958855,35.41839308107878,2754.7499,2925.075
Low,3743,2863.9210248730965,35.490252196114234,2752.15,2924.48
Close,3743,2864.440174565856,35.44219184633815,2752.15,2925.075
Volume,3743,2751.7493988779056,4020.0574534657085,100.0,127004.0


In [27]:
#Importing linear regression from pyspark mllib
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler


In [28]:
spark = SparkSession.builder.appName("Stock processing").getOrCreate()

In [29]:
from pyspark.sql.functions import percent_rank
from pyspark.sql import Window

In [34]:
'''TRANSFORMATION'''

'''vectorizing the columns assembler combine all  features before training or scoring the model'''
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

featureassembler=VectorAssembler(inputCols=["Open","High","Low"],outputCol="Independent_Features")

output=featureassembler.transform(stock_price_data)
output.show()
'''see the vectorized feature'''
output.select("Independent_Features").show()
output.columns
'''get the sorted column'''
finalized_data=output.select("date","Independent_Features","Close").sort("date",ascending=True)
finalized_data.show()


'''Divide the data for Training and Testing'''

# finalized_data = finalized_data.toPandas()

# # spliting the dataset in ratio 8:2 
# # train_data, test_data = finalized_data.split([0.80,0.20])
# test_data, train_data = np.split(finalized_data, [int(.2*len(finalized_data))])

# test_data.to_parquet("test_data.parquet")
# # print(test_data.shape[0], train_data.shape[0])

# train_data = spark.createDataFrame(train_data)
# test_data = spark.createDataFrame(test_data)
# test_data.write.parquet('test_data')
# type(test_data)

# spliting the dataset in ratio 8:2 
final_data = finalized_data.withColumn("rank",percent_rank().over(Window.partitionBy().orderBy("date")))

train_data = final_data.where("rank <= .8").drop("rank")
test_data = final_data.where("rank > .8").drop("rank")




+-------------------+--------+-------+---------+--------+-------+--------------------+
|               date|    Open|   High|      Low|   Close| Volume|Independent_Features|
+-------------------+--------+-------+---------+--------+-------+--------------------+
|2021-09-03 16:42:00|  2874.5| 2874.5|   2874.5|  2874.5|  286.0|[2874.5,2874.5,28...|
|2021-09-03 16:31:00|  2875.0| 2875.0|   2875.0|  2875.0|  103.0|[2875.0,2875.0,28...|
|2021-09-03 16:28:00| 2874.79|2874.79|  2874.79| 2874.79|  613.0|[2874.79,2874.79,...|
|2021-09-03 16:15:00|  2875.0| 2875.0|   2874.5|  2874.5|  279.0|[2875.0,2875.0,28...|
|2021-09-03 16:11:00| 2874.79|2874.79|  2874.79| 2874.79|  203.0|[2874.79,2874.79,...|
|2021-09-03 16:10:00|  2874.5| 2874.5|   2874.5|  2874.5|  121.0|[2874.5,2874.5,28...|
|2021-09-03 16:05:00|  2875.0| 2875.0|   2875.0|  2875.0|  135.0|[2875.0,2875.0,28...|
|2021-09-03 16:03:00| 2874.79|2874.79|  2874.79| 2874.79| 1471.0|[2874.79,2874.79,...|
|2021-09-03 16:01:00| 2874.79|2874.79|  287

In [35]:
test_data.write.parquet("test_data")

2021-09-08 23:39:56,895 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


In [37]:

'''BUILDING MODEL'''
'''USe linear regression alogorithm for model fiting'''

from pyspark.ml.regression import LinearRegression
regressor=LinearRegression(featuresCol='Independent_Features', labelCol='Close')
regressor=regressor.fit(train_data)

lr = LinearRegression(featuresCol = 'Independent_Features', labelCol='Close', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_data)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

'''TESTING'''
'''testing the data get the accuracy by using root mean square '''

lr_predictions = lr_model.transform(test_data)
lr_predictions.select("Close","Independent_Features","prediction").show(5)

'''EVALUATION'''

from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="Close",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

'''Saving Model'''

from pyspark.ml.regression import LinearRegressionModel
lr_model.write().overwrite().save("stock_Model")
print("saved succesfully")

2021-09-08 23:47:24,918 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2021-09-08 23:47:26,349 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2021-09-08 23:47:26,713 WARN util.Instrumentation: [ad67ffca] regParam is zero, which might cause numerical instability and overfitting.
2021-09-08 23:47:29,362 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
2021-09-08 23:47:29,363 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
2021-09-08 23:47:29,479 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
2021-09-08 23:47:29,480 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
2021-09-08 23:47:29,731 W

Coefficients: [0.27007788492827467,0.3926965969468253,0.33033448832919166]
Intercept: 19.694201390375785


2021-09-08 23:47:36,346 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+------+--------------------+------------------+
| Close|Independent_Features|        prediction|
+------+--------------------+------------------+
|2904.0|[2904.0,2904.0,29...|2903.6826508636386|
|2904.0|[2904.0,2904.0,29...|2903.6826508636386|
|2904.3|[2904.3,2904.3,29...|   2903.9805835547|
|2905.5|[2905.5,2905.5,29...| 2905.172314318945|
|2904.0|[2904.0,2904.0,29...|2903.6826508636386|
+------+--------------------+------------------+
only showing top 5 rows



2021-09-08 23:47:38,433 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


R Squared (R2) on test data = 0.987752




saved succesfully


## Loading model and testing on test data

In [38]:
mlModel = LinearRegressionModel.load("/user/ayur/stock_Model")



In [39]:
test_op = mlModel.transform(test_data)
test_op.show()

2021-09-08 23:47:59,358 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+-------------------+--------------------+---------+------------------+
|               date|Independent_Features|    Close|        prediction|
+-------------------+--------------------+---------+------------------+
|2021-09-02 08:17:00|[2904.0,2904.0,29...|   2904.0|2903.6826508636386|
|2021-09-02 08:43:00|[2904.0,2904.0,29...|   2904.0|2903.6826508636386|
|2021-09-02 08:56:00|[2904.3,2904.3,29...|   2904.3|   2903.9805835547|
|2021-09-02 09:13:00|[2905.5,2905.5,29...|   2905.5| 2905.172314318945|
|2021-09-02 09:14:00|[2904.0,2904.0,29...|   2904.0|2903.6826508636386|
|2021-09-02 09:26:00|[2904.0,2904.0,29...|   2904.0|2903.6826508636386|
|2021-09-02 09:30:00|[2902.0,2906.33,2...|  2906.33|2903.3968091880097|
|2021-09-02 09:31:00|[2904.32,2907.62,...|  2907.62| 2902.412524420915|
|2021-09-02 09:32:00|[2908.835,2910.37...|  2910.37| 2908.409110473594|
|2021-09-02 09:33:00|[2910.385,2910.38...|  2905.24|2908.3240806959393|
|2021-09-02 09:34:00|[2905.055,2906.06...|  2902.49|2904.2796964

