'''
@Author: Rashmi
@Date: 2021-12-11 19:31
@Last Modified by: Rashmi
@Last Modified time: 2021-12-11  00:30
@Title : Fetch real-time stock data using an API key, clean and preprocess it with Spark, load it into a DataFrame, and build a machine learning model.
'''

In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
import csv
import requests
import os
from json import dumps
from pyspark.ml.regression import LinearRegression
from pyspark.sql.functions import percent_rank
from pyspark.sql import Window
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import *
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("Stock Data Prediction")
    .getOrCreate()
)

2022-01-05 00:51:11,277 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
from dotenv import load_dotenv

# Load variables from the local .env file into the process environment,
# then read the API key from it (None when API_KEY is not set).
load_dotenv('.env')
key = os.environ.get('API_KEY')

In [3]:
# Alpha Vantage TIME_SERIES_INTRADAY_EXTENDED query for IBM at 15-minute
# intervals. The API key read from the environment is interpolated into the
# URL; previously the literal text " key" was sent instead of the key's value.
CSV_URL = f'https://www.alphavantage.co/query?function=TIME_SERIES_INTRADAY_EXTENDED&symbol=IBM&interval=15min&slice=year1month1&apikey={key}'

The data fetched with the API key is stored in a CSV file, and that file is then sent to HDFS through the terminal.

In [4]:
# Download the CSV response and parse it into a list of rows.
with requests.Session() as s:
    # A timeout keeps the cell from hanging forever on a stalled connection.
    download = s.get(CSV_URL, timeout=30)
    # Fail loudly on an HTTP error instead of writing an error body to disk.
    download.raise_for_status()
    decoded_content = download.content.decode('utf-8')
    cr = csv.reader(decoded_content.splitlines(), delimiter=',')
    my_list = list(cr)

# Write the rows to data.csv. newline='' is what the csv module expects for
# files handed to csv.writer; the encoding matches the decode step above.
# The with-statement closes the file, so no explicit close() is needed.
with open('data.csv', 'w', newline='', encoding='utf-8') as csv_file:
    csv_writer = csv.writer(csv_file, delimiter=",")
    csv_writer.writerows(my_list)

Fetch the stock data CSV file from HDFS and store it as a DataFrame.

In [5]:
# Read the stock CSV from HDFS, taking column names from the first line,
# then look at the first row.
csv_reader = spark.read.option("header", True)
df = csv_reader.csv("hdfs://localhost:9000/project-stockdata/data.csv")
df.head()



Row(time='2021-12-10 20:00:00', open='124.02', high='124.02', low='124.02', close='124.02', volume='151')

In [6]:
# Build the summary-statistics DataFrame for df. No action is run on it here,
# so the cell output below is only its schema.
# NOTE(review): use df.describe().show() if the statistics themselves are wanted.
df.describe()



DataFrame[summary: string, time: string, open: string, high: string, low: string, close: string, volume: string]

In [7]:
# Print the first four rows of the raw (all-string) DataFrame.
df.show(n=4)

+-------------------+------+------+------+------+------+
|               time|  open|  high|   low| close|volume|
+-------------------+------+------+------+------+------+
|2021-12-10 20:00:00|124.02|124.02|124.02|124.02|   151|
|2021-12-10 19:45:00|124.24|124.24|124.24|124.24|   279|
|2021-12-10 19:15:00|123.85|123.85|123.85|123.85|   499|
|2021-12-10 19:00:00|123.94|123.94|123.94|123.94|   117|
+-------------------+------+------+------+------+------+
only showing top 4 rows



Cast the string values to double to prepare the training and test data.

In [8]:
# Cast the price and volume columns, which were read from the CSV as strings,
# to double. The names below match the CSV header exactly (all lowercase), so
# the lookup does not depend on Spark's case-insensitive column resolution.
numeric_columns = ["open", "high", "low", "close", "volume"]
df2 = df
for column_name in numeric_columns:
    # Replace each column in place with its double-typed version.
    df2 = df2.withColumn(column_name, col(column_name).cast("double"))


