In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *


# Local Spark session using all available cores; getOrCreate() reuses an existing
# session if one is already active in this kernel.
spark = (
    SparkSession.builder
    .appName("Spark_Kafka_StockData")
    .master("local[*]")
    .getOrCreate()
)

In [2]:
# Only surface errors from Spark; lower log levels would swamp the notebook output.
spark.sparkContext.setLogLevel("ERROR")

# Kafka connection settings for the stock feed.
bootstrap_server = "localhost:9092"
kafka_topic = "StockStream"

# Streaming source over the Kafka topic. startingOffsets="latest" (per the
# Spark-Kafka integration guide) reads only records that arrive after the query starts.
kafka_options = {
    "kafka.bootstrap.servers": bootstrap_server,
    "subscribe": kafka_topic,
    "startingOffsets": "latest",
}
stock_df = spark.readStream.format("kafka").options(**kafka_options).load()

# Raw Kafka records: key/value are binary, alongside topic/partition/offset metadata.
stock_df.printSchema()
print(type(stock_df))

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

<class 'pyspark.sql.dataframe.DataFrame'>


In [3]:
# Kafka's value column is binary; cast it to text so each message can be parsed as a CSV line.
df1 = stock_df.selectExpr("CAST(value AS STRING)")
print(type(df1))

# DDL schema for one CSV record: Time is kept as a string, the remaining fields are doubles.
stock_schema_str = "Time String,Open Double,High Double,Low Double,Close Double,Volume Double"

# Parse each line into a struct column named "stock", then flatten its fields to top-level columns.
parsed_col = from_csv(col("value"), stock_schema_str).alias("stock")
stock_df2 = df1.select(parsed_col)
stock_df3 = stock_df2.select("stock.*")

stock_df3.printSchema()

<class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- Time: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: double (nullable = true)



In [4]:
# Write the parsed stream into an in-memory table named "testedTable".
# Later cells query that table with ordinary batch SQL.
# Per the Structured Streaming guide, the memory sink is intended for debugging
# on small data volumes, because the collected output is held in driver memory.
# (The console-only "truncate" option was dropped: it has no effect on this sink.)
STREAM_WAIT_SECONDS = 100  # how long to let micro-batches accumulate before moving on

stock_agg_write_stream = (
    stock_df3.writeStream
    .outputMode("append")
    .trigger(processingTime="10 seconds")
    .format("memory")
    .queryName("testedTable")
    .start()
)

# Blocks for up to STREAM_WAIT_SECONDS and returns False if the query is still
# running when the timeout elapses.
# The query is NOT stopped here, so "testedTable" can keep growing while later
# cells read it.
stock_agg_write_stream.awaitTermination(STREAM_WAIT_SECONDS)

                                                                                

False

In [5]:
# Batch view of every row the memory sink has collected so far.
stream_df = spark.sql("select * from testedTable")

stream_df.show(n=100, truncate=False)
stream_df.count()  # last expression, so the row count is displayed as the cell output

+-------------------+--------+--------+--------+--------+--------+
|Time               |Open    |High    |Low     |Close   |Volume  |
+-------------------+--------+--------+--------+--------+--------+
|2022-02-28 19:37:00|122.52  |122.52  |122.52  |122.52  |300.0   |
|2022-02-28 19:09:00|122.65  |122.65  |122.65  |122.65  |200.0   |
|2022-02-28 19:08:00|122.65  |122.65  |122.65  |122.65  |235.0   |
|2022-02-28 18:55:00|122.65  |122.65  |122.65  |122.65  |165.0   |
|2022-02-28 18:45:00|122.65  |122.65  |122.65  |122.65  |150.0   |
|2022-02-28 18:43:00|122.65  |122.65  |122.65  |122.65  |150.0   |
|2022-02-28 18:32:00|122.65  |122.65  |122.65  |122.65  |168.0   |
|2022-02-28 17:09:00|122.56  |122.56  |122.55  |122.55  |500.0   |
|2022-02-28 16:57:00|122.64  |122.64  |122.64  |122.64  |560.0   |
|2022-02-28 16:13:00|122.51  |122.51  |122.51  |122.51  |6358.0  |
|2022-02-28 16:07:00|122.51  |122.51  |122.51  |122.51  |7249.0  |
|2022-02-28 16:06:00|122.51  |122.51  |122.51  |122.51  |27397

