In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json

# Maven coordinates for the Kafka source/sink connector; they must match the Spark build.
scala_version = '2.12'  # Scala binary version of the Spark build (the _2.xx suffix of its jars)
# Take the connector version from the PySpark that findspark put on sys.path, so the
# spark-sql-kafka artifact always matches the running Spark instead of a hand-typed string.
spark_version = pyspark.__version__
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    # NOTE(review): spark-sql-kafka declares its own kafka-clients dependency; this
    # explicit pin may be redundant or conflict with it — confirm it is still needed.
    'org.apache.kafka:kafka-clients:2.8.0',
]
# spark.jars.packages makes Spark resolve and download the artifacts above at startup.
spark = (
    SparkSession.builder
    .master("local")
    .appName("kafka-example")
    .config("spark.jars.packages", ",".join(packages))
    .getOrCreate()
)
spark

In [2]:
# Show the Maven coordinates that the first cell passed to spark.jars.packages.
packages

['org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0',
 'org.apache.kafka:kafka-clients:2.8.0']

Training the model

In [3]:
#Khai báo thư viện
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import count, when, isnull, split, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
from IPython.display import display, clear_output
from time import sleep
import matplotlib.pyplot as plt
import pandas as pd

In [4]:
# getOrCreate() returns the session already started in the first cell (with its
# Kafka packages) when one exists, rather than building a new one.
spark = SparkSession.builder.getOrCreate()

In [5]:
# Load the AAPL training split: first row is the header, column types are inferred from the data.
train = spark.read.options(header=True, inferSchema=True).csv("./data/AAPL/train.csv")

In [6]:
# Collect the training DataFrame to the driver as pandas for a visual check.
train.toPandas()

Unnamed: 0,Date,Open,High,Low,Close,Volume,Close_after_30_days
0,1980-12-12,0.128348,0.128906,0.128348,0.128348,469033600,0.142857
1,1980-12-15,0.122210,0.122210,0.121652,0.121652,175884800,0.138393
2,1980-12-16,0.113281,0.113281,0.112723,0.112723,105728000,0.133371
3,1980-12-17,0.115513,0.116071,0.115513,0.115513,86441600,0.126116
4,1980-12-18,0.118862,0.119420,0.118862,0.118862,73449600,0.118862
...,...,...,...,...,...,...,...
10791,2023-10-03,172.259995,173.630005,170.820007,172.399994,49594600,187.440002
10792,2023-10-04,171.089996,174.210007,170.970001,173.660004,53020300,188.009995
10793,2023-10-05,173.789993,175.449997,172.679993,174.910004,48527900,189.710007
10794,2023-10-06,173.800003,177.990005,173.179993,177.490005,57224100,189.690002


In [7]:
# Keep only the single feature (today's close) and the label (close 30 days later).
df_train = train[["Close", "Close_after_30_days"]]

In [8]:
# Inspect the projected feature/label pair as pandas.
df_train.toPandas()

Unnamed: 0,Close,Close_after_30_days
0,0.128348,0.142857
1,0.121652,0.138393
2,0.112723,0.133371
3,0.115513,0.126116
4,0.118862,0.118862
...,...,...
10791,172.399994,187.440002
10792,173.660004,188.009995
10793,174.910004,189.710007
10794,177.490005,189.690002


In [9]:
# Convert the Close column into the vector column "Feature" read by LinearRegression.
feature_col = ["Close"]
assembler = VectorAssembler(inputCols=feature_col, outputCol="Feature")
# df_train is overwritten in place, so guard on "Feature": without the guard, re-running
# this cell fails because the raw "Close" column has already been projected away.
if "Feature" not in df_train.columns:
    df_train = assembler.transform(df_train).select("Feature", "Close_after_30_days")

In [10]:
# Inspect the assembled training data: one-element "Feature" vectors plus the label.
df_train.toPandas()

Unnamed: 0,Feature,Close_after_30_days
0,[0.1283479928970337],0.142857
1,[0.12165199965238571],0.138393
2,[0.11272300034761429],0.133371
3,[0.11551299691200256],0.126116
4,[0.11886200308799744],0.118862
...,...,...
10791,[172.39999389648438],187.440002
10792,[173.66000366210938],188.009995
10793,[174.91000366210938],189.710007
10794,[177.49000549316406],189.690002


In [11]:
# Linear regression (Spark's default hyper-parameters) mapping the "Feature" vector
# to the close price 30 days later.
LR = LinearRegression(
    labelCol="Close_after_30_days",
    featuresCol="Feature",
)

