#### Connect to the Kafka broker and create a consumer (only used initially to test that consuming from the Kafka topic works)

In [1]:
# # First set up and run the Kafka server, and install kafka-python. The commands can be found in 'kakfa commands.txt'

# from kafka import KafkaAdminClient
# from kafka import KafkaConsumer, TopicPartition
# import json

# topic_name = 'vehicle_positions'
# consumer = KafkaConsumer(#topic = topic_name,
#                          bootstrap_servers=['localhost:9092'],
#                          auto_offset_reset='earliest', #will start consuming from the first message in the topic 
#                          value_deserializer = lambda x: json.loads(x.decode("utf-8")),
#                           consumer_timeout_ms=10000 #stop consumer from waiting for messages after 10000ms 
#                          )

# tp1 = TopicPartition(topic_name, 0)
# consumer.assign([tp1])

#### Start the consumer (only used initially to test that consuming from the Kafka topic works)

In [2]:
# for message in consumer: # the consumer returns all events from the start of the topic, then waits for new events until it times out after 10000 ms
#     # print(f"Received Value: {message.value}, Key:{message.key}, offset: {message.offset}, partition:{message.partition}")
#     print(message.value)

## Ingest data from Kafka with Spark and send it to MongoDB

In [None]:
from pymongo import MongoClient

# Open MongoDB and start every run from empty collections, so that documents left over from
# an earlier run never mix with the ones this run's streams write.
client = MongoClient('mongodb://localhost:27017/')
db = client['your_database']
kafka_aggregated_data = db['kafka_aggregated_data']  # processed, windowed aggregates from the Kafka stream
kafka_raw_data = db['kafka_raw_data']  # unprocessed records as received from the Kafka broker

# Dropping a collection that does not exist yet is harmless, so no existence check is needed.
for stale_collection in (kafka_aggregated_data, kafka_raw_data):
    stale_collection.drop()



import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import from_json, col, to_timestamp, count, avg, desc, window, concat, lit, date_format


def process_aggregated(df, batch_id):
    """Write one micro-batch of windowed aggregates to MongoDB (``foreachBatch`` sink).

    Args:
        df: the micro-batch DataFrame of the aggregation query; it must have the columns
            ``_id``, ``window_start``, ``window_end``, ``link``, ``total_vehicles`` and
            ``average_speed`` (as selected into ``processed_df``).
        batch_id: the micro-batch id supplied by Structured Streaming.
    """
    # count(), show() and the write are three separate Spark actions. Without caching, each
    # of them recomputes the batch (including re-reading the input from Kafka).
    df.persist()
    try:
        record_count = df.count()
        df.show()

        # Convert the window timestamps to strings to avoid MongoDB timestamp issues
        mongo_df = df.withColumn("window_start", col("window_start").cast("string")) \
                     .withColumn("window_end", col("window_end").cast("string"))

        # Upsert: update the document with the same _id, or insert it if there is none,
        # so a window that is emitted again overwrites its earlier values.
        mongo_df.write \
            .format("mongodb") \
            .mode("append") \
            .option("operationType", "update") \
            .option("upsertDocument", "true") \
            .option("database", "your_database") \
            .option("collection", "kafka_aggregated_data") \
            .save()

        # Report only after save() returned, so the message is true when it is printed
        print(f"Batch aggregated {batch_id} written to MongoDB. Records: {record_count}")
    finally:
        df.unpersist()


def process_raw(df, batch_id):
    """Write one micro-batch of raw vehicle records to MongoDB (``foreachBatch`` sink).

    Args:
        df: the micro-batch DataFrame of the raw-data query. It is expected to carry an
            ``_id`` column (as built in ``raw_df_with_id``) so that records are upserted
            rather than duplicated.
        batch_id: the micro-batch id supplied by Structured Streaming.
    """
    # count(), show() and the write are three separate Spark actions. Without caching, each
    # of them recomputes the batch (including re-reading the input from Kafka).
    df.persist()
    try:
        record_count = df.count()
        df.show(truncate=False)

        # Upsert: update the document with the same _id, or insert it if there is none
        df.write \
            .format("mongodb") \
            .mode("append") \
            .option("operationType", "update") \
            .option("upsertDocument", "true") \
            .option("database", "your_database") \
            .option("collection", "kafka_raw_data") \
            .save()

        # Report only after save() returned, so the message is true when it is printed
        print(f"Batch raw {batch_id} written to MongoDB. Records: {record_count}")
    finally:
        df.unpersist()


topic_name = 'vehicle_positions'

# Create the Spark session with the Kafka source and MongoDB sink connectors.
# - The spark-sql-kafka artifact has to match the running Spark version, so it is taken from
#   the installed PySpark instead of a fixed pin. The old pin (3.0.1) is older than the Spark 3.1+
#   that mongo-spark-connector 10.x supports. Both artifacts assume Scala 2.12 (the "_2.12" suffix).
# - MongoDB connector 10.x (the one providing format("mongodb")) reads its connection string from
#   spark.mongodb.read/write.connection.uri. The spark.mongodb.input/output.uri keys belong to
#   the 3.x connector and are ignored by 10.x.
# NOTE: spark.jars.packages only takes effect when getOrCreate() starts a new session.
spark = SparkSession.builder \
    .appName("KafkaStreaming") \
    .config("spark.jars.packages",
            f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark.__version__}," +
            "org.mongodb.spark:mongo-spark-connector_2.12:10.3.0") \
    .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1:27017/") \
    .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1:27017/") \
    .getOrCreate()

# Schema of the JSON messages on the topic: five nullable string fields followed by three
# nullable double fields. "time" arrives as a "dd/MM/yyyy HH:mm:ss" string and is parsed below.
schema = StructType(
    [StructField(field, StringType(), True) for field in ("name", "origin", "destination", "time", "link")]
    + [StructField(field, DoubleType(), True) for field in ("position", "spacing", "speed")]
)

# Streaming source: subscribe to the topic and start from the earliest offset
# (switch startingOffsets to "latest" to read only new messages).
kafka_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", topic_name)
    .option("startingOffsets", "earliest")
    .load()
)

# Decode the Kafka value as JSON, flatten the parsed struct into top-level columns,
# and convert the "time" string into a timestamp column.
raw_df = (
    kafka_df
    .select(from_json(col("value").cast("string"), schema).alias("data"))
    .select("data.*")
    .withColumn("time", to_timestamp(col("time"), "dd/MM/yyyy HH:mm:ss"))
)

# Per-link statistics over 10-second event-time windows. The 30-second watermark limits how
# late a record may arrive and still be counted in its window.
# NOTE(review): the query below sets no outputMode, so Structured Streaming's default (append)
# applies: each window is emitted once, after the watermark passes its end. If still-open
# windows should be updated in MongoDB on every batch (the upsert sink supports that), add
# .outputMode("update"). TODO confirm which behaviour is intended.
processed_df = (
    raw_df
    .withWatermark("time", "30 seconds")
    .groupBy(window("time", "10 seconds"), "link")
    .agg(
        count("speed").alias("total_vehicles"),  # count(col) skips nulls: records that carry a speed
        avg("speed").alias("average_speed"),
    )
    .select(
        # Deterministic id "<window start>_<window end>_<link>" so that writing the same
        # window again upserts one MongoDB document instead of adding a duplicate.
        concat(col("window.start"), lit("_"), col("window.end"), lit("_"), col("link")).alias("_id"),
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        "link",
        "total_vehicles",
        "average_speed",
    )
)

# Each micro-batch of aggregates is passed to process_aggregated, which writes it to MongoDB.
query_prosessed = (
    processed_df.writeStream
    .foreachBatch(process_aggregated)
    .queryName("processed_data")
    .start()
)

# Raw records keyed by "<name>_<time>", so a record that is processed again upserts the same
# MongoDB document. "time" is stored as a "yyyy-MM-dd HH:mm:ss" string.
# NOTE(review): concat() returns null when name or time is null (e.g. a malformed message),
# which would give such records a null _id. Consider filtering them out first.
raw_df_with_id = raw_df.select(
    concat(col("name"), lit("_"), col("time")).alias("_id"),
    "name",
    "origin",
    "destination",
    date_format(col("time"), "yyyy-MM-dd HH:mm:ss").alias("time"),
    "link",
    "position",
    "spacing",
    "speed",
)

# Each micro-batch of raw records is passed to process_raw, which writes it to MongoDB.
raw_query = (
    raw_df_with_id.writeStream
    .foreachBatch(process_raw)
    .queryName("raw_data")
    .start()
)
    

# Keep the cell running until one of the streaming queries stops.
# awaitAnyTermination() returns (or re-raises the query's exception) as soon as ANY query
# terminates. Awaiting query_prosessed and then raw_query one after the other would block on
# the first forever and never surface a failure of raw_query.
try:
    spark.streams.awaitAnyTermination()
finally:
    # Also runs on a notebook interrupt: stop every query that is still active, then the
    # session, so no stream keeps writing to MongoDB after this cell has stopped.
    for active_query in spark.streams.active:
        active_query.stop()
    spark.stop()

In [2]:
#Code notes

# # Write output to console for testing
# query_prosessed = processed_df.writeStream \
#     .foreachBatch(process_batch) \
#     .queryName("processed_data") \
#     .start()
# # Each batch will contain:
# # - New windows that were created
# # - Updated windows that changed due to late data (within watermark)
# # - Does NOT include unchanged windows

#     # .foreach(process_row) \
#     # .foreachBatch(process_batch) \

#     # .outputMode("update") \
#     # .format("console") \
#     # .option("truncate", "false") \
#     # .queryName("processed_data") \
#     # .trigger(processingTime='5 seconds') \
#     # .start()


# # # Write raw data to console
# raw_query = raw_df.writeStream \
#     .foreach(process_row) \
#     .queryName("processed_data") \
#     .start()
    
#     # .outputMode("update") \
#     # .format("console") \
#     # .queryName("raw_data") \
#     # .start()

# # Keep the application running until both streams terminate
# spark.streams.awaitAnyTermination()
# # # Keep the application running
# # query_1.awaitTermination()


# # Process the data with correct aggregation syntax
# processed_df = raw_df \
#     .withWatermark("time" , "30 seconds") \
#     .groupBy(window("time", "10 seconds"),
#              "link") \
#     .agg(
#         count("speed").alias("total_vehicles"),  # Using count on a specific column
#         avg("speed").alias("average_speed")      # Using avg function directly
#      ) \
#     .select(
#             col("window.start").alias("window_start"),
#             col("window.end").alias("window_end"),  
#             "link",
#             "total_vehicles",
#             "average_speed"
#         ) 
#         #  .orderBy(desc("window_start"), "link")        
#     # .orderBy(desc(col("time_window.start")), "link")  # Order by grouping columns


# .trigger(processingTime='5 seconds')
# This means:
# Spark checks for new data every 5 seconds
# If there is new data, it processes all accumulated data since the last trigger
# If no new data, it waits until next trigger
# The aggregations (groupBy, count, avg) are computed for ALL data received so far because you're using outputMode("complete")

# # Process every 5 seconds (your current setting)
# .trigger(processingTime='5 seconds')
# # Process all data that is available now (possibly in several micro-batches), then stop
# .trigger(availableNow=True)
# # Process once and stop
# .trigger(once=True)
# # Continuous processing (experimental, not micro-batch: ~1 ms latency, '1 second' is the checkpoint interval; aggregations are not supported)
# .trigger(continuous='1 second')




# # Complete mode (your current setting)
# .outputMode("complete")
# # - Recalculates ALL aggregations every trigger
# # - Shows full result table each time
# # - Good for totals/averages over all time

# # Update mode
# .outputMode("update")
# # - Only outputs changed aggregations
# # - Good for dashboards/live updates

# # Append mode
# .outputMode("append")
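# # - Only outputs new rows; for a watermarked window aggregation, each window is emitted once, after the watermark passes its end
# # - The default mode; queries without aggregation can use append or update, but not complete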
