In [1]:
import pyspark

In [25]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("ConfluentKafkaConsumerExample") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2") \
    .getOrCreate()


In [19]:
kafkaDataFrame = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "pkc-56d1g.eastus.azure.confluent.cloud:9092") \
    .option("subscribe", "reddit_data") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='H35D7ED7PN74YGWT' password='DMrW+RwSTk5YVnYvGufbv0K6fSCn0TF9ODJr4TfA9l4FTF3slrs3zUDYTUmDir2i';") \
    .option("kafka.ssl.endpoint.identification.algorithm", "https") \
    .option("startingOffsets", "earliest") \
    .load()


In [20]:
kafkaDataFrame.printSchema()


root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [26]:

from pyspark.sql.functions import col, expr

# Assuming kafkaDataFrame has been defined as shown previously
# and you want to display the contents in real-time

# Select and cast the data you're interested in
processedDataFrame = kafkaDataFrame.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "partition", "offset", "timestamp")

# Set up the streaming query to write to the console



In [22]:
processedDataFrame

DataFrame[key: string, value: string, topic: string, partition: int, offset: bigint, timestamp: timestamp]

In [29]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import re

def clean_text(text):
    # Remove URLs
    text = re.sub(r"http\S+|www\S+|https\S+", '', text, flags=re.MULTILINE)
    # Remove special characters and digits
    text = re.sub(r'\W+|\d+', ' ', text)
    # Optionally, remove single characters and extra spaces, convert to lowercase
    text = re.sub(r'\s+[a-zA-Z]\s+', ' ', text).strip()
    return text.lower()

# Register the function as a UDF
clean_text_udf = udf(clean_text, StringType())


In [34]:
# Assuming df_parsed is the DataFrame with the stream data
#df_cleaned = processedDataFrame.withColumn("cleaned_title", clean_text_udf(col("value")))


                                                                                

-------------------------------------------
Batch: 8
-------------------------------------------
-------------------------------------------
Batch: 8
-------------------------------------------
-------------------------------------------
Batch: 8
-------------------------------------------
-------------------------------------------
Batch: 5
-------------------------------------------
+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|key |value                                                                                                                                                                                |topic      |partition|offset|timestamp              |
+----+--------------------------------------------------------------------------------------------------------------------------

                                                                                

-------------------------------------------
Batch: 9
-------------------------------------------
+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|key |value                                                                                                                                                                                                                                                                                                                                         |topic      |partition|offset|timestamp              |
+----+-----------------------------------------------------------------------------------------------------------

In [36]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType


In [39]:
schema = StructType([
    StructField("title", StringType()),
    StructField("url", StringType()),
    StructField("created_utc", FloatType()),
    StructField("author", StringType()),
    StructField("score", IntegerType()),
])

# Parse the JSON from the 'value' column
processedDataFramedf_parsed = processedDataFrame.withColumn("value", from_json("value", schema))
df_cleaned = processedDataFrame.withColumn("cleaned_title", clean_text_udf(col("value")))

                                                                                

