In [1]:
# !pip install findspark

In [2]:
# import findspark
# findspark.init()
# findspark.add_packages("org.apache.spark:spark-sql-kafka-0-10_2.12-3.1.1")

In [3]:
import os
import time

SCALA_VERSION = '2.12'
SPARK_VERSION = '3.1.2'

os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages org.apache.spark:spark-sql-kafka-0-10_{SCALA_VERSION}:{SPARK_VERSION} pyspark-shell'

import findspark
import pyspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as f
from IPython.display import display, clear_output
from pyspark.sql.streaming import DataStreamReader
spark = (SparkSession
         .builder
         .appName('hsd-spark-kafka')
         .master('local[*]')
         .getOrCreate())

In [106]:
timestampformat = "yyyy-MM-dd HH:mm:ss"
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")

In [107]:
df = (spark.readStream.format('kafka')
      .option("kafka.bootstrap.servers", "localhost:9092") 
      .option("subscribe", "detected") 
      .option("startingOffsets", "latest")
      .load())
# # convert key, value from binary to string
# df = df.selectExpr("CAST(value AS STRING)")

In [131]:
from pyspark.sql.types import StructType, StructField, LongType, IntegerType, StringType
schema_value = StructType(
    [StructField("author",StringType(),True),
    StructField("date",StringType(),True),
    StructField("raw_comment",StringType(),True),
    StructField("clean_comment",StringType(),True),
    StructField("label",IntegerType(),True)])

df_json = (df
           .selectExpr("CAST(value AS STRING)")
           .withColumn("value",f.from_json("value",schema_value)))
df_column = (df_json.select(f.col("value.author").alias("author"),
#                             f.col("value.date").alias("timestamp"),
                           f.to_timestamp(f.regexp_replace('value.date','[TZ]',' '),timestampformat).alias("timestamp"),
                           f.col("value.raw_comment").alias("comment"),
                           f.col("value.clean_comment").alias("clean_comment"),
                           f.col("value.label").alias("label")
                           ))
df_count = (df_column.groupBy('label').agg(f.count('label').alias('count'))
            .withColumn('sentiment',f.when(df_column.label==1,'OFFENSIVE')
                        .when(df_column.label==0,'CLEAN')
                        .otherwise('HATE'))
           .select(f.col('sentiment'),f.col('count')))

In [140]:
# Write value data from a DataFrame to a specific Kafka topic specified in an option
ds = (df_column 
#       .selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
      .writeStream 
      .format("memory") 
      .queryName("hsd_query")
      .outputMode("append")
#   .option("kafka.bootstrap.servers", "localhost:9092") 
#   .option("topic", "detected") 
#   .option("checkpointLocation", "E:\download")\
      .start())
# Count query
ds_count = (df_count
            .writeStream
            .format("memory")
            .queryName("hsd_count")
            .outputMode("complete")
#   .option("kafka.bootstrap.servers", "localhost:9092") 
#   .option("topic", "detected") 
#   .option("checkpointLocation", "E:\download")\
            .start())

IllegalArgumentException: Cannot start query with name hsd_count as a query with that name is already active in this SparkSession

In [141]:
if df.isStreaming:
    for x in range(0,200):
        try:
            if not ds.isActive:
                print("Query is not active!")
                break
            print("Stream write every 5 seconds")
            print(f"Second passed: {x*5}")
            
            result = spark.sql(f"select * from {ds.name}")
            rs_count = spark.sql(f"select * from {ds_count.name}")
            
            # display df to console
            display(rs_count.toPandas())
            display(result.toPandas())
            
            time.sleep(5)
            clear_output(wait=True)
        except KeyboardInterrupt:
            print("Stop write!")
            ds.stop()
            ds_count.stop()
            break
    print("Streaming ends!") 
    ds.stop()
    ds_count.stop()
else:  print("Not streaming!")

Stream write every 5 seconds
Second passed: 155


Unnamed: 0,author,timestamp,comment,clean_comment,label
0,Hung Tran,2021-09-03 06:00:07,Bà ơi bà sca báo,bà ơi bà sca báo,0
1,đệ VinhMc,2021-08-30 02:50:02,Em có mắt kính,em có mắt_kính,0
2,đệ VinhMc,2021-08-28 09:59:25,😎😎🤑🤑😎😎😎🤑🤑🤑😎😎🤑🤑🤑😎🤑😎🤑😎🤑🤑🤑🤑😎🤑😎😎🤑🤑😎😎😎😎🤑🤑🤑😎😎🤑😎🤑🤑😎🤑🤑...,,0
3,Son Tran,2021-08-23 08:57:34,Tua x2 đoạn múa quạt đê,tua đoạn múa quạt đi,0
4,KIếm tiền online,2021-08-11 12:56:27,ai còn nghe ko nhỉ,ai còn nghe không nhỉ,0
5,Huyen Sam Phan,2021-08-10 08:28:55,hay qá anh ơi ^^@@^^,hay quá anh ơi,0
6,Sang Hà Ngọc,2021-07-28 12:27:06,"chúa tể remix, ông hoàng giật giật, ...",chúa_tể remix ông hoàng giật giật,0
7,Mị Vinahouse,2021-07-26 08:15:19,Tui bảo đảm với các bạn 1 loa kẹo kéo dô phòng...,tui bảo_đảm với các bạn loa kẹo kéo vô phòng l...,0
8,Ho Hai,2021-07-20 16:49:15,Hai,hai,0
9,Kiệm Mai văn,2021-07-17 02:15:18,Cái thằng đội nón bảo hiểm bít nhảy ko,cái thằng đội nón bảo_hiểm biết nhảy không,0


Unnamed: 0,sentiment,count
0,OFFENSIVE,1
1,CLEAN,21


Stop write!
Streaming ends!
