In [1]:
from pyspark.ml import PipelineModel
from pyspark.sql.functions import col, window
from time import sleep
from IPython.display import clear_output
import seaborn as sns
import matplotlib
import matplotlib.pyplot as plt
from datetime import datetime

import warnings
warnings.filterwarnings('ignore')

In [2]:
# Open a streaming source on the "sparkTopic" Kafka topic served by the
# local broker.
streamingRawDF = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "sparkTopic")
    .load()
)

# Project the raw records down to the two columns used downstream: the
# payload cast to a string (exposed as `text`) and the `timestamp` column.
streamingDF = streamingRawDF.selectExpr(
    "CAST(value AS STRING) as text",
    "timestamp",
)

In [3]:
# Location of the saved pipeline model on the local filesystem.
model_uri = "file:/home/student/Desktop/twitch-big-data-project/models/lr_regParam0.3"
lrModel = PipelineModel.load(model_uri)

                                                                                

In [4]:
# Run the loaded pipeline over the message stream and keep only the columns
# that are persisted and aggregated later on.
scored_columns = ("text", "prediction", "timestamp")
streamingPredictionDF = lrModel.transform(streamingDF).select(*scored_columns)

In [5]:
logs_path = 'message-logs'
checkpoint_path = 'message-checkpoints'

# Start the "changes_ingestion" query: append every scored message as parquet
# under `logs_path`, tracking progress in `checkpoint_path`.
# The expression is left unassigned so the cell displays the StreamingQuery.
(
    streamingPredictionDF.writeStream
    .queryName("changes_ingestion")
    .format("parquet")
    .outputMode("append")
    .option("path", logs_path)
    .option("checkpointLocation", checkpoint_path)
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0x7fda70ecc370>

In [7]:
# Reuse the schema of the DataFrame that is being written to `logs_path`
# rather than inferring it from the parquet files on disk: inference needs at
# least one file to exist, which is not guaranteed right after the ingestion
# query has been started (e.g. on a fresh "Restart & Run All").
mySchema = streamingPredictionDF.schema

# Re-open the parquet log directory as a streaming source so that newly
# written files are picked up incrementally.
df_stream = (
    spark.readStream
    .schema(mySchema)
    .format("parquet")
    .load(logs_path)
)

2022-12-01 02:53:08,137 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 10.2 MiB
2022-12-01 02:53:09,249 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 10.2 MiB


In [8]:
# 10-minute windows that slide by 10 minutes, i.e. non-overlapping buckets.
time_bucket = window(col("timestamp"), "10 minutes", "10 minutes")

# Count messages per (time bucket, predicted class), with a 10-minute
# watermark on the `timestamp` column.
df_count = (
    df_stream
    .withWatermark("timestamp", "10 minutes")
    .groupBy(time_bucket, col("prediction"))
    .count()
)

2022-12-01 02:53:10,441 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 10.2 MiB
2022-12-01 02:53:11,110 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 10.2 MiB
2022-12-01 02:53:11,828 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 10.2 MiB


In [9]:
# Expose the windowed counts as the in-memory table "msg_changes" so they can
# be queried with spark.sql from the driver.
memory_writer = (
    df_count.writeStream
    .queryName("msg_changes")
    .format("memory")
    .outputMode("update")
)
queryStream = memory_writer.start()

2022-12-01 02:53:12,485 WARN streaming.StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-0b880f08-6172-4295-b846-474d54fd95fb. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
2022-12-01 02:53:13,176 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 10.2 MiB
2022-12-01 02:53:13,846 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 10.2 MiB
2022-12-01 02:53:14,573 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 10.2 MiB
2022-12-01 02:53:16,174 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 10.2 MiB

In [None]:
matplotlib.rc('font', family='DejaVu Sans')
sns.set(style="whitegrid")
# Palette setup does not depend on the data, so do it once instead of on
# every refresh.
sns.set_color_codes("muted")

# Seconds to wait between two refreshes of the dashboard.
REFRESH_SECONDS = 10

# Message count per predicted class for the most recent window found in the
# in-memory table `msg_changes`.
#
# `msg_changes` is fed in "update" output mode, where the memory sink appends
# a new row carrying the running count each time a (window, prediction) group
# changes. The table can therefore hold several rows for the same group, and
# summing them would add up successive running totals. The running count only
# grows, so the latest value is the largest one: use max(count), which is also
# correct when there is exactly one row per group.
LATEST_WINDOW_SQL = """
    select
        window.start
        ,window.end
        ,prediction
        ,max(count) message_count
    from
        msg_changes
    where
        window.start = (select max(window.start) from msg_changes)
    group by
        window.start
        ,window.end
        ,prediction
    order by
        prediction desc
"""

try:
    while True:
        # Replace the previous table/plot only once the new output is ready.
        clear_output(wait=True)
        df = spark.sql(LATEST_WINDOW_SQL).toPandas()

        display(df)

        # Nothing to plot until the stream has produced a first window.
        if not df.empty:
            fig, ax = plt.subplots(figsize=(8, 6))
            try:
                sns.barplot(x="prediction", y="message_count", data=df, ax=ax).set(title='Just Chatting')
                plt.show()
            except ValueError as err:
                # Keep the dashboard alive if seaborn rejects the data, but
                # do not hide the reason.
                print(f"could not draw plot: {err}")
            finally:
                # Always release the figure; otherwise one figure per refresh
                # accumulates for as long as the loop runs.
                plt.close(fig)

        # Sleep on every iteration, including empty or failed ones, so the
        # loop never re-queries Spark without pausing.
        sleep(REFRESH_SECONDS)

except KeyboardInterrupt:
    print("process interrupted.")

2022-12-01 03:15:01,392 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 10.2 MiB
[Stage 1336:(136 + 6) / 200][Stage 1337:> (0 + 0) / 6][Stage 1339:> (0 + 0) / 1]