In [9]:
# Show the first row of df2; the cast columns now appear as floats.
df2.head()

Row(time='2021-12-10 20:00:00', open=124.02, high=124.02, low=124.02, close=124.02, volume=151.0)



Creating vectors from the features. Apache Spark MLlib takes its input in the form of vectors.



In [10]:
# Combine the open/high/low columns into a single vector column "features",
# then print the first five vectors without truncation.
feature_columns = ["open", "high", "low"]
featureassembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol="features",
)
output = featureassembler.transform(df2)
features_only = output.select("features")
features_only.show(5, truncate=False)


+----------------------+
|features              |
+----------------------+
|[124.02,124.02,124.02]|
|[124.24,124.24,124.24]|
|[123.85,123.85,123.85]|
|[123.94,123.94,123.94]|
|[124.09,124.09,123.85]|
+----------------------+
only showing top 5 rows



In [11]:
# Keep the timestamp, the feature vector and the label (close), ordered by
# time ascending so the oldest rows come first.
model_columns = output.select("time", "features", "close")
finalized_data = model_columns.orderBy("time", ascending=True)
finalized_data.show(n=5)


+-------------------+--------------------+------+
|               time|            features| close|
+-------------------+--------------------+------+
|2021-11-11 05:00:00|[120.56,120.56,12...|120.56|
|2021-11-11 05:45:00|[120.93,120.93,12...|120.93|
|2021-11-11 06:15:00|[120.87,120.87,12...|120.85|
|2021-11-11 06:45:00|[120.78,120.78,12...|120.78|
|2021-11-11 07:15:00|[120.64,120.65,12...|120.65|
+-------------------+--------------------+------+
only showing top 5 rows



Creating the final dataset from the real-time stock data and splitting it into training and testing data.

In [12]:
# Add a "rank" column holding each row's percent rank by time, computed over
# one window spanning all rows (no partition columns). The rank is used
# afterwards to split the data chronologically.
time_window = Window.partitionBy().orderBy("time")
rank_column = percent_rank().over(time_window)
final_data = finalized_data.withColumn("rank", rank_column)

In [13]:
# Total number of rows available before the train/test split.
final_data.count()

948

In [14]:
'''storing 80% of actual data into train dataframe'''
train_df = final_data.where("rank <= .8").drop("rank")
train_df.show(10)

2022-01-05 00:52:46,210 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+-------------------+--------------------+------+
|               time|            features| close|
+-------------------+--------------------+------+
|2021-11-11 05:00:00|[120.56,120.56,12...|120.56|
|2021-11-11 05:45:00|[120.93,120.93,12...|120.93|
|2021-11-11 06:15:00|[120.87,120.87,12...|120.85|
|2021-11-11 06:45:00|[120.78,120.78,12...|120.78|
|2021-11-11 07:15:00|[120.64,120.65,12...|120.65|
|2021-11-11 07:30:00| [120.7,120.7,120.7]| 120.7|
|2021-11-11 07:45:00|[120.65,120.87,12...|120.79|
|2021-11-11 08:15:00|[120.75,120.75,12...| 120.7|
|2021-11-11 08:30:00|[120.64,120.78,12...|120.78|
|2021-11-11 08:45:00|[120.64,120.64,12...| 120.4|
+-------------------+--------------------+------+
only showing top 10 rows





In [15]:
# Number of rows in the training set.
train_df.count()

2022-01-05 00:52:53,812 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


758

In [16]:
# Test set: rows whose percent rank by time is above 0.8, i.e. the most
# recent part of the data. The helper "rank" column is dropped afterwards.
test_rows = final_data.filter("rank > .8")
test_df = test_rows.drop("rank")
test_df.show(n=10)

2022-01-05 00:52:57,701 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+-------------------+--------------------+------+
|               time|            features| close|
+-------------------+--------------------+------+
|2021-12-06 15:45:00|[120.085,120.18,1...|120.04|
|2021-12-06 16:00:00|[120.05,120.17,11...|119.95|
|2021-12-06 16:15:00|[119.91,120.05,11...|120.05|
|2021-12-06 16:30:00| [119.9,119.9,119.9]| 119.9|
|2021-12-06 16:45:00|[119.95,120.0,119...| 120.0|
|2021-12-06 17:00:00|[120.0,120.0,119.77]|119.77|
|2021-12-06 17:45:00|[119.84,119.91,11...|119.84|
|2021-12-06 18:00:00|[120.05,120.05,12...|120.05|
|2021-12-06 18:30:00| [120.0,120.0,120.0]| 120.0|
|2021-12-06 19:00:00|[119.99,119.99,11...|119.99|
+-------------------+--------------------+------+
only showing top 10 rows





In [17]:
# Number of rows in the test set.
test_df.count()

2022-01-05 00:53:03,904 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


190

In [24]:
# Write the test set as Parquet to the relative path 'testdata'
# (stored in HDFS at /user/lenovo/testdata).
# NOTE(review): the default save mode fails if the path already exists, so
# re-running this cell needs the directory removed or an explicit write mode.
parquet_writer = test_df.write.format("parquet")
parquet_writer.save('testdata')

2022-01-05 00:56:02,304 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


Building an ML model (linear regression)

In [20]:
# Fit a linear regression that predicts "close" from the "features" vector,
# using the training set only, then report the learned parameters.
linear_reg = LinearRegression(featuresCol='features', labelCol='close')
lr_model = linear_reg.fit(train_df)
print(f"Coefficients: {lr_model.coefficients!s}")
print(f"Intercept: {lr_model.intercept!s}")

2022-01-05 00:54:10,768 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2022-01-05 00:54:11,257 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2022-01-05 00:54:11,483 WARN util.Instrumentation: [ec343b84] regParam is zero, which might cause numerical instability and overfitting.
2022-01-05 00:54:13,148 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
2022-01-05 00:54:13,151 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
2022-01-05 00:54:13,225 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
2022-01-05 00:54:13,226 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
2022-01-05 00:54:13,404 W

Coefficients: [-0.6116892987793721,0.7903863219989552,0.8207568446262014]
Intercept: 0.0689714373998241




In [21]:
# Apply the fitted model to the test set and compare the actual close price
# with the model's prediction for the first five rows.
predDF = lr_model.transform(test_df)
preview_columns = ["features", "close", "prediction"]
predDF.select(*preview_columns).show(n=5)


2022-01-05 00:54:20,173 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+--------------------+------+------------------+
|            features| close|        prediction|
+--------------------+------+------------------+
|[120.085,120.18,1...|120.04|120.05267368422624|
|[120.05,120.17,11...|119.95|119.96768812510837|
|[119.91,120.05,11...|120.05|120.02413881586769|
| [119.9,119.9,119.9]| 119.9|119.90349019210937|
|[119.95,120.0,119...| 120.0|119.99298220160162|
+--------------------+------+------------------+
only showing top 5 rows





Evaluating the model using Regression Evaluator

In [22]:
from pyspark.ml.evaluation import RegressionEvaluator

# Measure the root mean squared error between the "prediction" column and the
# actual "close" price on the scored test set; it is printed to one decimal.
evaluator_settings = {
    "predictionCol": "prediction",
    "labelCol": "close",
    "metricName": "rmse",
}
regressionEvaluator = RegressionEvaluator(**evaluator_settings)
rmse = regressionEvaluator.evaluate(predDF)
print(f"Root Mean Squared Error(RMSE) is {rmse:.1f}")


2022-01-05 00:54:28,043 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


Root Mean Squared Error(RMSE) is 0.1






Save the model to HDFS

In [25]:
# Persist the fitted model to HDFS.
# NOTE(review): saving fails if the target path already exists; remove it or
# use lr_model.write().overwrite() when re-running this cell.
model_path = "hdfs://localhost:9000/prediction_stock_price_data_model"
lr_model.write().save(model_path)


In [26]:
# Disabled: the test set is already written to 'testdata' in cell In [24].
# test_df.write.parquet('testdata')

2022-01-05 01:00:06,869 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
