In [1]:
import nest_asyncio
import asyncio
import json
from aiokafka import AIOKafkaConsumer
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, StructField

In [2]:
# Patch asyncio via nest_asyncio before any coroutine is awaited in this notebook.
# NOTE(review): presumably needed because the notebook kernel already runs an
# event loop -- confirm it is still required with the kernel version in use.
nest_asyncio.apply()

In [3]:
# Obtain the Spark session for this notebook (an existing one is reused if present).
spark = (
    SparkSession.builder
    .appName("KafkaToSpark")
    .getOrCreate()
)

In [4]:
# Kafka configuration: broker address and the topic the consumer subscribes to.
BOOTSTRAP_SERVERS = "localhost:9092"
KAFKA_TOPIC = "filtered_stream"

In [5]:
# Schema for incoming posts: two nullable string fields, "text" and "timestamp".
schema = StructType([
    StructField("text", StringType(), True),
    StructField("timestamp", StringType(), True),
])
     
    

In [6]:
# Start from an empty Spark DataFrame with the post schema; consumed batches
# are appended to it later.
df_posts = spark.createDataFrame(data=[], schema=schema)

In [8]:
import asyncio
from IPython import display

async def consume_messages_batch(num_iterations=5, batch_limit=10, delay_seconds=5,
                                 batch_timeout_seconds=10):
    """Consume Kafka messages in batches and append them to the global ``df_posts``.

    Each iteration collects up to ``batch_limit`` messages from ``KAFKA_TOPIC``,
    converts them to ``(text, timestamp)`` rows, unions them into ``df_posts``
    and displays the accumulated DataFrame.

    Args:
        num_iterations: Number of batches to attempt.
        batch_limit: Maximum number of messages collected per batch.
        delay_seconds: Pause between two consecutive batches.
        batch_timeout_seconds: Maximum time to wait for a batch to fill up.
            When it elapses the batch is processed with whatever was received
            (possibly nothing), so an idle topic no longer blocks the notebook
            forever.

    Side effects:
        Rebinds the module-level ``df_posts`` and clears/prints cell output.
    """
    global df_posts

    consumer = AIOKafkaConsumer(
        KAFKA_TOPIC,
        bootstrap_servers=BOOTSTRAP_SERVERS,
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )

    await consumer.start()
    try:
        loop = asyncio.get_running_loop()

        for iteration in range(num_iterations):
            display.clear_output(wait=True)
            print(f"\n🔄 Iteration {iteration + 1} / {num_iterations}")

            batch_rows = []
            deadline = loop.time() + batch_timeout_seconds

            # Poll with a bounded timeout instead of `async for`, which would
            # wait indefinitely when no message arrives.
            while len(batch_rows) < batch_limit:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    break

                fetched = await consumer.getmany(
                    timeout_ms=int(remaining * 1000),
                    max_records=batch_limit - len(batch_rows),
                )
                for records in fetched.values():
                    for message in records:
                        post = message.value
                        batch_rows.append((post.get("text", ""), post.get("timestamp", "")))

            if batch_rows:
                new_batch_df = spark.createDataFrame(batch_rows, schema=schema)
                df_posts = df_posts.union(new_batch_df)
                df_posts.show(truncate=False)
            else:
                print("⚠️ No new messages received.")

            # Pause before next batch
            await asyncio.sleep(delay_seconds)

    finally:
        # Always release the consumer, even if a batch fails.
        await consumer.stop()

# Run the consumer: 5 batches of up to 10 messages each, pausing 5 seconds between batches.
await consume_messages_batch(num_iterations=5, batch_limit=10, delay_seconds=5)


🔄 Iteration 5 / 5
+-----------------------------------------------------------------------------------------------------------------------------------------+----------------------------+
|text                                                                                                                                     |timestamp                   |
+-----------------------------------------------------------------------------------------------------------------------------------------+----------------------------+
|Aaaaand the nausea is back.                                                                                                              |Mon Apr 06 22:37:46 PDT 2009|
|My son Vincas is sick, so I stay at home  Just three tense days at work, and I am back on holiday with kids                              |Mon Apr 06 22:38:14 PDT 2009|
|sooo sick of the snow  ughh                                                                                                            

### Passing the data through a pipeline

In [11]:
import pyspark.sql.functions as f
# Strip the leading three-letter weekday (e.g. "Mon ") from the raw timestamp string.
df1 = df_posts.withColumn(
    "datetime_clean",
    f.regexp_replace(f.col("timestamp"), r"^[A-Za-z]{3} ", ""),
)

In [12]:
# Removes a trailing whitespace + alphabetic token, if there is one.
# NOTE: this does NOT strip the timezone from the timestamps seen in this
# notebook: they end with the year (e.g. "... PDT 2009"), so the pattern finds
# nothing and the zone abbreviation is kept. The later to_timestamp call relies
# on that, because its format string contains "z".
df2 = df1.withColumn(
    "datetime_column2",
    f.regexp_replace(f.col("datetime_clean"), r"\s[A-Za-z]+$", ""),
)