8656

In [6]:
# Confirm that the snapshot read back from the memory table keeps the parsed column types.
stream_df.printSchema()

root
 |-- Time: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: double (nullable = true)



In [7]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegressionModel  # NOTE(review): not used in this notebook's visible cells


# Pack the input columns into a single vector column, the form Spark ML estimators expect.
# NOTE(review): Open, High and Low come from the same bar as the Close label.
# For OHLC bars, Close normally lies within [Low, High], so this setup explains
# Close rather than forecasting it. Lagged features would be needed for real prediction.
feature_cols = ["Open", "High", "Low"]
vect_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

data_w_features = vect_assembler.transform(stream_df)

# Keep only the feature vector, the label and the timestamp.
# The former bare final_df.count() here was dropped: its result was discarded,
# so it only launched an extra Spark job.
final_df = data_w_features.select("features", "Close", "Time")
final_df.show(100, truncate=False)


+----------------------------+--------+-------------------+
|features                    |Close   |Time               |
+----------------------------+--------+-------------------+
|[122.52,122.52,122.52]      |122.52  |2022-02-28 19:37:00|
|[122.65,122.65,122.65]      |122.65  |2022-02-28 19:09:00|
|[122.65,122.65,122.65]      |122.65  |2022-02-28 19:08:00|
|[122.65,122.65,122.65]      |122.65  |2022-02-28 18:55:00|
|[122.65,122.65,122.65]      |122.65  |2022-02-28 18:45:00|
|[122.65,122.65,122.65]      |122.65  |2022-02-28 18:43:00|
|[122.65,122.65,122.65]      |122.65  |2022-02-28 18:32:00|
|[122.56,122.56,122.55]      |122.55  |2022-02-28 17:09:00|
|[122.64,122.64,122.64]      |122.64  |2022-02-28 16:57:00|
|[122.51,122.51,122.51]      |122.51  |2022-02-28 16:13:00|
|[122.51,122.51,122.51]      |122.51  |2022-02-28 16:07:00|
|[122.51,122.51,122.51]      |122.51  |2022-02-28 16:06:00|
|[122.51,122.51,122.51]      |122.51  |2022-02-28 16:05:00|
|[122.51,122.51,122.51]      |122.51  |2

In [8]:
# Train/test split, 80% / 20%.
# A fixed seed makes the split reproducible across re-runs.
# The streaming query filling "testedTable" is still running, and randomSplit is lazy.
# Without a snapshot, each later action would re-read a possibly larger table,
# so train and test could overlap or miss rows.
# localCheckpoint() (eager by default) materialises one snapshot for both splits to share.
# NOTE(review): a random split of time-ordered bars mixes past and future rows.
# A chronological split on Time would give a more honest evaluation.
SPLIT_SEED = 42
final_snapshot = final_df.localCheckpoint()
train_dataset, test_dataset = final_snapshot.randomSplit([0.8, 0.2], seed=SPLIT_SEED)
train_dataset.describe().show()


+-------+------------------+-------------------+
|summary|             Close|               Time|
+-------+------------------+-------------------+
|  count|              6902|               6902|
|   mean|129.85685745488556|               null|
| stddev| 5.367610307250789|               null|
|    min|            118.96|2022-01-31 04:15:00|
|    max|     137.135590411|2022-02-28 19:37:00|
+-------+------------------+-------------------+



In [9]:
# The five statistics describe() reports (count, mean, stddev, min, max), for the test split.
test_dataset.summary("count", "mean", "stddev", "min", "max").show()

+-------+------------------+-------------------+
|summary|             Close|               Time|
+-------+------------------+-------------------+
|  count|              1754|               1754|
|   mean|129.98346873025883|               null|
| stddev| 5.346089615851399|               null|
|    min|          118.9134|2022-01-31 09:37:00|
|    max|     138.301272566|2022-02-28 17:09:00|
+-------+------------------+-------------------+



In [10]:
# Fit an ordinary linear regression of Close on the assembled feature vector.
from pyspark.ml.regression import LinearRegression

