In [15]:
# %cp -r /data/* .

In [None]:
# %cp striming.ipnb /data/

In [17]:
from pyspark.sql import SparkSession

# Maven coordinate of the Kafka source/sink connector, downloaded at session start.
# NOTE(review): keep its Spark version (and Scala suffix) in step with the Spark
# runtime this notebook runs on — a mismatched connector typically fails at runtime.
KAFKA_CONNECTOR_PACKAGE = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0"

# getOrCreate() reuses a session that already exists in this kernel; in that case
# launch-time settings such as spark.jars.packages are not re-applied, so restart
# the kernel after changing them.
spark = (
    SparkSession.builder.appName("ElectionAnalysis")
    .master("local[*]")
    .config("spark.jars.packages", KAFKA_CONNECTOR_PACKAGE)
    .config("spark.sql.adaptive.enabled", "false")  # Disable adaptive query execution
    .getOrCreate()
)
spark

In [18]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.functions import sum as _sum
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    TimestampType,
)

# Subscribe to the raw 'votes_topic' stream, replaying from the earliest retained offset.
# Each row carries Kafka's own columns (key/value as binary, plus topic/partition/offset/timestamp).
kafka_source_options = {
    "kafka.bootstrap.servers": "broker:29092",
    "subscribe": "votes_topic",
    "startingOffsets": "earliest",
}
votes_df = spark.readStream.format("kafka").options(**kafka_source_options).load()
votes_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [19]:
from pyspark.sql.functions import expr

# Kafka delivers the payload as binary; decode it to a string so it can be parsed as JSON.
votes_df = votes_df.withColumn("value", col("value").cast("string"))

In [20]:
from pyspark.sql.functions import from_json

# Nested postal address carried inside each vote payload (all fields nullable strings).
address_schema = StructType(
    [
        StructField(field_name, StringType(), True)
        for field_name in ("street", "city", "state", "country", "postcode")
    ]
)

# (name, type) pairs of the JSON vote payload, in payload column order; every field is nullable.
# NOTE(review): from_json parses voting_time with Spark's default timestamp format —
# confirm the producer's format matches, otherwise the value comes back null.
vote_fields = [
    ("voter_id", StringType()),
    ("candidate_id", StringType()),
    ("voting_time", TimestampType()),
    ("voter_name", StringType()),
    ("party_affiliation", StringType()),
    ("biography", StringType()),
    ("campaign_platform", StringType()),
    ("photo_url", StringType()),
    ("candidate_name", StringType()),
    ("date_of_birth", StringType()),
    ("gender", StringType()),
    ("nationality", StringType()),
    ("registration_number", StringType()),
    ("address", address_schema),
    ("email", StringType()),
    ("phone_number", StringType()),
    ("cell_number", StringType()),
    ("picture", StringType()),
    ("registered_age", IntegerType()),
    ("vote", IntegerType()),
]
vote_schema = StructType([StructField(name, dtype, True) for name, dtype in vote_fields])

# Parse the JSON string and flatten its fields into top-level columns.
# NOTE: this rebinds `votes_df`, dropping the raw `value` column, so re-running this
# cell by itself fails — re-run the cells above it first.
votes_df = (
    votes_df
    .select(from_json(col("value"), vote_schema).alias("value_json"))
    .select("value_json.*")
)
votes_df.printSchema()