In [13]:
# Parse the cleaned string into a real timestamp column; the "z" in the format
# consumes the zone abbreviation that is still present in datetime_column2.
df3 = df2.withColumn(
    "datetime_final",
    f.to_timestamp(f.col("datetime_column2"), "MMM dd HH:mm:ss z yyyy"),
)

In [14]:
# Preview the rows to check that datetime_final was parsed (values are truncated for display).
df3.show()

+--------------------+--------------------+--------------------+--------------------+-------------------+
|                text|           timestamp|      datetime_clean|    datetime_column2|     datetime_final|
+--------------------+--------------------+--------------------+--------------------+-------------------+
|Aaaaand the nause...|Mon Apr 06 22:37:...|Apr 06 22:37:46 P...|Apr 06 22:37:46 P...|2009-04-07 05:37:46|
|My son Vincas is ...|Mon Apr 06 22:38:...|Apr 06 22:38:14 P...|Apr 06 22:38:14 P...|2009-04-07 05:38:14|
|sooo sick of the ...|Mon Apr 06 22:38:...|Apr 06 22:38:20 P...|Apr 06 22:38:20 P...|2009-04-07 05:38:20|
|Poor Joshy is sic...|Mon Apr 06 22:41:...|Apr 06 22:41:25 P...|Apr 06 22:41:25 P...|2009-04-07 05:41:25|
|still sick. feeli...|Mon Apr 06 22:43:...|Apr 06 22:43:06 P...|Apr 06 22:43:06 P...|2009-04-07 05:43:06|
|@bananaface IM SO...|Mon Apr 06 22:46:...|Apr 06 22:46:36 P...|Apr 06 22:46:36 P...|2009-04-07 05:46:36|
|Uh oh... I think ...|Mon Apr 06 22:50:...|Apr

In [15]:
# Confirm the column types: datetime_final should be a timestamp, the rest strings.
df3.printSchema()

root
 |-- text: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- datetime_clean: string (nullable = true)
 |-- datetime_column2: string (nullable = true)
 |-- datetime_final: timestamp (nullable = true)



In [18]:
# Keep only the post text and the parsed timestamp; the raw and intermediate
# string columns (timestamp, datetime_clean, datetime_column2) are left out.
df4 = df3.select("text", "datetime_final")

In [16]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, VectorAssembler
from pyspark.ml import Pipeline

# Stage 1: lowercase the text and split it on runs of non-word characters.
regex_token = RegexTokenizer(inputCol="text", outputCol="content_filtered",
                             pattern="\\W+", toLowercase=True)

# Stage 2: drop English stop words plus a few web/social-media tokens.
default_stopwords = StopWordsRemover.loadDefaultStopWords('english')
additional_stopwords = [
    'rt', 'via', 'amp', 'https', 'http', 'co', 'bsky',
    'u', 'app', 's', 'www', 'com', 'de',
]
# De-duplicate the combined list before handing it to the remover.
all_stopwords = list({*default_stopwords, *additional_stopwords})

stop_words_remover = StopWordsRemover(inputCol="content_filtered",
                                      outputCol="filtered_words",
                                      stopWords=all_stopwords)

# Stage 3: turn the remaining tokens into term-count vectors.
count_vectorizer = CountVectorizer(inputCol="filtered_words", outputCol="features")

# Stage 4: wrap "features" into "final_features".
# Note: the LDA and KMeans cells in this notebook read "features" directly.
vector_assembler = VectorAssembler(inputCols=["features"], outputCol="final_features")

# Assemble the stages in execution order.
pipeline_stages = [
    regex_token,
    stop_words_remover,
    count_vectorizer,
    vector_assembler,
]
full_pipeline = Pipeline(stages=pipeline_stages)

In [19]:
# Fit every pipeline stage on the cleaned posts (df4).
pipeline_model = full_pipeline.fit(dataset=df4)

In [20]:
# Apply the fitted pipeline to the same DataFrame it was fitted on (df4), so the
# model input carries only the text, the parsed timestamp and the generated
# feature columns -- not the intermediate string columns still present in df3.
df_ready = pipeline_model.transform(df4)

## LDA Model

In [None]:
from pyspark.ml.clustering import LDA, KMeans

In [None]:
# --- Fit LDA ---
# Five topics on the term-count vectors. The seed is pinned (same value as the
# KMeans cell) so that re-running the notebook yields the same topics.
lda = LDA(k=5, maxIter=10, seed=42, featuresCol="features")
lda_model = lda.fit(df_ready)

# Save LDA model (overwrites any model previously saved at this path)
lda_model.write().overwrite().save("models/lda_model")

### K-Means Model

In [None]:
# --- Fit KMeans ---
# Five clusters on the term-count vectors, with a fixed seed.
kmeans = KMeans(featuresCol="features", k=5, seed=42)
kmeans_model = kmeans.fit(df_ready)

# Persist the fitted model, replacing any earlier save at the same path.
(
    kmeans_model.write()
    .overwrite()
    .save("models/kmeans_model")
)