In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession

scala_version = '2.12'  # your scala version
spark_version = '3.0.1' # your spark version
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:2.8.0' #your kafka version
]
spark = SparkSession.builder.master("local").appName("kafka-example").config("spark.jars.packages", ",".join(packages)).getOrCreate()
spark

In [2]:
topic_name = 'amzn_stock_data3'
kafka_server = 'localhost:9092'

kafkaDf = spark.read.format("kafka").option("kafka.bootstrap.servers", kafka_server).option("subscribe", topic_name).option("startingOffsets", "earliest").load()

In [3]:
from pyspark.sql.functions import col, concat, lit

In [4]:
kafkaDf.show()

+----+--------------------+----------------+---------+------+--------------------+-------------+
| key|               value|           topic|partition|offset|           timestamp|timestampType|
+----+--------------------+----------------+---------+------+--------------------+-------------+
|NULL|[22 31 39 39 37 2...|amzn_stock_data3|        0|     0|2024-01-23 19:46:...|            0|
|NULL|[22 31 39 39 37 2...|amzn_stock_data3|        0|     1|2024-01-23 19:46:...|            0|
|NULL|[22 31 39 39 37 2...|amzn_stock_data3|        0|     2|2024-01-23 19:46:...|            0|
|NULL|[22 31 39 39 37 2...|amzn_stock_data3|        0|     3|2024-01-23 19:46:...|            0|
|NULL|[22 31 39 39 37 2...|amzn_stock_data3|        0|     4|2024-01-23 19:46:...|            0|
|NULL|[22 31 39 39 37 2...|amzn_stock_data3|        0|     5|2024-01-23 19:46:...|            0|
|NULL|[22 31 39 39 37 2...|amzn_stock_data3|        0|     6|2024-01-23 19:46:...|            0|
+----+--------------------+---

In [5]:
from pyspark.sql.functions import from_json, split

In [6]:
df = kafkaDf.selectExpr("CAST(value AS STRING)").select(split("value", ",").alias("amzn_values")) \
    .selectExpr("amzn_values[0] as Date", "amzn_values[1] as Open", \
                "amzn_values[2] as High", "amzn_values[3] as Low", \
                "amzn_values[4] as Close","amzn_values[5] as Adj_close",\
                "amzn_values[6] as Volume")

In [7]:
df.show()

+-----------+--------+--------+--------+--------+---------+-----------+
|       Date|    Open|    High|     Low|   Close|Adj_close|     Volume|
+-----------+--------+--------+--------+--------+---------+-----------+
|"1997-05-15|0.121875|0.125000|0.096354|0.097917| 0.097917|1443120000"|
|"1997-05-16|0.098438|0.098958|0.085417|0.086458| 0.086458| 294000000"|
|"1997-05-19|0.088021|0.088542|0.081250|0.085417| 0.085417| 122136000"|
|"1997-05-20|0.086458|0.087500|0.081771|0.081771| 0.081771| 109344000"|
|"1997-05-21|0.081771|0.082292|0.068750|0.071354| 0.071354| 377064000"|
|"1997-05-22|0.071875|0.072396|0.065625|0.069792| 0.069792| 235536000"|
|"1997-05-23|0.070313|0.076042|0.066667|0.075000| 0.075000| 318744000"|
|"1997-05-27|0.075521|0.082292|0.072917|0.079167| 0.079167| 173952000"|
+-----------+--------+--------+--------+--------+---------+-----------+



In [8]:
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import datetime as dt
import pandas as pd
import json
import time
from pyspark.sql import functions as F
from pyspark.sql.functions import from_json, split, current_date, year, col, when
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
from pyspark.sql.functions import split
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import to_date, col
from pyspark.sql.types import DoubleType
from time import sleep
from IPython.display import display, clear_output
from pyspark.ml.regression import LinearRegressionModel
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import expr

In [9]:
lrModel=LinearRegressionModel.load("G:/Saved model 2")

In [10]:
for x in range(0, 1000):
    try:
        print("Showing live view refreshed every 5 seconds")
        print(f"Seconds passed: {x*5}")
        kafkaDf = spark.read.format("kafka").option("kafka.bootstrap.servers", kafka_server).option("subscribe", topic_name).option("startingOffsets", "earliest").load()
        df = kafkaDf.selectExpr("CAST(value AS STRING)").select(split("value", ",").alias("amzn_values")) \
    .selectExpr("amzn_values[0] as Date", "amzn_values[1] as Open", \
                "amzn_values[2] as High", "amzn_values[3] as Low", \
                "amzn_values[4] as Close","amzn_values[5] as Adj_close", "amzn_values[6] as Volume")
        
        df1 = df \
    .withColumn("Date", expr("substring(Date, 2, 10)")) \
    .withColumn("Date", to_date(col("Date"), "yyyy-MM-dd")) \
    .withColumn("Open", col("Open").cast("double")) \
    .withColumn("High", col("High").cast("double")) \
    .withColumn("Low", col("Low").cast("double")) \
    .withColumn("Close", col("Close").cast("double")) \
    .withColumn("Adj_close", col("Adj_close").cast("double")) \
    .withColumn("Volume", expr("substring(Volume, 1, length(Volume)-1)").cast("int"))
        
        featureassembler=VectorAssembler(inputCols=["Open", "High", "Low"], outputCol='Features')
        output=featureassembler.transform(df1)
        final_data=output.select("Date", "Features", "Close")
        final_output=lrModel.transform(final_data)
        final_output.show()      
        sleep(5)
        clear_output(wait=True)
    except KeyboardInterrupt:
        print("break")
        break

Showing live view refreshed every 5 seconds
Seconds passed: 90
+----------+--------------------+--------+-------------------+
|      Date|            Features|   Close|         prediction|
+----------+--------------------+--------+-------------------+
|1997-05-15|[0.121875,0.125,0...|0.097917|0.11267927516808793|
|1997-05-16|[0.098438,0.09895...|0.086458|0.09681569056116437|
|1997-05-19|[0.088021,0.08854...|0.085417|0.09128152466000145|
|1997-05-20|[0.086458,0.0875,...|0.081771|0.09178373546140325|
|1997-05-21|[0.081771,0.08229...|0.071354|0.08015858364583206|
|1997-05-22|[0.071875,0.07239...|0.069792| 0.0755507846551898|
|1997-05-23|[0.070313,0.07604...|   0.075|0.08040359350605483|
|1997-05-27|[0.075521,0.08229...|0.079167|0.08729854115335386|
|1997-05-28|[0.08125,0.081771...|0.076563| 0.0861401671477983|
|1997-05-29|[0.077083,0.07708...| 0.07526|0.08275572720081965|
|1997-05-30|[0.075,0.075521,0...|   0.075|0.08273744019385013|
|1997-06-02|[0.075521,0.07656...|0.075521|0.08410304526