root
 |-- voter_id: string (nullable = true)
 |-- candidate_id: string (nullable = true)
 |-- voting_time: timestamp (nullable = true)
 |-- voter_name: string (nullable = true)
 |-- party_affiliation: string (nullable = true)
 |-- biography: string (nullable = true)
 |-- campaign_platform: string (nullable = true)
 |-- photo_url: string (nullable = true)
 |-- candidate_name: string (nullable = true)
 |-- date_of_birth: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- nationality: string (nullable = true)
 |-- registration_number: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- street: string (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- country: string (nullable = true)
 |    |-- postcode: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- cell_number: string (nullable = true)
 |-- picture: string (nullable = true)
 |-- registered_age: integer (nullable = true)
 |-- vote: integer (nullable = true)

In [21]:
# Data preprocessing: enforce column types, then attach an event-time watermark.
# vote_schema already declares these types, so the casts act as a defensive guard.
votes_df = (
    votes_df
    .withColumn("voting_time", col("voting_time").cast(TimestampType()))
    .withColumn("vote", col("vote").cast(IntegerType()))
)

# Tolerate up to one minute of lateness on voting_time.
# NOTE(review): Spark only evicts aggregation state via a watermark when the grouping
# includes the event-time column (or a window over it); the candidate/state groupings
# downstream do not, so state is likely retained indefinitely — confirm intent.
enriched_votes_df = votes_df.withWatermark("voting_time", "1 minute")
                               

In [22]:
# Aggregate votes per candidate and turnout by location.

# Running total of the `vote` field per candidate (summed as a long column `total_votes`).
votes_per_candidate = (
    enriched_votes_df
    .groupBy("candidate_id", "candidate_name", "party_affiliation", "photo_url")
    .agg(_sum("vote").alias("total_votes"))
)

# Number of vote rows per state. Grouping on the nested field yields a column named
# "state"; GroupedData.count() names its result column "count". The previous trailing
# DataFrame.alias("total_votes") only set a dataset alias (used for qualified column
# references) and never renamed that column, so it was dropped. The column stays
# "count" so consumers of the 'aggregated_turnout_by_location' topic are unaffected.
turnout_by_location = enriched_votes_df.groupBy("address.state").count()

votes_per_candidate.printSchema()

root
 |-- candidate_id: string (nullable = true)
 |-- candidate_name: string (nullable = true)
 |-- party_affiliation: string (nullable = true)
 |-- photo_url: string (nullable = true)
 |-- total_votes: long (nullable = true)



In [23]:
# Write aggregated data to Kafka topics ('aggregated_votes_per_candidate', 'aggregated_turnout_by_location')


def _start_kafka_sink(aggregated_df, topic, checkpoint_location,
                      bootstrap_servers="broker:29092", output_mode="update"):
    """Stream every row of ``aggregated_df`` to a Kafka topic as a JSON record.

    Each row is serialised with ``to_json(struct(*))`` into the Kafka ``value`` column.

    Args:
        aggregated_df: Streaming DataFrame to publish.
        topic: Destination Kafka topic.
        checkpoint_location: Checkpoint directory for this query; it must not be
            shared with any other streaming query.
        bootstrap_servers: Kafka bootstrap servers.
        output_mode: Structured Streaming output mode ("update" emits only rows
            whose aggregate changed in each micro-batch).

    Returns:
        The started StreamingQuery.
    """
    return (
        aggregated_df.selectExpr("to_json(struct(*)) AS value")
        .writeStream.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("topic", topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode(output_mode)
        .start()
    )


# Root directory holding one checkpoint subdirectory per streaming query.
CHECKPOINT_ROOT = "/home/jovyan/checkpoint2666"

votes_per_candidate_to_kafka = _start_kafka_sink(
    votes_per_candidate,
    "aggregated_votes_per_candidate",
    f"{CHECKPOINT_ROOT}/checkpoint1",
)
turnout_by_location_to_kafka = _start_kafka_sink(
    turnout_by_location,
    "aggregated_turnout_by_location",
    f"{CHECKPOINT_ROOT}/checkpoint2",
)


In [None]:
 # Await termination for the streaming queries
# Wait on the session's query manager rather than on each query in turn: blocking on
# the first query alone would hide a failure of the second for as long as the first
# keeps running. awaitAnyTermination() returns once any active query stops and
# re-raises that query's exception if it failed.
# NOTE: if one query stops cleanly, the other may still be running when this returns.
spark.streams.awaitAnyTermination()
