***@Author: Ranjith G C***
<br>
***@Date: 2021-09-12***
<br>
***@Last Modified by: Ranjith G C***
<br>@
***Last Modified time: 2021-09-12***
<br>
***@Title : Program Aim to process and clean stock data and saving into model using mllib and pyspark***

In [3]:
from pyspark.sql import SparkSession
spark= SparkSession.builder.appName('Stock Data processing').getOrCreate()

In [4]:
from pyspark.ml.regression import LinearRegression
from pyspark.sql.functions import percent_rank
from pyspark.sql import Window
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import *
import pandas as pd

In [5]:
df = spark.read.csv("hdfs://localhost:9000/kafka_stock_data/data.csv",header=True)

# Processing  The Data

In [6]:
dfs1=df.withColumnRenamed('["time"','time')\
    .withColumnRenamed(' "open"','open')\
    .withColumnRenamed(' "high"','high')\
    .withColumnRenamed(' "low"','low')\
    .withColumnRenamed(' "close"','close')\
    .withColumnRenamed(' "volume"]','volume')

In [7]:
dfs1.printSchema()

root
 |-- time: string (nullable = true)
 |-- open: string (nullable = true)
 |-- high: string (nullable = true)
 |-- low: string (nullable = true)
 |-- close: string (nullable = true)
 |-- volume: string (nullable = true)



In [8]:
dfs1.show()