LinReg = LinearRegression(labelCol="Close", featuresCol="features")
model = LinReg.fit(train_dataset)

# evaluate() scores the held-out split and returns a summary.
# Its .predictions frame holds the input columns plus a "prediction" column.
pred = model.evaluate(test_dataset)
pred.predictions.show(n=20)


+--------------------+--------+-------------------+------------------+
|            features|   Close|               Time|        prediction|
+--------------------+--------+-------------------+------------------+
|[121.07,121.12,12...|  121.11|2022-02-28 14:29:00|121.11094746391385|
|[121.0998,121.13,...|  121.08|2022-02-28 14:31:00|121.07814569261018|
|[121.16,121.205,1...|  121.17|2022-02-28 14:27:00|121.19677371279529|
|[121.16,121.24,12...|  121.24|2022-02-28 14:37:00|121.20273018308055|
|[121.18,121.24,12...|  121.24|2022-02-28 14:13:00|121.21381233083646|
|[121.19,121.25,12...|  121.24|2022-02-28 14:15:00|121.22381049316606|
|[121.24,121.28,12...|121.2412|2022-02-28 14:23:00|121.27260179934716|
|[121.25,121.25,12...|  121.16|2022-02-28 14:24:00|121.13362219345161|
|[121.27,121.3222,...|  121.31|2022-02-28 14:41:00|121.30502521238677|
|[121.37,121.38,12...|  121.28|2022-02-28 14:10:00|121.30820259419491|
|[121.45,121.55,12...|  121.53|2022-02-28 14:50:00| 121.5170204104149|
|[121.



In [11]:
# Report the fitted weights (ascii repr of the vector) and the intercept (fixed-point).
print(f"The coefficient of the model is : {model.coefficients!a}")
print(f"The Intercept of the model is : {model.intercept:f}")

The coefficient of the model is : DenseVector([-0.6031, 0.8314, 0.7715])
The Intercept of the model is : 0.021624


In [12]:
# Regression metrics on the held-out predictions.
from pyspark.ml.evaluation import RegressionEvaluator


evaluation = RegressionEvaluator(labelCol="Close", predictionCol="prediction")

# One evaluator serves every metric: the param map passed to evaluate()
# overrides metricName for that call only.
# This replaces four copy-pasted evaluate/print pairs.
metrics = {
    name: evaluation.evaluate(pred.predictions, {evaluation.metricName: name})
    for name in ("rmse", "mse", "mae", "r2")
}

rmse = metrics["rmse"]  # Root Mean Square Error
mse = metrics["mse"]    # Mean Square Error
mae = metrics["mae"]    # Mean Absolute Error
r2 = metrics["r2"]      # coefficient of determination

for label, value in (("RMSE", rmse), ("MSE", mse), ("MAE", mae), ("R-squared(r2)", r2)):
    print("%s: %.3f" % (label, value))

# NOTE(review): an R^2 of ~1.0 is expected when the features (Open/High/Low)
# come from the same bar as the Close label. It is not evidence of forecasting skill.

RMSE: 0.040
MSE: 0.002
MAE: 0.024
R-squared(r2): 1.000


In [13]:
# Handle on the scored test split (features, Close, Time, prediction), kept for persisting.
hdfs_predicted_df = pred.predictions

In [14]:
# Optional persistence of the model and data.
# Every line below is commented out, so running this cell does nothing.
# Uncomment the targets you need.

# Save the trained model to HDFS.
#model.save("hdfs://localhost:9000/Stock_Prediction/prediction_model")


# Save the test split to HDFS in Parquet format (columnar, compressed storage).
#test_dataset.write.parquet("hdfs://localhost:9000/Stock_Prediction/test_dataset")


# Save the scored test split (with predictions) to HDFS in Parquet format.
#hdfs_predicted_df.write.parquet("hdfs://localhost:9000/Stock_Prediction/Clean_Predicted_Data")

# Save the trained model to the local filesystem.
# NOTE(review): hardcoded absolute local path; consider a configurable output directory.
#model.save("/home/hdoop/sparknotebook/StockDataModel")


In [15]:
# Optional: write the test split to a local Parquet directory (commented out, so a no-op as written).
#test_dataset.write.parquet("Stock_Test_Data")

                                                                                