In [1]:
!pip install fastavro confluent-kafka

Collecting fastavro
  Downloading fastavro-1.10.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.5 kB)
Collecting confluent-kafka
  Downloading confluent_kafka-2.9.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (22 kB)
Downloading fastavro-1.10.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.3/3.3 MB[0m [31m22.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading confluent_kafka-2.9.0-cp311-cp311-manylinux_2_28_x86_64.whl (3.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.8/3.8 MB[0m [31m19.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: fastavro, confluent-kafka
Successfully installed confluent-kafka-2.9.0 fastavro-1.10.0


# Spark Setup

Reference: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

In [2]:
import os
import re
import subprocess

# Discover the newest Spark 3.x.y release listed on the Apache download site.
SPARK_DOWNLOADS_URL = "https://downloads.apache.org/spark/"

# Fetch the directory listing with curl:
#   -f  -> exit with an error on HTTP failures instead of returning an error page
#   -sS -> no progress meter, but still report errors
# check=True turns a failed download into a CalledProcessError, so the notebook
# stops here instead of carrying an empty version string into later cells.
listing_html = subprocess.run(
    ["curl", "-fsS", SPARK_DOWNLOADS_URL],
    capture_output=True, text=True, check=True,
).stdout

# Keep only release names that start with "spark-3." (a future Spark 4.x is ignored).
spark3_releases = set(re.findall(r"spark-3\.[0-9]+\.[0-9]+", listing_html))
if not spark3_releases:
    raise RuntimeError(f"No Spark 3.x release found at {SPARK_DOWNLOADS_URL}")

# Compare versions numerically (e.g. 3.10.0 ranks above 3.9.1) and keep the highest.
spark_version = max(
    spark3_releases,
    key=lambda release: tuple(int(part) for part in release.split("-", 1)[1].split(".")),
)

spark_version

'spark-3.5.5'

In [3]:
import os, time

# Timestamp taken when the environment setup starts.
start = time.time()

# Identifiers of the Spark build to use: "<release>-bin-<hadoop version>".
spark_release = spark_version
hadoop_version = 'hadoop3'

# Export everything in one go: the release identifiers, plus the Java and
# Spark home directories.
os.environ.update({
    'SPARK_RELEASE': spark_release,
    'HADOOP_VERSION': hadoop_version,
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": f"/content/{spark_release}-bin-{hadoop_version}",
})

In [4]:
# Run below commands in google colab
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # install Java8
!wget -q http://apache.osuosl.org/spark/${SPARK_RELEASE}/${SPARK_RELEASE}-bin-${HADOOP_VERSION}.tgz # download spark-3.3.X
!tar xf ${SPARK_RELEASE}-bin-${HADOOP_VERSION}.tgz # unzip it

!pip install -q findspark # install findspark
# findspark find your Spark Distribution and sets necessary environment variables

import findspark
findspark.init()

# Check the pyspark version
import pyspark
print(pyspark.__version__)

3.5.5


# Define the configuration details for your Spark job:


Create your Spark session. You must define details of the Kafka Cluster to connect to, topic name and consumer group name.

- kafka_brokers: List of Kafka bootstrap servers  
- topic_name: The Kafka topic to read messages from
- consumer_group: This allows you to use different Spark jobs to consume the same topic messages and implement different analytics
- schema: the AVRO schema of topic messages

In [39]:
import os
from getpass import getpass

# Define the Azure Event Hubs connection details.
#
# SECURITY: a connection string embeds a SharedAccessKey, so it must never be
# stored in the notebook. Each one is read from an environment variable, with a
# hidden interactive prompt as fallback. The keys that were previously
# hard-coded in this cell have been exposed and should be rotated in Azure.


def _load_connection_string(env_var, eventhub_name):
    """Return the Event Hubs connection string for `eventhub_name`.

    The value is taken from the environment variable `env_var`; when that is
    unset or empty the user is prompted for it (input is not echoed).

    Raises:
        ValueError: if no connection string is provided either way.
    """
    value = os.environ.get(env_var) or getpass(f"Connection string for {eventhub_name}: ")
    value = value.strip()
    if not value:
        raise ValueError(f"Missing connection string for {eventhub_name} (set {env_var})")
    return value


event_hub_namespace='iesstsabbadbaa-grp-01-05'

rides_eventhub_name='grp04-ride-events'
rides_consumer_eventhub_connection_str = _load_connection_string(
    "RIDES_EVENTHUB_CONNECTION_STR", rides_eventhub_name
)

specials_eventhub_name='grp04-special-events'
specials_consumer_eventhub_connection_str = _load_connection_string(
    "SPECIALS_EVENTHUB_CONNECTION_STR", specials_eventhub_name
)

In [45]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro

# Load the AVRO schemas (JSON files taken from GitHub) as raw strings.
with open("ride_datafeed_schema.json") as rides_schema_file:
    schema = rides_schema_file.read()

with open("special_events_schema.json") as specials_schema_file:
    special_schema = specials_schema_file.read()

# The Kafka and AVRO connector artifacts should match the Spark version that is
# actually installed, so the version is taken from pyspark instead of being
# hard-coded. "_2.12" is the Scala version suffix of the artifacts.
spark_connector_version = pyspark.__version__
spark_packages = ",".join([
    f"org.apache.spark:spark-sql-kafka-0-10_2.12:{spark_connector_version}",
    f"org.apache.spark:spark-avro_2.12:{spark_connector_version}",
])

# Create a local Spark session with the connector packages on the classpath.
spark = (
    SparkSession.builder
    .appName("StreamingAVROFromKafka")
    .config("spark.streaming.stopGracefullyOnShutdown", True)
    .config('spark.jars.packages', spark_packages)
    .config("spark.sql.shuffle.partitions", 4)
    .master("local[*]")
    .getOrCreate()
)

In [46]:
def _eventhub_kafka_conf(namespace, eventhub_name, connection_str):
    """Build the Spark Kafka-source options for one Azure Event Hub.

    Args:
        namespace: Event Hubs namespace; the bootstrap server is
            "<namespace>.servicebus.windows.net:9093".
        eventhub_name: name of the event hub, used as the Kafka topic to subscribe to.
        connection_str: Event Hubs connection string, used as the SASL password.

    Returns:
        A dict of options suitable for `spark.readStream.format("kafka").options(**conf)`.
    """
    return {
        "kafka.bootstrap.servers": f"{namespace}.servicebus.windows.net:9093",
        # The settings below are required when Kafka is secured, for example
        # when connecting to Azure Event Hubs:
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.jaas.config": f'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{connection_str}";',

        "subscribe": eventhub_name,
        # "earliest": consume all the data already on the event hub immediately.
        # "latest": consume only newly arriving data.
        "startingOffsets": "earliest",

        "groupIdPrefix": "Stream_Analytics_",
        # NOTE: "enable.auto.commit" and "auto.commit.interval.ms" were removed.
        # Without the "kafka." prefix they are not forwarded to the Kafka
        # consumer, and Spark's Kafka source does not commit offsets anyway
        # (see the Structured Streaming + Kafka integration guide).
    }


kafkaConf_rides = _eventhub_kafka_conf(
    event_hub_namespace, rides_eventhub_name, rides_consumer_eventhub_connection_str
)

kafkaConf_specials = _eventhub_kafka_conf(
    event_hub_namespace, specials_eventhub_name, specials_consumer_eventhub_connection_str
)


In [47]:
# --- Ride events: open the Kafka stream for the rides event hub ---
rides_kafka_stream = (
    spark.readStream
    .format("kafka")
    .options(**kafkaConf_rides)
    .load()
)

# Decode the AVRO payload of the `value` column into a single struct column.
df_rides = rides_kafka_stream.select(
    from_avro(rides_kafka_stream.value, schema).alias("ride_events")
)

# --- Special events: open the Kafka stream for the specials event hub ---
specials_kafka_stream = (
    spark.readStream
    .format("kafka")
    .options(**kafkaConf_specials)
    .load()
)

# Decode the AVRO payload of the `value` column into a single struct column.
df_specials = specials_kafka_stream.select(
    from_avro(specials_kafka_stream.value, special_schema).alias("special_event")
)

In [48]:
# Flatten the nested AVRO structs into one flat column per field.
from pyspark.sql.functions import col


def _plain(struct_path, names):
    """Select `struct_path.<name>` for each name, keeping the field's own name."""
    return [col(f"{struct_path}.{name}") for name in names]


def _aliased(struct_path, names, prefix=""):
    """Select `struct_path.<name>` for each name, renamed to `<prefix><name>`."""
    return [col(f"{struct_path}.{name}").alias(f"{prefix}{name}") for name in names]


# Fields shared by the pickup, dropoff and venue location structs.
_LOCATION_FIELDS = ["latitude", "longitude", "address", "city"]

rides_columns = [
    *_plain("ride_events", [
        "event_id", "ride_id", "event_type", "timestamp", "user_id", "driver_id",
    ]),

    # Location structs share field names, so they get a disambiguating prefix.
    *_aliased("ride_events.pickup_location", _LOCATION_FIELDS, "pickup_"),
    *_aliased("ride_events.dropoff_location", _LOCATION_FIELDS, "dropoff_"),

    *_plain("ride_events.ride_details", [
        "distance_km", "estimated_duration_minutes", "actual_duration_minutes",
        "vehicle_type", "base_fare", "surge_multiplier", "total_fare",
    ]),
    *_plain("ride_events.payment_info", [
        "payment_method", "payment_status", "payment_id",
    ]),
    *_plain("ride_events.ratings", [
        "user_to_driver_rating", "driver_to_user_rating", "user_comment", "driver_comment",
    ]),
    *_plain("ride_events.cancellation_info", [
        "canceled_by", "cancellation_reason", "cancellation_fee",
    ]),
    *_plain("ride_events.traffic_conditions", [
        "traffic_level", "estimated_delay_minutes",
    ]),

    *_aliased("ride_events.driver_location", [
        "latitude", "longitude", "heading", "speed_kmh",
    ], "driver_"),

    *_plain("ride_events", ["app_version", "platform", "session_id"]),
]

df_rides = df_rides.select(*rides_columns)

specials_columns = [
    # `type` and `name` are renamed to more descriptive column names.
    col("special_event.type").alias("event_type"),
    col("special_event.name").alias("event_name"),
    *_aliased("special_event", ["venue_zone"]),

    *_aliased("special_event.venue_location", _LOCATION_FIELDS, "venue_"),

    *_aliased("special_event", [
        "event_start", "event_end",
        "arrivals_start", "arrivals_end",
        "departures_start", "departures_end",
    ]),

    *_aliased("special_event", [
        "arrival_rides", "departure_rides", "estimated_attendees",
    ]),
]

df_specials = df_specials.select(*specials_columns)



# Analytical Queries

Your Spark job and input messages are ready to be worked on. Now, you can apply any transformations required to answer business questions.

IMPORTANT NOTE: if you set "startingOffsets": "latest" in the config, you must send data AFTER running df.writeStream...
In other words, Spark only starts consuming events once .writeStream has been started, so the result will show up as empty unless new events are sent after that point. (For a true real-time analytics case the option should be set to "latest", so that the stats update as new data comes in. For testing purposes it is easier to set it to "earliest", because then you can send the data once and keep working with it.)

## Rides Query 1

In [18]:
!mkdir checkpoint

In [49]:
# Stream every ride event into an in-memory table named after `query_name`.
# With "startingOffsets": "latest", send new events only after running this cell.
query_name = 'all_rides'
all_rides_writer = (
    df_rides.writeStream
    .outputMode("update")
    .format("memory")
    .queryName(query_name)
)
query = all_rides_writer.start()

# When running locally with a different .format, checkpointing can be enabled
# so the query can resume if Spark crashes (not valid for the memory format):
#     .option("checkpointLocation", "checkpoint")
#     .option("path", "/checkpoint/")

# List the tables known to the session.
spark.sql('show tables').show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|         |all_rides|       true|
+---------+---------+-----------+



In [50]:
# Inspect the current status of the streaming query held in `query`.
# The message is typically either "Processing new data" or "Getting offsets from...".
query.status

{'message': 'Getting offsets from KafkaV2[Subscribe[grp04-ride-events]]',
 'isDataAvailable': False,
 'isTriggerActive': True}

In [51]:
# Count the records collected so far in the in-memory table.
# .show() prints the result itself and returns None, so it is not wrapped in
# print() (which would only add a stray "None" line to the output).
spark.sql(f'SELECT count(*) as record_count FROM {query_name}').show(20, truncate=True)

+------------+
|record_count|
+------------+
|        1542|
+------------+

None


In [52]:
# Preview the first 20 rows of the in-memory table.
# .show() prints the result itself and returns None, so it is not wrapped in print().
spark.sql(f'SELECT * FROM {query_name}').show(20, truncate=True)

+--------------+------------+-----------------+--------------------+-------+---------+------------------+-------------------+--------------------+-----------+------------------+-------------------+--------------------+------------+-----------+--------------------------+-----------------------+------------+---------+----------------+----------+--------------+--------------+------------+---------------------+---------------------+--------------------+--------------------+-----------+-------------------+----------------+-------------+-----------------------+---------------+----------------+--------------+----------------+-----------+--------+----------+
|      event_id|     ride_id|       event_type|           timestamp|user_id|driver_id|   pickup_latitude|   pickup_longitude|      pickup_address|pickup_city|  dropoff_latitude|  dropoff_longitude|     dropoff_address|dropoff_city|distance_km|estimated_duration_minutes|actual_duration_minutes|vehicle_type|base_fare|surge_multiplier|total_f

## Rides Query 2

In [53]:
# Keep only the events whose event_type is "RIDE_REQUESTED".
is_ride_requested = df_rides["event_type"] == "RIDE_REQUESTED"
requested_rides_df = df_rides.filter(is_ride_requested)

# Stream the filtered records into an in-memory table named after `query_name`.
query_name = 'requested_rides'
requested_rides_writer = (
    requested_rides_df.writeStream
    .outputMode("update")
    .format("memory")
    .queryName(query_name)
)
query = requested_rides_writer.start()

# When running locally with a different .format, checkpointing can be enabled
# so the query can resume if Spark crashes (not valid for the memory format):
#     .option("checkpointLocation", "checkpoint")
#     .option("path", "/checkpoint/")

# List the tables known to the session.
spark.sql('show tables').show()

+---------+---------------+-----------+
|namespace|      tableName|isTemporary|
+---------+---------------+-----------+
|         |      all_rides|       true|
|         |requested_rides|       true|
+---------+---------------+-----------+



In [55]:
# Inspect the current status of the streaming query held in `query`
# (i.e. whichever query was assigned to that name most recently).
query.status

{'message': 'Getting offsets from KafkaV2[Subscribe[grp04-ride-events]]',
 'isDataAvailable': False,
 'isTriggerActive': True}

In [56]:
# Preview the first 20 rows of the in-memory table.
# .show() prints the result itself and returns None, so it is not wrapped in print().
spark.sql(f'SELECT * FROM {query_name}').show(20, truncate=True)

+--------------+------------+--------------+--------------------+-------+---------+------------------+-------------------+--------------------+-----------+------------------+-------------------+--------------------+------------+-----------+--------------------------+-----------------------+------------+---------+----------------+----------+--------------+--------------+----------+---------------------+---------------------+------------+--------------+-----------+-------------------+----------------+-------------+-----------------------+---------------+----------------+--------------+----------------+-----------+--------+----------+
|      event_id|     ride_id|    event_type|           timestamp|user_id|driver_id|   pickup_latitude|   pickup_longitude|      pickup_address|pickup_city|  dropoff_latitude|  dropoff_longitude|     dropoff_address|dropoff_city|distance_km|estimated_duration_minutes|actual_duration_minutes|vehicle_type|base_fare|surge_multiplier|total_fare|payment_method|pay

## Specials Query 1

In [57]:
# Stream every special event into an in-memory table named after `query_name`.
# With "startingOffsets": "latest", send new events only after running this cell.
query_name = 'all_specials'
all_specials_writer = (
    df_specials.writeStream
    .outputMode("update")
    .format("memory")
    .queryName(query_name)
)
query = all_specials_writer.start()

# When running locally with a different .format, checkpointing can be enabled
# so the query can resume if Spark crashes (not valid for the memory format):
#     .option("checkpointLocation", "checkpoint")
#     .option("path", "/checkpoint/")

# List the tables known to the session.
spark.sql('show tables').show()

+---------+---------------+-----------+
|namespace|      tableName|isTemporary|
+---------+---------------+-----------+
|         |      all_rides|       true|
|         |   all_specials|       true|
|         |requested_rides|       true|
+---------+---------------+-----------+



In [58]:
# Preview the first 20 rows of the in-memory table.
# .show() prints the result itself and returns None, so it is not wrapped in print().
spark.sql(f'SELECT * FROM {query_name}').show(20, truncate=True)

+----------+--------------------+----------+--------------+---------------+------------------+----------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------+---------------+-------------------+
|event_type|          event_name|venue_zone|venue_latitude|venue_longitude|     venue_address|venue_city|        event_start|          event_end|     arrivals_start|       arrivals_end|   departures_start|     departures_end|arrival_rides|departure_rides|estimated_attendees|
+----------+--------------------+----------+--------------+---------------+------------------+----------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------+---------------+-------------------+
|   concert|     Concert Event 1|  malasana|       40.4265|        -3.7025|495 Malasana Calle|    Madrid|2025-01-01 19:00:00|2025-01-01 22:00:00|2025-01-01 16:00:00|2025-01

# Kafka topic to Parquet files (untested by me)

In [None]:
!mkdir output

In [None]:
# Continuously write a stream to Parquet files under "output", starting a new
# micro-batch every 20 seconds; progress is tracked in "checkpoint2".
#
# NOTE(review): this cell used to read from `df`, a name that no earlier cell
# defines, so it failed with a NameError. The flattened ride events (`df_rides`)
# are assumed to be the intended source - TODO confirm.
query_name = 'parquet'
query_parquet = (
    df_rides.writeStream
    .format("parquet")
    .option("checkpointLocation", "checkpoint2")
    .option("path", "output")
    .queryName(query_name)
    .trigger(processingTime='20 seconds')
    .start()
)

In [None]:
# Pause for 20 seconds so that Parquet files have time to build up, then list
# the first 20 lines of the "output" directory listing (long format, sorted by
# modification time, oldest first).
!sleep 20
!ls -lrt output |head -20

total 20
-rw-r--r-- 1 root root 3673 Apr  4 06:09 part-00001-b5877284-6f84-4bbf-946f-ecb7d64b55f2-c000.snappy.parquet
-rw-r--r-- 1 root root 3520 Apr  4 06:09 part-00000-fa2a485a-e63e-4524-8c5e-763ae433bd9f-c000.snappy.parquet
-rw-r--r-- 1 root root 3590 Apr  4 06:09 part-00002-eebac414-979f-475f-a721-f160b8fb19e3-c000.snappy.parquet
-rw-r--r-- 1 root root 3499 Apr  4 06:09 part-00003-0823ed66-a984-4a57-b6c8-c711493a1dd7-c000.snappy.parquet
drwxr-xr-x 2 root root 4096 Apr  4 06:09 _spark_metadata
-rw-r--r-- 1 root root    0 Apr  4 06:09 part-00000-a219f6b4-632b-4a6a-bf57-d084ea44a4e9-c000.snappy.parquet


# Stop your queries and your spark job

In [59]:
# Safety switch: leave as True and run this cell when you want to stop all
# streaming queries and the Spark job; set to False to make the cell a no-op.
STOP_SPARK_JOB = True

if STOP_SPARK_JOB:
    # Stop every active streaming query and print its details afterwards.
    # A dedicated loop variable is used so the notebook-level `query` handle is
    # not overwritten.
    for active_query in spark.streams.active:
        active_query.stop()
        print(f"Query Name: {active_query.name}")
        print(f"Query ID: {active_query.id}")
        print(f"Query Status: {active_query.status}")
        print(f"Is Query Active: {active_query.isActive}")
        print("-" * 50)

    # Stopping the session also stops its underlying SparkContext, so no
    # separate sparkContext.stop() call is needed.
    spark.stop()

Query Name: all_rides
Query ID: 4472b2ca-14cd-44ed-95da-aaa01add810f
Query Status: {'message': 'Stopped', 'isDataAvailable': False, 'isTriggerActive': False}
Is Query Active: False
--------------------------------------------------
Query Name: requested_rides
Query ID: b9b1c64f-5561-48aa-b458-4e6dade81ff5
Query Status: {'message': 'Stopped', 'isDataAvailable': False, 'isTriggerActive': False}
Is Query Active: False
--------------------------------------------------
Query Name: all_specials
Query ID: a77c6a13-a25b-4d13-a2d6-ef791906a1ec
Query Status: {'message': 'Stopped', 'isDataAvailable': False, 'isTriggerActive': False}
Is Query Active: False
--------------------------------------------------
