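## AVRO_EventHub_Spark_Processing

Reads Avro-encoded ride-request and driver-status events from Azure Event Hubs through its Kafka-compatible endpoint, decodes them with Spark Structured Streaming, and writes each stream as Parquet files to Azure Blob Storage.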

In [1]:
!pip install azure-eventhub fastavro faker


Collecting azure-eventhub
  Downloading azure_eventhub-5.15.0-py3-none-any.whl.metadata (73 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/73.1 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m73.1/73.1 kB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting fastavro
  Downloading fastavro-1.10.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.5 kB)
Collecting faker
  Downloading faker-37.1.0-py3-none-any.whl.metadata (15 kB)
Collecting azure-core>=1.27.0 (from azure-eventhub)
  Downloading azure_core-1.33.0-py3-none-any.whl.metadata (42 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.6/42.6 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
Downloading azure_eventhub-5.15.0-py3-none-any.whl (327 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m327.8/327.8 kB[0m [31m13.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading fastavro-1.10.0-cp311-

In [2]:
import os, time, subprocess

spark_version = subprocess.run(
    "curl -s https://downloads.apache.org/spark/ | grep -o 'spark-3\\.[0-9]\\+\\.[0-9]\\+' | sort -V | tail -1",
    shell=True, capture_output=True, text=True
).stdout.strip()

os.environ['SPARK_RELEASE'] = spark_version
os.environ['HADOOP_VERSION'] = 'hadoop3'
os.environ['JAVA_HOME'] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ['SPARK_HOME'] = f"/content/{spark_version}-bin-hadoop3"

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/${SPARK_RELEASE}/${SPARK_RELEASE}-bin-hadoop3.tgz
!tar xf ${SPARK_RELEASE}-bin-hadoop3.tgz
!pip install -q findspark


In [3]:
import findspark
# Put the Spark installation referenced by SPARK_HOME (set in the setup cell)
# on sys.path so the pyspark imports below resolve to that distribution.
findspark.init()

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro
# NOTE(review): col, from_unixtime, to_timestamp, window, session_window and F
# are not used in the visible cells — presumably kept for later analysis; confirm.
from pyspark.sql.functions import col, from_unixtime, to_timestamp, window, session_window
import pyspark.sql.functions as F

In [4]:
# Build (or reuse) a local Spark session with the Kafka, Avro and Azure Blob
# connectors. The Spark-provided connectors should match the running Spark
# version, which the setup cell picks dynamically, so derive it from pyspark
# instead of hard-coding it. The "_2.12" suffix assumes the default Scala 2.12
# build of Spark 3.x.
spark_pkg_version = pyspark.__version__

spark = SparkSession.builder \
    .appName("AVRO_Streaming_EventHub") \
    .config("spark.streaming.stopGracefullyOnShutdown", True) \
    .config("spark.sql.shuffle.partitions", 4) \
    .config("spark.jars.packages",
            f"org.apache.spark:spark-sql-kafka-0-10_2.12:{spark_pkg_version},"
            f"org.apache.spark:spark-avro_2.12:{spark_pkg_version},"
            "org.apache.hadoop:hadoop-azure:3.3.1,"
            "com.microsoft.azure:azure-storage:8.6.6") \
    .master("local[*]") \
    .getOrCreate()
# NOTE(review): spark.streaming.stopGracefullyOnShutdown is documented for the
# legacy DStream API; the structured-streaming queries below are stopped
# explicitly. getOrCreate() reuses an existing session, in which case the
# jars.packages setting above is not re-applied.


In [12]:
import os
from getpass import getpass

# SECURITY: the storage account key and Event Hub SAS key that were previously
# hard-coded here remain in this notebook's history and must be rotated.
# Secrets are now read from environment variables, falling back to a prompt.
STORAGE_ACCOUNT = "iesstsabbadbaa"


def _secret(env_var, prompt):
    """Return the value of ``env_var``, or prompt for it without echoing."""
    return os.environ.get(env_var) or getpass(prompt)


# Azure Blob Storage Key
spark.conf.set(
    f"fs.azure.account.key.{STORAGE_ACCOUNT}.blob.core.windows.net",
    _secret("AZURE_STORAGE_ACCOUNT_KEY", "Azure Storage account key: "),
)

# Event Hub (Kafka API) Settings
eventhub_namespace = 'iesstsabbadbaa-grp-06-10'
ride_requests_topic = 'ride_request_9'
driver_status_topic = 'driver_status_9'
connection_string = _secret(
    "EVENTHUB_CONNECTION_STRING", "Event Hub connection string: "
)

# Options shared by both Kafka readers. Only "kafka."-prefixed keys are passed
# to the Kafka consumer. The Spark Kafka source tracks offsets in the query
# checkpoint and never commits them to Kafka, so the former auto-commit
# settings (unprefixed, and therefore ignored) have been dropped.
kafka_common_conf = {
    "kafka.bootstrap.servers": f"{eventhub_namespace}.servicebus.windows.net:9093",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    # Event Hubs' Kafka endpoint authenticates with the literal user name
    # "$ConnectionString" and the full connection string as the password.
    "kafka.sasl.jaas.config": f'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{connection_string}";',
    # Only used when a query starts without an existing checkpoint.
    "startingOffsets": "latest",
    "groupIdPrefix": "Stream_Analytics_",
}

In [13]:
import json
from schemas import ride_request_schema, driver_status_schema

# from_avro() takes the Avro schema as a JSON string, so serialise both schema
# objects from schemas.py (presumably plain dicts — TODO confirm) up front.
ride_request_schema_str, driver_status_schema_str = (
    json.dumps(avro_schema)
    for avro_schema in (ride_request_schema, driver_status_schema)
)


In [14]:
# Ride-request stream: read the Kafka topic, decode the Avro payload carried in
# the `value` column, then flatten the nested record into top-level columns.
ride_kafka = (
    spark.readStream
    .format("kafka")
    .options(**kafka_common_conf)
    .option("subscribe", ride_requests_topic)
    .load()
)
ride_struct = ride_kafka.select(
    from_avro(ride_kafka["value"], ride_request_schema_str).alias("ride")
)

ride_columns = [
    "ride.request_id",
    "ride.user_id",
    "ride.timestamp",
    "ride.pickup_location.latitude as pickup_lat",
    "ride.pickup_location.longitude as pickup_lon",
    "ride.destination.latitude as dest_lat",
    "ride.destination.longitude as dest_lon",
    "ride.status",
    "ride.vehicle_type",
    "ride.estimated_fare",
    "ride.estimated_duration",
    "ride.estimated_distance",
    "ride.passenger_count",
]
df_ride = ride_struct.selectExpr(*ride_columns)

In [15]:
# TEMP: Print parsed ride request data to the console for a short window as a
# smoke test of the Kafka + Avro decoding. With startingOffsets="latest", only
# events produced during this window are shown.
PREVIEW_SECONDS = 15

query_test = df_ride.writeStream.format("console") \
    .outputMode("append") \
    .option("truncate", False) \
    .start()

try:
    # Unlike time.sleep, awaitTermination(timeout) re-raises the error if the
    # query fails (e.g. on a record that does not match the Avro schema).
    query_test.awaitTermination(PREVIEW_SECONDS)
finally:
    # Always stop the preview, even on failure or a keyboard interrupt.
    query_test.stop()


In [16]:
# Driver-status stream: read the Kafka topic, decode the Avro payload carried
# in the `value` column, then flatten the nested record into top-level columns.
driver_kafka = (
    spark.readStream
    .format("kafka")
    .options(**kafka_common_conf)
    .option("subscribe", driver_status_topic)
    .load()
)
driver_struct = driver_kafka.select(
    from_avro(driver_kafka["value"], driver_status_schema_str).alias("driver")
)

driver_columns = [
    "driver.driver_id",
    "driver.timestamp",
    "driver.current_location.latitude as latitude",
    "driver.current_location.longitude as longitude",
    "driver.status",
    "driver.vehicle_info.vehicle_id as vehicle_id",
    "driver.vehicle_info.vehicle_type as vehicle_type",
    "driver.vehicle_info.capacity as capacity",
    "driver.current_ride_id",
    "driver.last_update",
    "driver.battery_level",
]
df_driver = driver_struct.selectExpr(*driver_columns)


In [None]:
# Persist both streams as Parquet in Azure Blob Storage for a fixed duration.
BLOB_BASE = "wasbs://streamed-data-project-group9@iesstsabbadbaa.blob.core.windows.net"
RUN_SECONDS = 500


def start_parquet_sink(df, name, trigger_interval="5 seconds"):
    """Start an append-mode Parquet sink for ``df`` and return the query.

    Data is written to ``{BLOB_BASE}/{name}`` and the checkpoint to
    ``{BLOB_BASE}/checkpoints/{name}``. Keep ``name`` stable across runs so the
    query resumes from its checkpoint rather than from ``startingOffsets``.
    """
    return (
        df.writeStream.format("parquet")
        .option("path", f"{BLOB_BASE}/{name}")
        .option("checkpointLocation", f"{BLOB_BASE}/checkpoints/{name}")
        .outputMode("append")
        .trigger(processingTime=trigger_interval)
        .start()
    )


# Forget queries that terminated earlier (e.g. the console preview). Otherwise
# awaitAnyTermination below would return immediately.
spark.streams.resetTerminated()

try:
    query_ride = start_parquet_sink(df_ride, "output_ride_v2")
    query_driver = start_parquet_sink(df_driver, "output_driver")

    # Let both run for RUN_SECONDS. Unlike time.sleep, this re-raises at once
    # if either sink fails (e.g. a bad Avro record or a storage auth error).
    spark.streams.awaitAnyTermination(RUN_SECONDS)
finally:
    # Stop every streaming query, even after a failure or an interrupt. This
    # stops the queries; it does not drain in-flight data first.
    print("🛑 Stopping active queries...")
    for query in spark.streams.active:
        query.stop()

# Shutdown Spark session
spark.stop()
print("✅ All done! Data should now be in Azure Blob Storage.")