# 5.0 Event Streaming

###### Author: Yeap Jie Shen, Gan Yee Jing
###### Last Edited: 02/09/2024

## 5.2 Spark Structured Streaming 
### 5.2.1 Importing Necessary Libraries

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDFModel, Word2VecModel, NGram, CountVectorizer, StringIndexer
from pyspark.sql.functions import lower, regexp_replace, regexp_extract, udf, col, expr, from_json, size
from pyspark.sql.types import ArrayType, StringType, FloatType
from pyspark.ml.linalg import DenseVector

import nltk
from nltk.corpus import words
from nltk.stem import WordNetLemmatizer


import sys

sys.path.append(r'/home/student/RDS2S3G4_CLO2_B')

from data_stores.vectorArrayConverter import VectorArrayConverter

# Build (or reuse) the Spark session. The Kafka SQL connector is requested through
# spark.jars.packages, which is what triggers the Ivy dependency resolution at start-up.
spark = (
    SparkSession.builder
    .appName('structured streaming')
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1')
    .getOrCreate()
)

24/09/02 17:36:20 WARN Utils: Your hostname, Gan. resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/09/02 17:36:20 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/student/de-prj/de-venv/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/student/.ivy2/cache
The jars for the packages stored in: /home/student/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-9e264c65-2b63-47f9-a2f4-9d092e6b538a;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.1 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 657ms :: artifacts dl 22ms
	

### 5.2.2 Consuming Raw News From the Producer and Preprocessing the Data

In [2]:
# Defining variables and functions

# Schema of the JSON payload carried in each Kafka message value:
# six nullable string fields, in this order.
schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ('url', 'headline', 'datetime', 'content', 'publisher', 'author')
])

# Tokenisers: 'content' -> 'content_tokens' and 'headline' -> 'headline_tokens'
content_tokenizer = Tokenizer(
    inputCol='content',
    outputCol='content_tokens',
)
headline_tokenizer = Tokenizer(
    inputCol='headline',
    outputCol='headline_tokens',
)

# Vocabulary taken from the NLTK `words` corpus
english_words = set(words.words())

# Share the vocabulary with the workers through a broadcast variable (the same
# mechanism this notebook uses for the lemmatiser) rather than capturing the
# whole set inside the UDF closure.
english_words_broadcast = spark.sparkContext.broadcast(english_words)

# Define a UDF to filter words not in the English corpus
@udf(returnType = ArrayType(StringType()))
def filter_non_english_words(words):
    """Keep only the tokens that appear in the English vocabulary.

    Args:
        words: list of string tokens for one row, or None when the column is null.
            (Inside this function the parameter shadows the imported
            `nltk.corpus.words` module; the vocabulary is read from the
            broadcast variable instead.)

    Returns:
        A new list with the tokens whose lower-cased form is in the vocabulary,
        in their original order, or None when the input is None.
    """
    # A null array would otherwise raise TypeError when iterated
    if words is None:
        return None

    vocabulary = english_words_broadcast.value
    return [word for word in words if word.lower() in vocabulary]

# Stop-word removers: '<x>_tokens' -> 'cleaned_<x>_tokens' for content and headline
content_stopword_remover = StopWordsRemover(
    outputCol='cleaned_content_tokens',
    inputCol='content_tokens',
)
headline_stopword_remover = StopWordsRemover(
    outputCol='cleaned_headline_tokens',
    inputCol='headline_tokens',
)

# Lemmatiser: one WordNetLemmatizer instance, handed to the UDF through a broadcast variable
lemmatizer_broadcast = spark.sparkContext.broadcast(WordNetLemmatizer())

@udf(returnType = ArrayType(StringType()))
def lemmatize_words(words):
    """Return the tokens of `words`, each passed through the broadcast lemmatiser."""
    lemmatize = lemmatizer_broadcast.value.lemmatize
    return list(map(lemmatize, words))

