"""
@Author: Rikesh Chhetri

@Date: 2021-09-08

@Last Modified by: Rikesh Chhetri

@Last Modified time: 2021-09-08 10:03:30

@Title : Clean and process stock data received from Kafka, build a Spark DataFrame, and train a machine learning (linear regression) model
 
"""

In [1]:
from pyspark.sql import SparkSession

# Create (or reuse an already running) Spark session shared by every cell below.
spark = (
    SparkSession.builder
    .appName('Stock Data processing')
    .getOrCreate()
)

2021-09-14 15:07:20,561 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
from pyspark.ml.regression import LinearRegression
from pyspark.sql.functions import percent_rank
from pyspark.sql import Window
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import *

In [3]:
# Load the raw Kafka dump from HDFS. The first row supplies the column names;
# no schema inference is requested, so every column is read as a string.
csv_path = "hdfs://localhost:9000/stocksdatakafka/stock_data.csv"
df = spark.read.option("header", True).csv(csv_path)



# Processing the Data

In [4]:
# The CSV header is a JSON array split on commas, so the raw column names keep
# their brackets, quotes and leading spaces. Map each raw name to a clean one.
raw_to_clean_names = {
    '["time"': 'time',
    ' "open"': 'open',
    ' "high"': 'high',
    ' "low"': 'low',
    ' "close"': 'close',
    ' "volume"]': 'volume',
}
dataset2 = df
for raw_name, clean_name in raw_to_clean_names.items():
    dataset2 = dataset2.withColumnRenamed(raw_name, clean_name)

In [6]:
# Inspect the renamed columns. Every field is still a string at this point,
# because the CSV was read without schema inference.
dataset2.printSchema()

root
 |-- time: string (nullable = true)
 |-- open: string (nullable = true)
 |-- high: string (nullable = true)
 |-- low: string (nullable = true)
 |-- close: string (nullable = true)
 |-- volume: string (nullable = true)



In [7]:
# Preview the first 20 rows (values truncated to 20 chars); the values still
# carry the JSON quotes/brackets that the next cell strips.
dataset2.show(n=20, truncate=True)

