In [2]:
from pyspark.sql import SparkSession
import csv
import requests
import os
from json import dumps
from pyspark.ml.regression import LinearRegression
from pyspark.sql.functions import percent_rank
from pyspark.sql import Window
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import *
spark = SparkSession.builder.appName("Stock Data Prediction").getOrCreate()

2021-12-04 23:25:00,151 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2021-12-04 23:25:06,953 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
2021-12-04 23:25:06,955 WARN util.Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
2021-12-04 23:25:06,956 WARN util.Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.


In [3]:
CSV_URL = 'https://www.alphavantage.co/query?function=TIME_SERIES_INTRADAY_EXTENDED&symbol=IBM&interval=1min&slice=year1month1&apikey= 2AOXSCUJQ3NQNKSG'

The api key data is stored in csv file and that file sent to hdfs through terminal

In [3]:
with requests.Session() as s:
    download = s.get(CSV_URL)
    decoded_content = download.content.decode('utf-8')
    cr = csv.reader(decoded_content.splitlines(), delimiter=',')
    my_list = list(cr)
with open('data.csv', 'w') as csv_file: 
    csv_writer = csv.writer(csv_file, delimiter=",")
    for row in my_list:
        csv_writer.writerow(row)
    csv_file.close()

Fetch the stock data csv file from hdfs and stored it as dataframe

In [4]:
df = spark.read.csv("hdfs://localhost:9000/Spark/StockDataPrediction/StockData.csv",header=True)
df.head()



Row(time='2021-12-03 19:40:00', open='119.11', high='119.11', low='119.11', close='119.11', volume='307')

In [3]:
df.describe()



DataFrame[summary: string, time: string, open: string, high: string, low: string, close: string, volume: string]

In [4]:
df.show(4)

+-------------------+------+------+------+------+------+
|               time|  open|  high|   low| close|volume|
+-------------------+------+------+------+------+------+
|2021-12-03 19:40:00|119.11|119.11|119.11|119.11|   307|
|2021-12-03 18:40:00|119.05|119.05|119.05|119.05|   100|
|2021-12-03 18:14:00|119.05|119.05|119.05|119.05|   267|
|2021-12-03 18:09:00|119.06|119.06|119.05|119.05|   310|
+-------------------+------+------+------+------+------+
only showing top 4 rows



Casting the string values to double to prepare train and test data

In [5]:
df2 = df\
    .withColumn("open",col("Open").cast("double"))\
    .withColumn("high",col("High").cast("double"))\
    .withColumn("low",col("Low").cast("double"))\
    .withColumn("close",col("Close").cast("double"))\
    .withColumn("volume",col("Volume").cast("double"))

In [6]:
df2.head()



Row(time='2021-12-03 19:40:00', open=119.11, high=119.11, low=119.11, close=119.11, volume=307.0)

Creating the vectors from features. Apache MLib takes the input in the form of vectors=

In [6]:
featureassembler=VectorAssembler(inputCols=["open","high","low"],outputCol="features")
output=featureassembler.transform(df2)
output.select("features").show(5,truncate=False)



+----------------------+
|features              |
+----------------------+
|[119.11,119.11,119.11]|
|[119.05,119.05,119.05]|
|[119.05,119.05,119.05]|
|[119.06,119.06,119.05]|
|[119.05,119.05,119.05]|
+----------------------+
only showing top 5 rows





In [7]:
finalized_data=output.select("time","features","close").sort("time",ascending=True)
finalized_data.show(5)