In [12]:
# Fit on the whole assembled training frame; the fitted model is applied to the Kafka stream later.
model = LR.fit(df_train)

Dự đoán (Prediction)

In [13]:
kafka_server = 'localhost:9092'
topic_name = 'AAPLstream'

# Subscribe to the input topic; the result is an unbounded streaming DataFrame
# whose "value" column carries the raw message payload.
kafkaDf = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_server)
    .option("subscribe", topic_name)
    .load()
)

In [14]:
# Each message value is split on commas. Assumes the producer sends one CSV row in
# train.csv column order (Date, Open, High, Low, Close, Volume, Close_after_30_days)
# — TODO confirm against the producer. Fields 4 and 6 are the model input and the label;
# they are still strings at this point.
df = (
    kafkaDf
    .selectExpr("CAST(value AS STRING)")
    .select(split("value", ",").alias("csv_values"))
    .selectExpr(
        "csv_values[4] as Close",
        "csv_values[6] as Close_after_30_days",
    )
)

In [15]:
# The parsed fields are strings; cast both to double (keeping their names and order)
# so the assembler and model can consume them.
df1 = df.alias("copied").select(
    col("Close").cast(DoubleType()).alias("Close"),
    col("Close_after_30_days").cast(DoubleType()).alias("Close_after_30_days"),
)

In [16]:
# Build the "Feature" vector for the stream with the same assembler used in training.
# Guard on "Feature" so re-running this cell is a no-op instead of an error: after the
# first run df1 no longer has the raw "Close" column the assembler needs.
# NOTE(review): a non-numeric Close casts to null, which VectorAssembler's default
# handleInvalid="error" may reject and thereby stop the query — consider filtering nulls.
if "Feature" not in df1.columns:
    df1 = assembler.transform(df1).select("Feature", "Close_after_30_days")

In [17]:
# Score the streaming features with the fitted model; the output gains a "prediction" column.
predict = model.transform(df1)

In [18]:
from pyspark.sql.functions import to_json, struct

# Uses the SparkSession created at the top of the notebook; no new builder is needed here.
checkpoint_path = "./checkpoint_AAPL"
output_topic = "AAPL"

# Convert each row's prediction into a JSON string in the Kafka "value" column.
json_df = predict.select(to_json(struct("prediction")).alias("value"))

# Send the data to Kafka. Reuse kafka_server from the reader cell so the input and
# output cannot silently point at different brokers.
# Note: despite its name, kafka_df holds the StreamingQuery handle returned by start().
kafka_df = (
    json_df.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_server)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_path)
    .start()
)

In [19]:
# Mirror the predictions into an in-memory table (named after the query) for the live view.
# `query` is the configured DataStreamWriter; `query2` is the running StreamingQuery.
query = (
    json_df.writeStream
    .format("memory")
    .outputMode("append")
    .queryName("streaming_query")
)
query2 = query.start()

In [20]:
# Poll the in-memory sink and redraw the table; end early with Kernel -> Interrupt.
REFRESH_SECONDS = 5   # delay between refreshes; also drives the "Seconds passed" counter
MAX_REFRESHES = 2000  # upper bound on refreshes before the view ends by itself

for x in range(MAX_REFRESHES):
    try:
        print(f"Showing live view refreshed every {REFRESH_SECONDS} seconds")
        print(f"Seconds passed: {x * REFRESH_SECONDS}")
        # The memory sink exposes its rows as a table named after the query.
        result2 = spark.table(query2.name)
        display(result2.toPandas())
        sleep(REFRESH_SECONDS)
        # wait=True keeps the old table on screen until the next display() replaces it.
        clear_output(wait=True)
    except KeyboardInterrupt:
        print("break")
        break
print("Live view ended...")

Showing live view refreshed every 5 seconds
Seconds passed: 175


Unnamed: 0,value
0,"{""prediction"":181.36475070499625}"
1,"{""prediction"":182.7967054852962}"
2,"{""prediction"":183.72087620134852}"
3,"{""prediction"":181.83191886249364}"
4,"{""prediction"":181.6998900469605}"
5,"{""prediction"":180.10544064697973}"
6,"{""prediction"":178.77504887806526}"
7,"{""prediction"":178.38914352674814}"
8,"{""prediction"":175.76897589566946}"
9,"{""prediction"":175.89083911226015}"


break
Live view ended...


In [21]:
# Stop both streaming queries started above: the in-memory live view (query2) and the
# Kafka sink (kafka_df). Stopping only query2 leaves the Kafka sink running in the background.
query2.stop()
kafka_df.stop()