+--------------------+-----------+-----------+-----------+-----------+----------+
|                time|       open|       high|        low|      close|    volume|
+--------------------+-----------+-----------+-----------+-----------+----------+
|["2021-09-03 19:0...|   "139.55"|   "139.55"|   "139.55"|   "139.55"|   "2749"]|
|["2021-09-03 17:4...|   "139.65"|   "139.65"|   "139.65"|   "139.65"|    "150"]|
|["2021-09-03 16:1...|   "139.58"|   "139.61"|   "139.55"|   "139.55"|  "31003"]|
|["2021-09-03 16:0...|  "139.765"|  "139.885"|   "139.54"|   "139.62"| "269779"]|
|["2021-09-03 15:4...|   "139.69"|  "139.769"|  "139.635"|  "139.769"|  "79292"]|
|["2021-09-03 15:3...| "139.8399"|   "139.86"|  "139.655"|   "139.69"|  "49114"]|
|["2021-09-03 15:1...|   "139.76"|   "139.88"|   "139.75"|   "139.83"|  "50153"]|
|["2021-09-03 15:0...|   "139.78"|   "139.84"|   "139.72"|   "139.74"|  "38715"]|
|["2021-09-03 14:4...|   "139.61"|   "139.78"|   "139.59"|   "139.78"|  "31959"]|
|["2021-09-03 14

# Cleaning Data

In [9]:
dfs2 = dfs1.withColumn('open', regexp_replace('open', '"', ''))\
    .withColumn('Time', regexp_replace('time', '\\["', ''))\
    .withColumn('Time', regexp_replace('time', '"', ''))\
    .withColumn('High', regexp_replace('high', '"', ''))\
    .withColumn('Low', regexp_replace('low', '"', ''))\
    .withColumn('Close', regexp_replace('close', '"', ''))\
    .withColumn('Volume', regexp_replace('volume', '"', ''))\
    .withColumn('Volume', regexp_replace('volume', '\\]', ''))\
   

# casting string to double values

In [10]:
dfs3 = dfs2\
    .withColumn("open",col("Open").cast("double"))\
    .withColumn("high",col("High").cast("double"))\
    .withColumn("low",col("Low").cast("double"))\
    .withColumn("close",col("Close").cast("double"))\
    .withColumn("volume",col("Volume").cast("double"))

In [11]:
dfs3.show()

+-------------------+--------+--------+--------+--------+--------+
|               Time|    open|    high|     low|   close|  volume|
+-------------------+--------+--------+--------+--------+--------+
|2021-09-03 19:00:00|  139.55|  139.55|  139.55|  139.55|  2749.0|
|2021-09-03 17:45:00|  139.65|  139.65|  139.65|  139.65|   150.0|
|2021-09-03 16:15:00|  139.58|  139.61|  139.55|  139.55| 31003.0|
|2021-09-03 16:00:00| 139.765| 139.885|  139.54|  139.62|269779.0|
|2021-09-03 15:45:00|  139.69| 139.769| 139.635| 139.769| 79292.0|
|2021-09-03 15:30:00|139.8399|  139.86| 139.655|  139.69| 49114.0|
|2021-09-03 15:15:00|  139.76|  139.88|  139.75|  139.83| 50153.0|
|2021-09-03 15:00:00|  139.78|  139.84|  139.72|  139.74| 38715.0|
|2021-09-03 14:45:00|  139.61|  139.78|  139.59|  139.78| 31959.0|
|2021-09-03 14:30:00|139.6829|139.6861|  139.57|139.6311| 31552.0|
|2021-09-03 14:15:00|  139.63|  139.72|  139.57|  139.69| 34371.0|
|2021-09-03 14:00:00|  139.76|  139.76|  139.63|  139.64| 3665

In [12]:
import matplotlib.pyplot as plt
check_na = dfs3.toPandas()
check_na1 = check_na.set_index("Time")
check_na1.isna().sum()

open      0
high      0
low       0
close     0
volume    0
dtype: int64

# creating vectors from features
# Apache MLlib takes input in vector form

In [13]:
featureassembler=VectorAssembler(inputCols=["open","high","low"],outputCol="features")
output=featureassembler.transform(dfs3)
output.select("features").show(5,truncate=False)

+------------------------+
|features                |
+------------------------+
|[139.55,139.55,139.55]  |
|[139.65,139.65,139.65]  |
|[139.58,139.61,139.55]  |
|[139.765,139.885,139.54]|
|[139.69,139.769,139.635]|
+------------------------+
only showing top 5 rows



# finalized_data consist of features and label which is close.

In [14]:
finalized_data=output.select("time","features","close").sort("time",ascending=True)

In [15]:
finalized_data.show(5)

+-------------------+--------------------+------+
|               time|            features| close|
+-------------------+--------------------+------+
|2021-08-09 04:15:00|[142.96,143.8,142.5]| 142.5|
|2021-08-09 07:15:00|[142.4,142.63,142.4]|142.55|
|2021-08-09 07:45:00| [142.7,142.8,142.7]| 142.8|
|2021-08-09 08:00:00| [142.7,142.7,142.7]| 142.7|
|2021-08-09 08:15:00|[142.446,143.2499...|142.33|
+-------------------+--------------------+------+
only showing top 5 rows



# splitting data into train and test

In [16]:
final_data = finalized_data.withColumn("rank",percent_rank().over(Window.partitionBy().orderBy("time")))

In [17]:
train_data = final_data.where("rank <= .8").drop("rank")
test_data = final_data.where("rank > .8").drop("rank")

In [21]:
train_data.count

<bound method DataFrame.count of DataFrame[time: string, features: vector, close: double]>

In [31]:
test_data.write.("test_data")

# Creating A Model


In [None]:

linear_reg=LinearRegression(featuresCol='features',labelCol='close')
lr_model=linear_reg.fit(train_data)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))


# Using our Linear Regression model to make some predictions: 

In [None]:
predDF = lr_model.transform(test_data)
predDF.select("features", "close","prediction").show(5)

# Evaluating Models

## RMSE
### RMSE is a metric that ranges from zero to infinity. The closer it is to zero, the better.

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
regressionEvaluator = RegressionEvaluator(
predictionCol="prediction",
labelCol="close",
metricName="rmse")
rmse = regressionEvaluator.evaluate(predDF)
print(f"RMSE is {rmse:.1f}")

# Saving Models

In [None]:
lr_model.save("lrm_model")

In [None]:
from pyspark.ml.regression import LinearRegressionModel
lrcvModel = LinearRegressionModel.load('/user/ubunta/lrm_model')
output = lrcvModel.transform(test_data)
output.show()