+-------------------+--------------------+-------------+
|               time|            features|        close|
+-------------------+--------------------+-------------+
|2021-11-04 04:07:00|[121.895807821,12...|121.353171687|
|2021-11-04 04:08:00|[121.353171687,12...|121.353171687|
|2021-11-04 04:11:00|[121.728083925,12...|121.728083925|
|2021-11-04 04:14:00|[121.550493918,12...|121.550493918|
|2021-11-04 04:24:00|[121.353171687,12...|121.353171687|
+-------------------+--------------------+-------------+
only showing top 5 rows






Splitting data into training and testing data and Final data for real time stock data created

In [8]:
final_data = finalized_data.withColumn("rank", percent_rank().over(Window.partitionBy().orderBy("time")))

In [9]:
train_df = final_data.where("rank <= .8").drop("rank")
train_df.show(10)

2021-12-04 23:27:45,924 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+-------------------+--------------------+-------------+
|               time|            features|        close|
+-------------------+--------------------+-------------+
|2021-11-04 04:07:00|[121.895807821,12...|121.353171687|
|2021-11-04 04:08:00|[121.353171687,12...|121.353171687|
|2021-11-04 04:11:00|[121.728083925,12...|121.728083925|
|2021-11-04 04:14:00|[121.550493918,12...|121.550493918|
|2021-11-04 04:24:00|[121.353171687,12...|121.353171687|
|2021-11-04 04:35:00|[121.343305576,12...|121.343305576|
|2021-11-04 04:36:00|[121.254510572,12...|121.264376684|
|2021-11-04 04:38:00|[121.254510572,12...|121.155849457|
|2021-11-04 04:42:00|[121.057188342,12...|121.057188342|
|2021-11-04 04:44:00|[121.155849457,12...|121.155849457|
+-------------------+--------------------+-------------+
only showing top 10 rows





In [10]:
test_df = final_data.where("rank > .8").drop("rank")
test_df.show(10)

2021-12-04 23:28:03,767 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+-------------------+--------------------+-------+
|               time|            features|  close|
+-------------------+--------------------+-------+
|2021-11-29 14:42:00|[119.31,119.37,11...| 119.34|
|2021-11-29 14:43:00|[119.37,119.3761,...| 119.33|
|2021-11-29 14:44:00|[119.32,119.3201,...| 119.24|
|2021-11-29 14:45:00|[119.228,119.29,1...|119.275|
|2021-11-29 14:46:00|[119.26,119.3,119...| 119.29|
|2021-11-29 14:47:00|[119.3,119.31,119...|119.295|
|2021-11-29 14:48:00|[119.31,119.31,11...| 119.21|
|2021-11-29 14:49:00|[119.22,119.23,11...| 119.22|
|2021-11-29 14:50:00|[119.23,119.23,11...| 119.15|
|2021-11-29 14:51:00|[119.13,119.2,119...|  119.2|
+-------------------+--------------------+-------+
only showing top 10 rows





In [14]:
train_df.count()

2021-12-04 13:22:39,907 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


7434



In [15]:
test_df.count()

2021-12-04 13:22:46,152 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


1859

In [16]:
type(test_df)

pyspark.sql.dataframe.DataFrame

Save the test data to hdfs as parquet file 

In [13]:

test_df.write.parquet('RealTimeStockDataPrediction.parquet')

2021-12-04 19:15:47,011 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


Building a ML Model (Linear regression)

In [11]:
linear_reg=LinearRegression(featuresCol='features',labelCol='close')
lr_model=linear_reg.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

2021-12-04 23:28:19,690 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2021-12-04 23:28:20,568 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2021-12-04 23:28:21,054 WARN util.Instrumentation: [d56eef47] regParam is zero, which might cause numerical instability and overfitting.
2021-12-04 23:28:23,435 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
2021-12-04 23:28:23,437 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
2021-12-04 23:28:23,821 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
2021-12-04 23:28:23,822 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
2021-12-04 23:28:24,859 W

Coefficients: [-0.44724541526933487,0.7175480484697344,0.7297495663223875]
Intercept: -0.005938793314908819


In [12]:
predDF = lr_model.transform(test_df)
predDF.select("features", "close","prediction").show(5)

2021-12-04 23:28:35,664 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+--------------------+-------+------------------+
|            features|  close|        prediction|
+--------------------+-------+------------------+
|[119.31,119.37,11...| 119.34|  119.353342014657|
|[119.37,119.3761,...| 119.33|119.33818182849971|
|[119.32,119.3201,...| 119.24|119.24008895625342|
|[119.228,119.29,1...|119.275|119.26693483386249|
|[119.26,119.3,119...| 119.29|  119.274393452385|
+--------------------+-------+------------------+
only showing top 5 rows



Evaluvating the model using Regression Evaluator

In [13]:
from pyspark.ml.evaluation import RegressionEvaluator
regressionEvaluator = RegressionEvaluator(
predictionCol="prediction",
labelCol="close",
metricName="rmse")
rmse = regressionEvaluator.evaluate(predDF)
print(f"Root Mean Squared Error(RMSE) is {rmse:.1f}")

2021-12-04 23:28:49,040 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


Root Mean Squared Error(RMSE) is 0.0




Save the model to hdfs 

In [None]:

lr_model.save("hdfs://localhost:9000/prediction_stock_price_data_model")

In [29]:
data_frame = predDF.toDF('time', 'features', 'close', 'prediction')

In [31]:
data_frame.show(5)

2021-12-04 20:17:34,552 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+-------------------+--------------------+-------+------------------+
|               time|            features|  close|        prediction|
+-------------------+--------------------+-------+------------------+
|2021-11-29 14:42:00|[119.31,119.37,11...| 119.34|  119.353342014657|
|2021-11-29 14:43:00|[119.37,119.3761,...| 119.33|119.33818182849971|
|2021-11-29 14:44:00|[119.32,119.3201,...| 119.24|119.24008895625342|
|2021-11-29 14:45:00|[119.228,119.29,1...|119.275|119.26693483386249|
|2021-11-29 14:46:00|[119.26,119.3,119...| 119.29|  119.274393452385|
+-------------------+--------------------+-------+------------------+
only showing top 5 rows





In [35]:
# type (data_frame)
data_frame.write.parquet('RealTimeStockDataPrediction.parquet')

2021-12-04 20:21:07,163 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