# UDF that drops tokens shorter than 4 characters
@udf(returnType = ArrayType(StringType()))
def remove_short_length_words(words):
    """Return the tokens of `words` that are at least 4 characters long, in order."""
    kept_tokens = []
    for token in words:
        if len(token) >= 4:
            kept_tokens.append(token)
    return kept_tokens

# TF-IDF: term frequencies are hashed into 20 buckets from '1gram_content';
# the IDF model is loaded from disk rather than fitted here.
hashing_tf_content = HashingTF(
    numFeatures=20,
    inputCol='1gram_content',
    outputCol='1tf_content',
)
idf_content_model = IDFModel.load(r'../model/1_idf_content')

# Word2Vec: model loaded from disk
word2vec = Word2VecModel.load(r'../model/1_gram_word2vec_content')

# Running total of records handled across all micro-batches
record_count = 0

# The streaming query is stopped once the running total exceeds this many records
MAX_RECORDS = 10

def count_records_stop(df_batch, batch_id):
    """foreachBatch handler: sink one micro-batch to Kafka and stop after enough records.

    Counts the rows of the batch, adds them to the module-level `record_count`,
    writes the batch as JSON to the 'ProcessedCrimeNews' Kafka topic, prints
    progress, shows the batch, and stops the streaming query once
    `record_count` exceeds `MAX_RECORDS`.

    Args:
        df_batch: the micro-batch DataFrame; must contain the '1tf_idf_content'
            and '1gram_word2vec_content' columns.
        batch_id: identifier of the micro-batch, used only in the printed messages.

    Side effects:
        Mutates the global `record_count`, writes to Kafka, prints to stdout and
        may call `query.stop()`. `query` is a global that is assigned when the
        stream is started, so it only needs to exist by the time a batch runs.
    """
    global record_count

    # The batch is evaluated by three separate actions below (count, Kafka write,
    # show). Caching it avoids re-running the whole preprocessing pipeline each time.
    df_batch.persist()
    try:
        batch_count = df_batch.count()
        record_count += batch_count

        # NOTE(review): vector_to_array presumably turns the ML vector columns into
        # plain arrays so that to_json can serialise them - confirm against
        # data_stores.vectorArrayConverter.
        df_kafka = (
            df_batch
            .withColumn('1tf_idf_content', VectorArrayConverter.vector_to_array(df_batch['1tf_idf_content']))
            .withColumn('1gram_word2vec_content', VectorArrayConverter.vector_to_array(df_batch['1gram_word2vec_content']))
        )

        # Sink the batch to Kafka
        df_kafka = df_kafka.selectExpr("to_json(struct(*)) AS value")  # Convert the DataFrame to JSON format

        df_kafka.write \
            .format('kafka') \
            .option('kafka.bootstrap.servers', 'localhost:9092') \
            .option('topic', 'ProcessedCrimeNews') \
            .save()

        print(f'Batch ID: {batch_id} consumed {batch_count} records (Total {record_count} records)')
        df_batch.show()
        print(f'Batch ID: {batch_id} sinked {batch_count} records (Total {record_count} records)')
    finally:
        # Release the cached batch even if the sink or the display fails
        df_batch.unpersist()

    if record_count > MAX_RECORDS:
        query.stop()

                                                                                

In [3]:
# kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic ProcessedCrimeNews
# kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic ProcessedCrimeNews

# Read from Kafka: start at the earliest offset of 'CrimeNews' and cap every
# trigger at 3 offsets
df_raw = (
    spark.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', 'localhost:9092')
    .option('subscribe', 'CrimeNews')
    .option('startingOffsets', 'earliest')
    .option('maxOffsetsPerTrigger', 3)
    .load()
)

df = df_raw.selectExpr('CAST(value AS STRING)')

# Parse JSON value to structured format and Extract fields from the structured format
df = df.withColumn('value', from_json(df['value'], schema)).select('value.*')