+--------------------+-----------+-----------+-----------+-----------+----------+
|                time|       open|       high|        low|      close|    volume|
+--------------------+-----------+-----------+-----------+-----------+----------+
|["2021-09-03 19:0...|   "139.55"|   "139.55"|   "139.55"|   "139.55"|   "2749"]|
|["2021-09-03 17:4...|   "139.65"|   "139.65"|   "139.65"|   "139.65"|    "150"]|
|["2021-09-03 16:1...|   "139.58"|   "139.61"|   "139.55"|   "139.55"|  "31003"]|
|["2021-09-03 16:0...|  "139.765"|  "139.885"|   "139.54"|   "139.62"| "269779"]|
|["2021-09-03 15:4...|   "139.69"|  "139.769"|  "139.635"|  "139.769"|  "79292"]|
|["2021-09-03 15:3...| "139.8399"|   "139.86"|  "139.655"|   "139.69"|  "49114"]|
|["2021-09-03 15:1...|   "139.76"|   "139.88"|   "139.75"|   "139.83"|  "50153"]|
|["2021-09-03 15:0...|   "139.78"|   "139.84"|   "139.72"|   "139.74"|  "38715"]|
|["2021-09-03 14:4...|   "139.61"|   "139.78"|   "139.59"|   "139.78"|  "31959"]|
|["2021-09-03 14

# Cleaning Data

In [5]:
# Strip the JSON-array punctuation left over from the Kafka payload. Per the
# dataset2.show() preview:
#   - time looks like `["2021-09-03 19:00:00"`
#   - volume looks like `"2749"]`
#   - every other field is wrapped in double quotes.
# One regexp per column removes `[`, `]` and `"` in a single pass. This
# replaces chaining two replacements on time/volume and ending the chain with
# a dangling line-continuation backslash.
json_punctuation = r'[\[\]"]'
# Target names are unchanged from the previous version (including the
# capitalised "Time"). Later cells rely on them, e.g. the pandas
# set_index("Time") call. With Spark's default case-insensitive resolution,
# each withColumn below replaces the existing column in place (same position).
cleaned_columns = {
    'open': 'open',
    'time': 'Time',
    'high': 'High',
    'low': 'Low',
    'close': 'Close',
    'volume': 'Volume',
}
newDf = dataset2
for source_col, target_col in cleaned_columns.items():
    newDf = newDf.withColumn(target_col, regexp_replace(source_col, json_punctuation, ''))

# Casting the string price and volume columns to double

In [6]:
# Cast the cleaned price/volume strings to doubles, storing them under
# lower-case names. With Spark's default (non-ANSI) casting, unparseable
# strings become null. "Time" is left as a string.
df2 = newDf
for numeric_col in ("open", "high", "low", "close", "volume"):
    df2 = df2.withColumn(numeric_col, col(numeric_col.capitalize()).cast("double"))

In [10]:
# Preview the first 20 cleaned, numerically typed rows.
df2.show(n=20, truncate=True)

+-------------------+--------+--------+--------+--------+--------+
|               Time|    open|    high|     low|   close|  volume|
+-------------------+--------+--------+--------+--------+--------+
|2021-09-03 19:00:00|  139.55|  139.55|  139.55|  139.55|  2749.0|
|2021-09-03 17:45:00|  139.65|  139.65|  139.65|  139.65|   150.0|
|2021-09-03 16:15:00|  139.58|  139.61|  139.55|  139.55| 31003.0|
|2021-09-03 16:00:00| 139.765| 139.885|  139.54|  139.62|269779.0|
|2021-09-03 15:45:00|  139.69| 139.769| 139.635| 139.769| 79292.0|
|2021-09-03 15:30:00|139.8399|  139.86| 139.655|  139.69| 49114.0|
|2021-09-03 15:15:00|  139.76|  139.88|  139.75|  139.83| 50153.0|
|2021-09-03 15:00:00|  139.78|  139.84|  139.72|  139.74| 38715.0|
|2021-09-03 14:45:00|  139.61|  139.78|  139.59|  139.78| 31959.0|
|2021-09-03 14:30:00|139.6829|139.6861|  139.57|139.6311| 31552.0|
|2021-09-03 14:15:00|  139.63|  139.72|  139.57|  139.69| 34371.0|
|2021-09-03 14:00:00|  139.76|  139.76|  139.63|  139.64| 3665

In [None]:
import matplotlib.pyplot as plt
import pandas as pd

# Pull the cleaned dataset to the driver for a quick sanity check.
# NOTE(review): toPandas collects everything; fine for the ~700 rows here.
check_na = df2.toPandas()
check_na1 = check_na.set_index("Time")
# Count missing values per column (e.g. strings that failed the double cast).
# Printed explicitly: Jupyter only auto-displays a cell's LAST expression,
# so the bare `check_na1.isna().sum()` result was silently discarded.
print(check_na1.isna().sum())
# The rows shown by df2.show() are newest-first and the index is a plain
# string. Parse the timestamps and sort so the plot runs forward in time
# on a real datetime axis.
check_na1.index = pd.to_datetime(check_na1.index)
check_na1 = check_na1.sort_index()
check_na1["open"].plot(figsize=(16, 6))

# Creating feature vectors
# Spark MLlib estimators such as LinearRegression expect all input features in a single vector column

In [7]:
# Pack the per-row numeric inputs into one "features" vector column.
# NOTE(review): high/low of a bar are only known once that bar closes, so
# using them to predict the same bar's close is not a true forecast — confirm
# this is the intended setup.
feature_columns = ["open", "high", "low"]
featureassembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
output = featureassembler.transform(df2)
output.select("features").show(n=5, truncate=False)

+------------------------+
|features                |
+------------------------+
|[139.55,139.55,139.55]  |
|[139.65,139.65,139.65]  |
|[139.58,139.61,139.55]  |
|[139.765,139.885,139.54]|
|[139.69,139.769,139.635]|
+------------------------+
only showing top 5 rows





# finalized_data consists of the timestamp, the feature vector, and the label (close), sorted by time.

In [8]:
# Keep only what the model needs, oldest row first. "time" is a
# "YYYY-MM-DD HH:MM:SS" string, so lexicographic order is chronological.
model_columns = output.select("time", "features", "close")
finalized_data = model_columns.orderBy(col("time").asc())

In [13]:
# Show the five oldest rows.
finalized_data.show(n=5)

+-------------------+--------------------+------+
|               time|            features| close|
+-------------------+--------------------+------+
|2021-08-09 04:15:00|[142.96,143.8,142.5]| 142.5|
|2021-08-09 07:15:00|[142.4,142.63,142.4]|142.55|
|2021-08-09 07:45:00| [142.7,142.8,142.7]| 142.8|
|2021-08-09 08:00:00| [142.7,142.7,142.7]| 142.7|
|2021-08-09 08:15:00|[142.446,143.2499...|142.33|
+-------------------+--------------------+------+
only showing top 5 rows





# Splitting data into train and test sets (chronological 80/20 split by time rank)

In [9]:
# Relative position (0.0 .. 1.0) of every row in time order. The window has no
# partitioning, so Spark moves all rows into a single partition (hence the
# WindowExec warning); acceptable for this small dataset.
time_order = Window.orderBy("time")
final_data = finalized_data.withColumn("rank", percent_rank().over(time_order))

In [10]:
# Chronological split: the earliest 80% of rows (by rank) train the model and
# the remaining, later rows test it, so no future data leaks into training.
train_fraction = 0.8
rank_col = col("rank")
train_data = final_data.where(rank_col <= train_fraction).drop("rank")
test_data = final_data.where(rank_col > train_fraction).drop("rank")

In [16]:
# Number of rows in the training split (rows whose time rank is <= 0.8).
train_data.count()

2021-09-14 09:24:59,290 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


572

In [None]:
# Display the type of test_data (expected: a Spark DataFrame).
type(test_data)
# Optionally persist the test split as Parquet for later use:
# test_data.write.parquet("test_data")

# Creating a Model


In [11]:

# Ordinary least-squares fit of close ~ open + high + low on the training split.
# NOTE(review): regParam defaults to 0 (see the Instrumentation warning);
# consider a small regParam if the coefficients look unstable.
linear_reg = LinearRegression(featuresCol='features', labelCol='close')
lr_model = linear_reg.fit(train_data)
print(f"Coefficients: {lr_model.coefficients}")
print(f"Intercept: {lr_model.intercept}")


2021-09-14 15:09:09,486 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2021-09-14 15:09:11,628 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2021-09-14 15:09:12,322 WARN util.Instrumentation: [067f226b] regParam is zero, which might cause numerical instability and overfitting.
2021-09-14 15:09:18,396 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
2021-09-14 15:09:18,396 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
2021-09-14 15:09:18,806 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
2021-09-14 15:09:18,806 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
2021-09-14 15:09:19,590 W

Coefficients: [-0.44794213421396184,0.68232785821147,0.763631185921753]
Intercept: 0.28480959021561214




# Using our Linear Regression model to make some predictions: 

In [12]:
# Score the held-out (later-in-time) rows and compare actual vs predicted close.
predDF = lr_model.transform(test_data)
preview_columns = ["features", "close", "prediction"]
predDF.select(*preview_columns).show(n=5)

2021-09-14 15:09:23,335 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+--------------------+------+------------------+
|            features| close|        prediction|
+--------------------+------+------------------+
|[139.045,139.085,...|138.98| 139.0088189816052|
|[138.97,138.98,13...|138.97|138.98604284027746|
|[139.21,139.21,13...|139.21|  139.218743620076|
|[138.81,138.81,13...|138.81|138.81953685610827|
|[138.81,139.05,13...|139.05|138.98329554207902|
+--------------------+------+------------------+
only showing top 5 rows





# Evaluating the Model

## RMSE
### RMSE (root mean squared error) ranges from zero to infinity and is in the same units as the label (here, the close price). The closer it is to zero, the better.

In [19]:
from pyspark.ml.evaluation import RegressionEvaluator

# Root-mean-squared error of the predictions against the actual close price
# on the held-out test split.
regressionEvaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="close",
    metricName="rmse",
)
rmse = regressionEvaluator.evaluate(predDF)
# Report four decimals. Prices are ~139 and the per-row errors in the preview
# are a few hundredths, so the previous `.1f` printed every RMSE in
# [0.05, 0.15) as "0.1" and hid any difference between models.
print(f"RMSE is {rmse:.4f}")

2021-09-14 09:26:37,125 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


RMSE is 0.1




# Saving Models

In [None]:
# lr_model.save("stock_data_model")
