In [16]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, from_json
from pyspark.sql.types import StringType, DoubleType, StructType, StructField
from pyspark.ml.feature import Tokenizer, HashingTF, MinHashLSH
from textblob import TextBlob
from kafka import KafkaProducer
from pyspark.ml.linalg import SparseVector, DenseVector
import json
import ssl
import random

# Function for Reservoir Sampling
def reservoir_sample(data, k):
    sample = []
    for i, row in enumerate(data):
        if i < k:
            sample.append(row)
        else:
            j = random.randint(0, i)
            if j < k:
                sample[j] = row
    return sample

# Define UDF for sentiment analysis
def sentiment_analysis(text):
    return TextBlob(text).sentiment.polarity

sentiment_udf = udf(sentiment_analysis, DoubleType())

# Function to convert feature vectors to list
# Function to convert DenseVector and SparseVector to list
def convert_vectors_to_list(row):
    result = {}
    for key, value in row.asDict().items():
        if isinstance(value, SparseVector) or isinstance(value, DenseVector):
            result[key] = value.toArray().tolist()
        else:
            result[key] = value
    return result

# Updated process_batch function
def process_batch(batch_df, epoch_id):
    # Replace null values in 'content' with 'No Content'
    filled_df = batch_df.fillna({'content': 'No Content'})

    tokenizer = Tokenizer(inputCol="content", outputCol="words")
    hashingTF = HashingTF(inputCol="words", outputCol="features", numFeatures=1024)
    batch_df = tokenizer.transform(filled_df)
    batch_df = hashingTF.transform(batch_df)

    mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
    model = mh.fit(batch_df)
    batch_lsh = model.transform(batch_df)

    sampled_data = reservoir_sample(batch_lsh.collect(), 400)  # Adjust sample size

    producer = KafkaProducer(
        bootstrap_servers='pkc-lgwgm.eastus2.azure.confluent.cloud:9092',
        security_protocol='SASL_SSL',
        sasl_mechanism='PLAIN',
        sasl_plain_username='FXV4ENHTLYJBOI65',
        sasl_plain_password='xwRGc4tX2C65xbaQ1f2188fgwXftUbGqUyC5BURiJWqdfZWj6rEkHLCATMZZPt0k',
        ssl_context=ssl.SSLContext(ssl.PROTOCOL_TLS),
        ssl_cafile=None,
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )

    for row in sampled_data:
        sentiment = sentiment_analysis(row['content'])
        sentiment_feedback = ""
        if sentiment>0.5:
            sentiment_feedback = "Positive"
        elif sentiment<(-0.5):
            sentiment_feedback = "Negative"
        else:
            sentiment_feedback = "Neutral"
        serialized_row = {"articleID": row['articleID'], "source": row['sourceDomain'], "title": row['title'], "content": row['content'], "sentiment": sentiment, "sentiment_feedback": sentiment_feedback}
        producer.send('topic_2', serialized_row)

    producer.close()

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("News Article Processing") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1") \
    .getOrCreate()

# Read from Kafka
schema = StructType([
    StructField("articleID", StringType(), True),
    StructField("title", StringType(), True),
    StructField("content", StringType(), True),
    StructField("sourceDomain", StringType(), True)
])

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "LSH_New_8") \
    .option("startingOffsets", "earliest") \
    .load()
df = df.withColumn("jsonData", from_json(col("value").cast("string"), schema))
df = df.select("jsonData.*")

# Apply foreachBatch to process each micro-batch
query = df.writeStream \
    .outputMode("append") \
    .foreachBatch(process_batch) \
    .start()

#query = df \
#    .writeStream \
#    .outputMode("append") \
#    .format("console") \
#    .start()

query.awaitTermination()

24/05/03 15:02:19 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/t3/g6s9f9bn66qbwd6lnwkpgvyr0000gp/T/temporary-57b269fc-182f-494a-ae1f-36f438fc618f. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/05/03 15:02:19 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/05/03 15:02:19 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