# Preprocess the data
# Noise Removal: the same steps are applied to both text columns, in this order.
TEXT_COLUMNS = ('content', 'headline')
NOISE_PATTERNS = [
    (r'[^\d\w\s]+', ''),               # anything that is not a digit, word character or whitespace
    (r'\d+', ''),                      # digits
    ('[\U0001F600-\U0001F64F]', ''),   # code points U+1F600-U+1F64F (emoticons)
    ('[^\x00-\x7F]+', ''),             # any remaining non-ASCII characters
    (r'\s+', ' '),                     # collapse runs of whitespace into one space
]

for text_column in TEXT_COLUMNS:
    df = df.withColumn(text_column, lower(text_column))

for pattern, replacement in NOISE_PATTERNS:
    for text_column in TEXT_COLUMNS:
        df = df.withColumn(text_column, regexp_replace(text_column, pattern, replacement))

# Tokenization
df = content_tokenizer.transform(df)
df = headline_tokenizer.transform(df)

# Remove non-english words
df = (
    df
    .withColumn('content_tokens', filter_non_english_words('content_tokens'))
    .withColumn('headline_tokens', filter_non_english_words('headline_tokens'))
)

# Remove stopword; from here on only the metadata and the cleaned token columns are kept
df = (
    headline_stopword_remover
    .transform(content_stopword_remover.transform(df))
    .select('url', 'headline', 'datetime', 'author', 'publisher', 'cleaned_content_tokens', 'cleaned_headline_tokens')
)

# Lemmatisation
df = (
    df
    .withColumn('cleaned_content_tokens', lemmatize_words('cleaned_content_tokens'))
    .withColumn('cleaned_headline_tokens', lemmatize_words('cleaned_headline_tokens'))
)

# Removing tokens with character length less than 4
df = (
    df
    .withColumn('cleaned_content_tokens', remove_short_length_words('cleaned_content_tokens'))
    .withColumn('cleaned_headline_tokens', remove_short_length_words('cleaned_headline_tokens'))
)

# The TF-IDF and Word2Vec stages read their input from '1gram_content'
df = df.withColumnRenamed('cleaned_content_tokens', '1gram_content')

# TF-IDF
df = hashing_tf_content.transform(df)
df = idf_content_model.transform(df)

# Word2Vec
df = word2vec.transform(df)

# Reset the running total so that re-running this cell on a live kernel starts a
# fresh count; otherwise the total left over from a previous run makes
# count_records_stop stop the new query after its first batch.
record_count = 0

query = (
    df
    .writeStream
    .foreachBatch(count_records_stop)
    .start()
)

# Await termination of the query (count_records_stop stops it once enough records were seen)
query.awaitTermination()

24/09/02 17:36:34 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-2c496ff5-a0f4-48f4-80b8-ef225c693c86. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/09/02 17:36:34 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/09/02 17:36:34 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
24/09/02 17:36:37 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/09/02 17:36:38 WARN KafkaDataConsumer: KafkaDataConsumer is not running in Uninterrupti

Batch ID: 0 consumed 3 records (Total 3 records)


24/09/02 17:36:42 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/09/02 17:36:42 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
                                                                                

+--------------------+--------------------+--------------------+------+----------------+--------------------+-----------------------+--------------------+--------------------+----------------------+
|                 url|            headline|            datetime|author|       publisher|       1gram_content|cleaned_headline_tokens|         1tf_content|     1tf_idf_content|1gram_word2vec_content|
+--------------------+--------------------+--------------------+------+----------------+--------------------+-----------------------+--------------------+--------------------+----------------------+
|https://selangorj...|altantuyas family...|2024-08-29T17:10:...|      |Selangor Journal|[family, late, mo...|   [family, bankrupt...|(20,[0,1,2,3,4,5,...|(20,[0,1,2,3,4,5,...|  [0.18192636084833...|
|https://selangorj...|ambank founders m...|2024-08-21T16:02:...|      |Selangor Journal|[federal, court, ...|   [murder, federal,...|(20,[0,1,2,3,4,5,...|(20,[0,1,2,3,4,5,...|  [0.04813456745300...|
|http