-------------------------------------------
Batch: 17
-------------------------------------------
+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|key |value                                                                                                                                                                                |topic      |partition|offset|timestamp              |
+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|NULL|{"title": "Tweets going straight to drafts, can\u2019t follow anyone", "url": "https://i.redd.it/bz8bl0gen9mc1.jpeg", "created_utc": 1709535721.0, "author": "anonpetal", 

                                                                                

-------------------------------------------
Batch: 18
-------------------------------------------
-------------------------------------------
Batch: 18
-------------------------------------------
-------------------------------------------
Batch: 9
-------------------------------------------
+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|key |value                                                                                                                                                                                                                                                           |topic      |partition|offset|timestamp              |
+----+-------------------------------------------------------------------

                                                                                

-------------------------------------------
Batch: 18
-------------------------------------------
-------------------------------------------
Batch: 19
-------------------------------------------
+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|key |value                                                                                                                                                                                                                                                           |topic      |partition|offset|timestamp              |
+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

-------------------------------------------
Batch: 10
-------------------------------------------
+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|key |value                                                                                                                                                                                                                                                                                                                                         |topic      |partition|offset|timestamp              |
+----+----------------------------------------------------------------------------------------------------------

In [41]:
from nltk.sentiment import SentimentIntensityAnalyzer
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

In [42]:
sia = SentimentIntensityAnalyzer()

# Define UDF for sentiment analysis
def sentiment_score(text):
    if text:
        return sia.polarity_scores(text)['compound']
    else:
        return 0.0

sentiment_score_udf = udf(sentiment_score, FloatType())

In [43]:
from pyspark.sql.functions import when

In [None]:
df_sentiment = df_cleaned.withColumn("sentiment_score", sentiment_score_udf(col("cleaned_title")))

# Classify the sentiment based on the score
df_classified = df_sentiment.withColumn("sentiment", 
                                        when(col("sentiment_score") > 0.05, "Positive")
                                        .when(col("sentiment_score") < -0.05, "Negative")
                                        .otherwise("Neutral"))

24/03/04 03:03:05 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
                                                                                

-------------------------------------------
Batch: 16
-------------------------------------------
+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|key |value                                                                                                                                                                                |topic      |partition|offset|timestamp              |
+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|NULL|{"title": "Tweets going straight to drafts, can\u2019t follow anyone", "url": "https://i.redd.it/bz8bl0gen9mc1.jpeg", "created_utc": 1709535721.0, "author": "anonpetal", 

                                                                                

-------------------------------------------
Batch: 24
-------------------------------------------
+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|key |value                                                                                                                                                                                |topic      |partition|offset|timestamp              |
+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|NULL|{"title": "Tweets going straight to drafts, can\u2019t follow anyone", "url": "https://i.redd.it/bz8bl0gen9mc1.jpeg", "created_utc": 1709535721.0, "author": "anonpetal", 

                                                                                

-------------------------------------------
Batch: 17
-------------------------------------------
+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|key |value                                                                                                                                                                                                                                                           |topic      |partition|offset|timestamp              |
+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----

24/03/04 03:03:09 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:03:09 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


-------------------------------------------
Batch: 25
-------------------------------------------
+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|key |value                                                                                                                                                                                                                                                                                                                                         |topic      |partition|offset|timestamp              |
+----+----------------------------------------------------------------------------------------------------------

24/03/04 03:03:09 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:03:10 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
                                                                                

-------------------------------------------
Batch: 24
-------------------------------------------
+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|key |value                                                                                                                                                                                                                                                                                                                                         |topic      |partition|offset|timestamp              |
+----+----------------------------------------------------------------------------------------------------------

24/03/04 03:03:11 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:03:11 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|key |value                                                                                                                                                                                                                                                                                                                        

24/03/04 03:03:12 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:03:12 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
                                                                                

-------------------------------------------
Batch: 18
-------------------------------------------
+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|key |value                                                                                                                                                                                                                                                                                                                                         |topic      |partition|offset|timestamp              |
+----+----------------------------------------------------------------------------------------------------------

24/03/04 03:03:14 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
                                                                                

-------------------------------------------
Batch: 3
-------------------------------------------
+----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+----------------------------------------------------------------------------------------------+
|key |value                                                                                                                                                                                                                                  |topic      |partition|offset|timestamp              |cleaned_title                                                                                 |
+----+---------------------------------------------------------------------------------------------------------------------------

In [46]:
query = df_classified.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", False) \
    .start()  # Start the streaming query

# Wait for the streaming query to terminate (it will run indefinitely until you manually stop it)
query.awaitTermination()

24/03/04 03:03:16 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/bz/nrn3x_wx57d6kjj0s_jc2nqc0000gn/T/temporary-5ab07dfb-c1b8-4f6c-9669-1db3d009a566. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/04 03:03:16 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/03/04 03:03:16 WARN AdminClientConfig: The configuration 'key.deserializer' was supplied but isn't a known config.
24/03/04 03:03:16 WARN AdminClientConfig: The configuration 'value.deserializer' was supplied but isn't a known config.
24/03/04 03:03:16 WARN AdminClientConfig: The configuration 'enable.auto.commit' was supplied but isn't a known config.
24/03/04 03:03:16 WARN AdminClientConfig: The configuration '

-------------------------------------------
Batch: 0
-------------------------------------------
+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+---------+
|key |value                                                                                                                                                                                                                                                                                              

24/03/04 03:04:09 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:04:09 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:04:09 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


-------------------------------------------
Batch: 25
-------------------------------------------
+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|key |value                                                                                                                                                                                |topic      |partition|offset|timestamp              |
+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|NULL|{"title": "Tweets going straight to drafts, can\u2019t follow anyone", "url": "https://i.redd.it/bz8bl0gen9mc1.jpeg", "created_utc": 1709535721.0, "author": "anonpetal", 

24/03/04 03:04:10 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:04:10 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:04:10 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:04:10 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


-------------------------------------------
Batch: 26
-------------------------------------------
-------------------------------------------
Batch: 29
-------------------------------------------
+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|key |value                                                                                                                                                                                                                                                                                                                                         |topic      |partition|offset|timestamp              |
+----+--------

24/03/04 03:04:11 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:04:11 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:04:11 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:04:11 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:04:11 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


-------------------------------------------
Batch: 28
-------------------------------------------
+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|key |value                                                                                                                                                                                                                                                                                                                                         |topic      |partition|offset|timestamp              |
+----+----------------------------------------------------------------------------------------------------------

24/03/04 03:04:12 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:04:12 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:04:12 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


-------------------------------------------
Batch: 2
-------------------------------------------
+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+---------+
|key |value                                                                                                                                                                                                                                                                                              

24/03/04 03:04:12 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:04:12 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:04:12 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:04:13 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:04:13 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


-------------------------------------------
Batch: 27
-------------------------------------------
+----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|key |value                                                                                                                                                                                                                                  |topic      |partition|offset|timestamp              |
+----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|NULL|{"title": "Twitter a

                                                                                

-------------------------------------------
Batch: 3
-------------------------------------------
+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+--------------------------------------------------------------------------------+---------------+---------+
|key |value                                                                                                                                                                                                   |topic      |partition|offset|timestamp              |cleaned_title                                                                   |sentiment_score|sentiment|
+----+-----------------------------------------------------------------------------------------------------------------------------------------------------------------

24/03/04 03:05:11 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:05:11 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
                                                                                

-------------------------------------------
Batch: 29
-------------------------------------------
+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|key |value                                                                                                                                                                                |topic      |partition|offset|timestamp              |
+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|NULL|{"title": "Tweets going straight to drafts, can\u2019t follow anyone", "url": "https://i.redd.it/bz8bl0gen9mc1.jpeg", "created_utc": 1709535721.0, "author": "anonpetal", 

24/03/04 03:05:13 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:05:13 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:05:13 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
                                                                                

-------------------------------------------
Batch: 30
-------------------------------------------
+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|key |value                                                                                                                                                                                                                                                                                                                                         |topic      |partition|offset|timestamp              |
+----+----------------------------------------------------------------------------------------------------------

24/03/04 03:05:13 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:05:13 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


-------------------------------------------
Batch: 30
-------------------------------------------
+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|key |value                                                                                                                                                                                                                                                                                                                                         |topic      |partition|offset|timestamp              |
+----+----------------------------------------------------------------------------------------------------------

24/03/04 03:05:14 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:05:14 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:05:15 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:05:15 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:05:15 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
                                                                                

-------------------------------------------
Batch: 23
-------------------------------------------
+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|key |value                                                                                                                                                                                                                                                                                                                                         |topic      |partition|offset|timestamp              |
+----+----------------------------------------------------------------------------------------------------------

24/03/04 03:05:16 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:05:16 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:05:16 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:05:16 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:05:16 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:05:16 WARN KafkaDataConsumer: KafkaDataConsumer is not running in Un

-------------------------------------------
Batch: 31
-------------------------------------------
+----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|key |value                                                                                                                                                                                                                                  |topic      |partition|offset|timestamp              |
+----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|NULL|{"title": "Help pls 

24/03/04 03:05:16 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:05:16 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


-------------------------------------------
Batch: 31
-------------------------------------------
+----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|key |value                                                                                                                                                                                                                                  |topic      |partition|offset|timestamp              |
+----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|NULL|{"title": "Help pls 

24/03/04 03:06:14 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:06:14 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:06:14 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


-------------------------------------------
Batch: 25
-------------------------------------------
+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|key |value                                                                                                                                                                                |topic      |partition|offset|timestamp              |
+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|NULL|{"title": "Tweets going straight to drafts, can\u2019t follow anyone", "url": "https://i.redd.it/bz8bl0gen9mc1.jpeg", "created_utc": 1709535721.0, "author": "anonpetal", 

                                                                                

-------------------------------------------
Batch: 26
-------------------------------------------
-------------------------------------------
Batch: 34
-------------------------------------------
+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|key |value                                                                                                                                                                                                                                                                                                                                         |topic      |partition|offset|timestamp              |
+----+--------

24/03/04 03:06:16 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:06:16 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:06:16 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:06:16 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:06:16 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


-------------------------------------------
Batch: 36
-------------------------------------------
-------------------------------------------
Batch: 33
-------------------------------------------
+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|key |value                                                                                                                                                                                                                                                                                                                                         |topic      |partition|offset|timestamp              |
+----+--------

24/03/04 03:06:16 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:06:17 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:06:17 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:06:17 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
                                                                                

-------------------------------------------
Batch: 8
-------------------------------------------
+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+---------+
|key |value                                                                                                                                                                                                                                                                                              

24/03/04 03:06:18 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:06:18 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:06:18 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:06:18 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


-------------------------------------------
Batch: 27
-------------------------------------------
-------------------------------------------
Batch: 34
-------------------------------------------
-------------------------------------------
Batch: 35
-------------------------------------------
-------------------------------------------
Batch: 11
-------------------------------------------
+----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|key |value                                                                                                                                                                                                                                  |topic      |partition|offset|timestamp              |
+----+------------------

24/03/04 03:06:18 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:06:18 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
24/03/04 03:06:18 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


-------------------------------------------
Batch: 34
-------------------------------------------
-------------------------------------------
Batch: 37
-------------------------------------------
+----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+
|key |value                                                                                                                                                                                                                                  |topic      |partition|offset|timestamp              |
+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

24/03/04 03:06:19 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894


-------------------------------------------
Batch: 12
-------------------------------------------
+----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------+-----------------------+----------------------------------------------------------------------------------------------+
|key |value                                                                                                                                                                                                                                  |topic      |partition|offset|timestamp              |cleaned_title                                                                                 |
+----+--------------------------------------------------------------------------------------------------------------------------

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

24/03/04 03:23:05 WARN KafkaOffsetReaderAdmin: Error in attempt 1 getting Kafka offsets: 
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=metadata, deadlineMs=1709550674854, tries=1, nextAllowedTryMs=1709551385871) timed out at 1709551385771 after 1 attempt(s)
	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin.listOffsets(KafkaOffsetReaderAdmin.scala:96)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin.$anonfun$fetchLatestOffsets$1(KafkaOffsetReaderAdmin.scala:325)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin.$anonfun$partitionsAssignedToConsumer$2