-------------------------------------------
Batch: 5
-------------------------------------------
+--------------------+--------------------+--------------------+--------------------+
|           articleID|               title|             content|        sourceDomain|
+--------------------+--------------------+--------------------+--------------------+
|323a36958c7d426db...|Exclusive | Sony,...|Failed to retriev...|The Wall Street J...|
+--------------------+--------------------+--------------------+--------------------+

-------------------------------------------
Batch: 6
-------------------------------------------
+--------------------+--------------------+--------------------+------------+
|           articleID|               title|             content|sourceDomain|
+--------------------+--------------------+--------------------+------------+
|2d7fae4c1e294e8b9...|Bucks' Damian Lil...|Failed to retriev...|        ESPN|
|0a478ddb3cf54bfdb...|Alarm in Israel a...|Article body not ...

                                                                                

-------------------------------------------
Batch: 10
-------------------------------------------
+--------------------+--------------------+--------------------+------------+
|           articleID|               title|             content|sourceDomain|
+--------------------+--------------------+--------------------+------------+
|eee26edca3224f809...|Kerbal Space Prog...|Failed to retriev...|         IGN|
+--------------------+--------------------+--------------------+------------+



ERROR:kafka.conn:Connect attempt to <BrokerConnection node_id=7 host=b7-pkc-lgwgm.eastus2.azure.confluent.cloud:9092 <connecting> [IPv4 ('20.80.214.162', 9092)]> returned error 12. Disconnecting.


-------------------------------------------
Batch: 11
-------------------------------------------
+--------------------+--------------------+--------------------+------------+
|           articleID|               title|             content|sourceDomain|
+--------------------+--------------------+--------------------+------------+
|3283f648d5894d729...|Duane Eddy, twang...|Duane Eddy, a pio...|    NBC News|
+--------------------+--------------------+--------------------+------------+

-------------------------------------------
Batch: 12
-------------------------------------------
+--------------------+--------------------+--------------------+-------------------+
|           articleID|               title|             content|       sourceDomain|
+--------------------+--------------------+--------------------+-------------------+
|ad9111024b384998a...|How to watch Anne...|We independently ...|Yahoo Entertainment|
+--------------------+--------------------+--------------------+---------

                                                                                

-------------------------------------------
Batch: 13
-------------------------------------------
+--------------------+--------------------+--------------------+-------------------+
|           articleID|               title|             content|       sourceDomain|
+--------------------+--------------------+--------------------+-------------------+
|4d2b6cc3c96243cdb...|Orion heat shield...|The heat shield o...|The Washington Post|
+--------------------+--------------------+--------------------+-------------------+

-------------------------------------------
Batch: 14
-------------------------------------------
+--------------------+--------------------+--------------------+------------+
|           articleID|               title|             content|sourceDomain|
+--------------------+--------------------+--------------------+------------+
|ab7a3bc4aeb54ed88...|UCLA: Police clea...|Article body not ...|    BBC News|
+--------------------+--------------------+--------------------+--

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/Users/spartan/Library/Python/3.9/lib/python/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/Users/spartan/Library/Python/3.9/lib/python/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

-------------------------------------------
Batch: 15
-------------------------------------------
+--------------------+--------------------+--------------------+------------+
|           articleID|               title|             content|sourceDomain|
+--------------------+--------------------+--------------------+------------+
|952b80de97314ab08...|US military admit...|The US military h...|         CNN|
+--------------------+--------------------+--------------------+------------+



24/05/03 16:26:24 WARN KafkaOffsetReaderAdmin: Error in attempt 1 getting Kafka offsets: 
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, deadlineMs=1714777905705, tries=1, nextAllowedTryMs=1714778784437) timed out at 1714778784337 after 1 attempt(s)
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2001)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
	at org.apache.spark.sql.kafka010.ConsumerStrategy.retrieveAllPartitions(ConsumerStrategy.scala:66)
	at org.apache.spark.sql.kafka010.ConsumerStrategy.retrieveAllPartitions$(ConsumerStrategy.scala:65)
	at org.apache.spark.sql.kafka010.SubscribeStrategy.retrieveAllPartitions(ConsumerStrategy.scala:102)
	at org.apache.spark.sql.kafka010.SubscribeStrategy.assignedTopicPartitions(ConsumerStrategy.scala:113)
	at org.a