24/09/02 17:36:43 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/09/02 17:36:43 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/09/02 17:36:43 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/09/02 17:36:44 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/09/02 17:36:44 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/09/02 17:36:44 WARN KafkaDataConsumer: KafkaDataConsumer is not running in Un

Batch ID: 1 consumed 3 records (Total 6 records)
+--------------------+--------------------+--------------------+------+----------------+--------------------+-----------------------+--------------------+--------------------+----------------------+
|                 url|            headline|            datetime|author|       publisher|       1gram_content|cleaned_headline_tokens|         1tf_content|     1tf_idf_content|1gram_word2vec_content|
+--------------------+--------------------+--------------------+------+----------------+--------------------+-----------------------+--------------------+--------------------+----------------------+
|https://selangorj...|man pleads guilty...|2024-08-23T16:08:...|      |Selangor Journal|[delivery, guilty...|   [guilty, kicking,...|(20,[0,1,2,3,5,6,...|(20,[0,1,2,3,5,6,...|  [-0.1826675417833...|
|https://selangorj...|police arrest two...|2024-08-24T22:08:...|      |Selangor Journal|[police, drug, sy...|   [police, arrest, ...|(20,[0,1,2,3,4,5,...|(

24/09/02 17:36:47 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/09/02 17:36:47 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/09/02 17:36:47 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/09/02 17:36:47 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/09/02 17:36:47 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/09/02 17:36:47 WARN KafkaDataConsumer: KafkaDataConsumer is not running in Un

Batch ID: 2 consumed 3 records (Total 9 records)
+--------------------+--------------------+--------------------+------+----------------+--------------------+-----------------------+--------------------+--------------------+----------------------+
|                 url|            headline|            datetime|author|       publisher|       1gram_content|cleaned_headline_tokens|         1tf_content|     1tf_idf_content|1gram_word2vec_content|
+--------------------+--------------------+--------------------+------+----------------+--------------------+-----------------------+--------------------+--------------------+----------------------+
|https://selangorj...|november defence ...|2024-08-29T17:58:...|      |Selangor Journal|[court, three, st...|   [defence, unregis...|(20,[0,1,2,3,4,5,...|(20,[0,1,2,3,4,5,...|  [0.02922681798537...|
|https://selangorj...|man walks free af...|2024-08-22T18:14:...|      |Selangor Journal|[session, court, ...|   [free, offensive,...|(20,[0,1,2,3,4,5,...|(

24/09/02 17:36:49 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/09/02 17:36:49 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/09/02 17:36:49 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/09/02 17:36:49 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/09/02 17:36:49 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/09/02 17:36:49 WARN KafkaDataConsumer: KafkaDataConsumer is not running in Un

Batch ID: 3 consumed 3 records (Total 12 records)


24/09/02 17:36:50 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/09/02 17:36:50 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/09/02 17:36:50 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


+--------------------+--------------------+--------------------+------+----------------+--------------------+-----------------------+--------------------+--------------------+----------------------+
|                 url|            headline|            datetime|author|       publisher|       1gram_content|cleaned_headline_tokens|         1tf_content|     1tf_idf_content|1gram_word2vec_content|
+--------------------+--------------------+--------------------+------+----------------+--------------------+-----------------------+--------------------+--------------------+----------------------+
|https://selangorj...|johor broker lose...|2024-08-21T15:59:...|      |Selangor Journal|[broker, million,...|   [broker, bogus, f...|(20,[0,1,2,3,4,5,...|(20,[0,1,2,3,4,5,...|  [0.27169035918139...|
|https://selangorj...|businessman plead...|2024-08-29T09:28:...|      |Selangor Journal|[businessman, gui...|   [businessman, gui...|(20,[0,1,2,3,4,6,...|(20,[0,1,2,3,4,6,...|  [0.12104074949989...|
|http

In [4]:
# Shut down the Spark session; the previous cell only returns after the streaming query has terminated
spark.stop()