In [1]:
!pip install fastavro confluent-kafka

Collecting fastavro
  Downloading fastavro-1.10.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.5 kB)
Collecting confluent-kafka
  Downloading confluent_kafka-2.10.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (22 kB)
Downloading fastavro-1.10.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.3/3.3 MB[0m [31m20.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading confluent_kafka-2.10.0-cp311-cp311-manylinux_2_28_x86_64.whl (3.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.8/3.8 MB[0m [31m84.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: fastavro, confluent-kafka
Successfully installed confluent-kafka-2.10.0 fastavro-1.10.0


# Spark Setup

Reference: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

In [2]:
import os
import subprocess

# Discover the newest Spark 3.x release currently listed by Apache.
# Pipeline, stage by stage:
#   curl -sS https://downloads.apache.org/spark/  -> fetch the download index (quiet, but still report errors)
#   grep -o 'spark-3\.[0-9]\+\.[0-9]\+'           -> keep only names starting with spark-3. (a future Spark 4.x is ignored)
#   sort -V                                        -> sort them as version numbers
#   tail -1                                        -> select the latest one
latest_spark_lookup = subprocess.run(
    "curl -sS https://downloads.apache.org/spark/ | grep -o 'spark-3\\.[0-9]\\+\\.[0-9]\\+' | sort -V | tail -1",
    shell=True, capture_output=True, text=True
)
spark_version = latest_spark_lookup.stdout.strip()

# The exit status of a shell pipeline is that of its last command (tail), so a
# failed download still "succeeds". An empty result is the only reliable failure
# signal; stop here instead of building paths and URLs from an empty version.
if not spark_version:
    raise RuntimeError(
        "Could not determine the latest Spark 3.x release from "
        "https://downloads.apache.org/spark/ "
        f"(stderr: {latest_spark_lookup.stderr.strip()!r})"
    )

spark_version

'spark-3.5.5'

In [3]:
# Release identifiers; the shell commands of the following cell read them back
# from the environment variables exported below.
spark_release = spark_version
hadoop_version = 'hadoop3'

import os, time
start = time.time()  # wall-clock reference taken at setup time

# Export the release identifiers together with the Java and Spark install locations.
os.environ.update({
    'SPARK_RELEASE': spark_release,
    'HADOOP_VERSION': hadoop_version,
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": f"/content/{spark_release}-bin-{hadoop_version}",
})

In [4]:
# Run below commands in google colab
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # install Java8
!wget -q http://apache.osuosl.org/spark/${SPARK_RELEASE}/${SPARK_RELEASE}-bin-${HADOOP_VERSION}.tgz # download spark-3.3.X
!tar xf ${SPARK_RELEASE}-bin-${HADOOP_VERSION}.tgz # unzip it

!pip install -q findspark # install findspark
# findspark find your Spark Distribution and sets necessary environment variables

import findspark
findspark.init()

# Check the pyspark version
import pyspark
print(pyspark.__version__)

3.5.5


# Define the configuration details for your Spark job:


Create your Spark session. You must define details of the Kafka Cluster to connect to, topic name and consumer group name.

- kafka_brokers: List of Kafka bootstrap servers  
- topic_name: The Kafka topic to read messages from
- consumer_group: This allows you to use different Spark jobs to consume the same topic messages and implement different analytics
- schema: the AVRO schema of topic messages

In [5]:
!pip install python-dotenv
import os
from dotenv import load_dotenv

event_hub_namespace = os.environ.get("event_hub_namespace")

passengers_eventhub_name=os.environ.get("passengers_eventhub_name")
passengers_conn_str=os.environ.get("passengers_conn_str")

drivers_eventhub_name=os.environ.get("drivers_eventhub_name")
drivers_conn_str=os.environ.get("drivers_conn_str")

In [6]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro

# Load the AVRO schemas (JSON text, from github) used to decode the message payloads.
with open("passengerschemav2.json") as passenger_schema_file:
    pass_schema = passenger_schema_file.read()

with open("driver_schema.json") as driver_schema_file:
    drv_schema = driver_schema_file.read()

# The Kafka and AVRO connectors should match the Spark version that is actually
# running. The Spark release is discovered dynamically at the top of the notebook,
# so the connector version is derived from the installed pyspark instead of being
# hard-coded to 3.5.0.
# NOTE(review): the _2.12 suffix assumes a Scala 2.12 build of Spark — confirm if
# the distribution ever changes.
connector_version = pyspark.__version__
spark_packages = ",".join([
    f"org.apache.spark:spark-sql-kafka-0-10_2.12:{connector_version}",
    f"org.apache.spark:spark-avro_2.12:{connector_version}",
])

# Create a Spark session
spark = SparkSession \
    .builder \
    .appName("StreamingAVROFromKafka") \
    .config("spark.streaming.stopGracefullyOnShutdown", True) \
    .config('spark.jars.packages', spark_packages) \
    .config("spark.sql.shuffle.partitions", 4) \
    .master("local[*]") \
    .getOrCreate()

In [7]:
def _event_hub_kafka_conf(namespace, eventhub_name, connection_string):
    """Build the Spark Kafka-source options for one Azure Event Hub.

    Args:
        namespace: Event Hubs namespace; the bootstrap server is
            ``<namespace>.servicebus.windows.net:9093``.
        eventhub_name: Name of the hub, used as the topic to subscribe to.
        connection_string: Connection string, passed as the SASL PLAIN password.

    Returns:
        A new dict of options suitable for ``spark.readStream.format("kafka").options(**conf)``.
    """
    return {
        "kafka.bootstrap.servers": f"{namespace}.servicebus.windows.net:9093",
        # The settings below are required if Kafka is secured, for example when
        # connecting to Azure Event Hubs:
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.jaas.config": f'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{connection_string}";',

        "subscribe": eventhub_name,
        # "latest" consumes only newly arriving data; "earliest" consumes everything
        # already stored on the event hub immediately.
        "startingOffsets": "latest",

        # NOTE(review): per the Spark Kafka integration guide, Kafka consumer settings
        # are only passed through with a "kafka." prefix and Spark tracks offsets
        # itself, so the two auto-commit options are probably ignored — confirm before
        # relying on them. The stray trailing space in "true " has been removed.
        "enable.auto.commit": "true",
        "groupIdPrefix": "debug_specials_",
        "auto.commit.interval.ms": "5000",
    }


# One option set per stream; they differ only in hub name and connection string.
kafkaConf_pass = _event_hub_kafka_conf(event_hub_namespace, passengers_eventhub_name, passengers_conn_str)
kafkaConf_drv = _event_hub_kafka_conf(event_hub_namespace, drivers_eventhub_name, drivers_conn_str)

In [8]:
# Passenger stream: subscribe to the passenger hub through the Kafka source.
df_rides = (
    spark.readStream
    .format("kafka")
    .options(**kafkaConf_pass)
    .load()
)

# Decode the `value` column with the passenger AVRO schema into a single struct column.
passenger_payload = from_avro(df_rides.value, pass_schema, {"mode": "PERMISSIVE"})
df_passenger = df_rides.select(passenger_payload.alias("passenger_events"))

# Driver stream: same source, configured with the driver hub options.
df_driver = (
    spark.readStream
    .format("kafka")
    .options(**kafkaConf_drv)
    .load()
)

# Decode the `value` column with the driver AVRO schema; df_driver is rebound to the decoded frame.
driver_payload = from_avro(df_driver.value, drv_schema, {"mode": "PERMISSIVE"})
df_driver = df_driver.select(driver_payload.alias("driver_event"))

In [9]:
# Flatten the schemas
from pyspark.sql.functions import col

# (path inside the decoded passenger struct, output column name).
# A name of None keeps the leaf field's own name.
passenger_field_map = [
    ("request_id", None),
    ("passenger_id", None),
    ("timestamp", None),
    ("pickup_location.latitude", "pickup_latitude"),
    ("pickup_location.longitude", "pickup_longitude"),
    ("dropoff_location.latitude", "dropoff_latitude"),
    ("dropoff_location.longitude", "dropoff_longitude"),
    ("vehicle_type", None),
    ("passenger_preferences.music", "music_preference"),
    ("passenger_preferences.temperature", "preferred_temperature"),
    ("passenger_preferences.quiet_ride", "quiet_ride"),
    ("payment_info.payment_method", "payment_method"),
    ("payment_info.coupon_codes", "coupon_codes"),
    ("payment_info.loyalty_points_used", "loyalty_points_used"),
    ("estimated_fare", None),
    ("text_messages", None),
    ("driver_rating", None),
    ("status", None),
    ("driver_id", None),
    ("request_timestamp", None),
    ("accepted_timestamp", None),
    ("ride_duration", None),
]

passenger_columns = [
    col(f"passenger_events.{path}") if name is None
    else col(f"passenger_events.{path}").alias(name)
    for path, name in passenger_field_map
]
df_passenger = df_passenger.select(*passenger_columns)

# The driver struct is already flat: lift its fields to top-level columns.
driver_fields = ("driver_id", "timestamp", "latitude", "longitude", "status")
df_driver = df_driver.select(*[col(f"driver_event.{field}") for field in driver_fields])

# TESTING NEW QUERIES WITH BLOB

In [10]:
!pip install azure-storage-blob

Collecting azure-storage-blob
  Downloading azure_storage_blob-12.25.1-py3-none-any.whl.metadata (26 kB)
Collecting azure-core>=1.30.0 (from azure-storage-blob)
  Downloading azure_core-1.33.0-py3-none-any.whl.metadata (42 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/42.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.6/42.6 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
Collecting isodate>=0.6.1 (from azure-storage-blob)
  Downloading isodate-0.7.2-py3-none-any.whl.metadata (11 kB)
Downloading azure_storage_blob-12.25.1-py3-none-any.whl (406 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m407.0/407.0 kB[0m [31m10.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading azure_core-1.33.0-py3-none-any.whl (207 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m207.1/207.1 kB[0m [31m16.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading isodate-0.7.2-py3-none-any.whl (22 kB)

In [11]:
# Blob
# Azure Storage settings read from the environment. os.environ.get returns None
# for any variable that is not set, so a missing value is not reported here.
account_name = os.environ.get("account_name")
container_name = os.environ.get("container_name")
account_key = os.environ.get("account_key")

In [12]:
import time
from azure.storage.blob import BlobServiceClient

def process_batch(batch_df, batch_id, query_name, keep_local=False):
    """Stage one micro-batch as Parquet locally and upload it to Azure Blob Storage.

    Written to be used as the callback of a ``foreachBatch`` sink. The batch is
    written as a single Parquet part file under
    ``/tmp/stream-output/<query_name>/<unix_ts>_<batch_id>``, and every
    ``*.parquet`` file found there is uploaded to
    ``stream-output/<query_name>/<unix_ts>_<batch_id>/`` in the container named by
    the module-level ``container_name`` (credentials come from the module-level
    ``account_name`` / ``account_key``).

    Errors are printed and never re-raised (best effort), so a failing batch does
    not stop the streaming query.

    Args:
        batch_df: DataFrame holding the rows of this micro-batch.
        batch_id: Identifier of the micro-batch; used in paths and log lines.
        query_name: Logical name of the stream; becomes part of the output path.
        keep_local: When True, leave the staged Parquet directory on disk. By
            default it is removed once the batch has been handled, so a
            long-running stream does not keep filling /tmp.

    Returns:
        None.
    """
    import os
    import shutil
    import traceback

    print(f"Starting to process batch {batch_id} for {query_name}")

    local_output_path = None
    try:
        # count() and the Parquet write below are two separate Spark actions;
        # caching the batch keeps the second one from recomputing it.
        batch_df.persist()

        # Check if DataFrame is empty without calling isEmpty()
        # This avoids the job cancellation issue
        if batch_df.count() == 0:
            print(f"Batch {batch_id} for {query_name} is empty, skipping")
            return

        # Create a unique directory name for this batch
        timestamp = int(time.time())
        batch_dir = f"{timestamp}_{batch_id}"
        local_output_path = f"/tmp/stream-output/{query_name}/{batch_dir}"

        # Explicitly create the parent directory; Spark creates the batch directory itself
        os.makedirs(os.path.dirname(local_output_path), exist_ok=True)

        # First write batch to local storage as Parquet with minimized partitions.
        # A dedicated try-except handles potential job cancellations.
        try:
            batch_df.coalesce(1).write.mode("overwrite").parquet(local_output_path)
        except Exception as write_error:
            print(f"Error writing batch {batch_id} to Parquet: {str(write_error)}")
            return

        conn_str = f"DefaultEndpointsProtocol=https;AccountName={account_name};AccountKey={account_key};EndpointSuffix=core.windows.net"

        # Upload each Parquet file to Azure Blob. The client is used as a context
        # manager so it is closed after every batch instead of being left open.
        files_uploaded = 0
        with BlobServiceClient.from_connection_string(conn_str) as blob_service_client:
            container_client = blob_service_client.get_container_client(container_name)
            for root, dirs, files in os.walk(local_output_path):
                for file in files:
                    if not file.endswith(".parquet"):
                        continue
                    local_file_path = os.path.join(root, file)
                    blob_path = f"stream-output/{query_name}/{batch_dir}/{file}"

                    with open(local_file_path, "rb") as data:
                        container_client.upload_blob(name=blob_path, data=data, overwrite=True)
                    files_uploaded += 1

        print(f"Batch {batch_id} for {query_name} uploaded to Azure Blob Storage ({files_uploaded} files)")
    except Exception as e:
        print(f"Error processing batch {batch_id}: {str(e)}")
        traceback.print_exc()
    finally:
        # Release the cached batch; a failure here must not mask the real outcome.
        try:
            batch_df.unpersist()
        except Exception as cleanup_error:
            print(f"Could not unpersist batch {batch_id}: {str(cleanup_error)}")
        # Drop the local staging copy (also after a failed write or upload).
        if local_output_path is not None and not keep_local:
            shutil.rmtree(local_output_path, ignore_errors=True)

In [13]:
!mkdir -p /tmp/stream-checkpoints

In [14]:
# Maven coordinates of the connector jars, kept as a tuple for readability and
# joined into one comma-separated string.
_required_jars = (
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0",  # Kafka source for Spark SQL
    "org.apache.spark:spark-avro_2.12:3.5.0",            # AVRO support
    "org.apache.hadoop:hadoop-azure:3.3.1",              # Hadoop Azure connector
    "com.microsoft.azure:azure-storage:8.6.6",           # Azure Blob SDK dependency
)
jar_dependencies = ",".join(_required_jars)

In [15]:
# NOTE(review): a SparkSession is already created earlier in this notebook, and
# getOrCreate() is documented to return the existing session when there is one.
# The settings below (in particular spark.jars.packages) therefore probably do not
# take effect on a kernel where that earlier cell has run — confirm before relying
# on this cell.
#
# Disabled variant that would also request the Azure jars from `jar_dependencies`
# and register the storage account key:
# spark = SparkSession.builder \
#     .appName("StreamingAVROToBlob") \
#     .config("spark.streaming.stopGracefullyOnShutdown", True) \
#     .config("spark.jars.packages", jar_dependencies) \
#     .config(f"fs.azure.account.key.{account_name}.blob.core.windows.net", account_key) \
#     .config("spark.sql.shuffle.partitions", 4) \
#     .master("local[*]") \
#     .getOrCreate()

# Active variant: requests only the Kafka and AVRO connector packages.
spark = SparkSession.builder \
    .appName("StreamingAVROToBlob") \
    .config("spark.streaming.stopGracefullyOnShutdown", True) \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,org.apache.spark:spark-avro_2.12:3.5.0") \
    .config("spark.sql.shuffle.partitions", 4) \
    .master("local[*]") \
    .getOrCreate()

In [17]:
# Each callback receives its stream name through a default argument
# (`name=query_name`), which is evaluated once, when the lambda is created.
# A plain `lambda df, id: process_batch(df, id, query_name)` looks the global
# `query_name` up on every batch instead, so as soon as the variable is reassigned
# (just below, and again in later cells) the passenger batches would be processed
# and uploaded under another stream's name.

# For passengers data
query_name = 'all_passengers_BLOB'
query_passengers = df_passenger.writeStream \
    .foreachBatch(lambda df, batch_id, name=query_name: process_batch(df, batch_id, name)) \
    .outputMode("update") \
    .trigger(processingTime='30 seconds') \
    .option("checkpointLocation", f"/tmp/stream-checkpoints/{query_name}") \
    .start()


# For drivers data
query_name = 'all_drivers_BLOB'
query_drivers = df_driver.writeStream \
    .foreachBatch(lambda df, batch_id, name=query_name: process_batch(df, batch_id, name)) \
    .outputMode("update") \
    .trigger(processingTime='30 seconds') \
    .option("checkpointLocation", f"/tmp/stream-checkpoints/{query_name}") \
    .start()

In [38]:
query_passengers.status

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

In [63]:
query_drivers.status

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

In [20]:
df = spark.read.text(f"/tmp/stream-checkpoints/all_drivers_BLOB")
df.show(truncate=False)

Batch 9 for all_drivers_BLOB uploaded to Azure Blob Storage (1 files)
Batch 9 for all_drivers_BLOB uploaded to Azure Blob Storage (1 files)
+---------------------------------------------+
|value                                        |
+---------------------------------------------+
|{"id":"25fb07e5-3722-4b56-9c35-f8b23b2e87ae"}|
+---------------------------------------------+



# Analytical Queries

Your Spark job and input messages are ready to be worked on. Now, you can apply any transformations required to answer business questions.

IMPORTANT NOTE: if you chose "startingOffsets": "latest" in the config, you must send data AFTER running df.writeStream...
In other words, Spark only starts consuming events once .writeStream has been started, so the results will be empty if no new events are sent after that point. (For a real-time analytics case this should be set to "latest", so that the stats update as new data comes in. For testing purposes it is easier to set it to "earliest", because then you only need to send the data once and can keep working with it.)

## Setup of Query

In [10]:
!mkdir checkpoint

In [11]:
# If offset:Latest, send new events after running this cell.
query_name = 'all_passengers'

# Memory sink registered under the query name, so the rows can be read back with
# spark.sql("SELECT ... FROM all_passengers").
passenger_memory_writer = (
    df_passenger.writeStream
    .outputMode("update")
    .format("memory")
    .queryName(query_name)
)
query_passengers = passenger_memory_writer.start()

In [12]:
# If offset:Latest, send new events after running this cell.
query_name = 'all_drivers'

# Memory sink registered under the query name, so the rows can be read back with
# spark.sql("SELECT ... FROM all_drivers").
driver_memory_writer = (
    df_driver.writeStream
    .outputMode("update")
    .format("memory")
    .queryName(query_name)
)
query_drivers = driver_memory_writer.start()

In [13]:
spark.sql('show tables').show()

+---------+--------------+-----------+
|namespace|     tableName|isTemporary|
+---------+--------------+-----------+
|         |   all_drivers|       true|
|         |all_passengers|       true|
+---------+--------------+-----------+



In [14]:
# Status either "Processing new data" or "Getting offsets from..."
query_passengers.status

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

In [15]:
query_drivers.status

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

In [17]:
# show() prints the table itself and returns None; wrapping it in print() only adds a stray "None" line.
spark.sql('SELECT count(*) as record_count FROM all_drivers').show(20, truncate=True)

+------------+
|record_count|
+------------+
|         300|
+------------+

None


In [19]:
# show() prints the table itself and returns None; wrapping it in print() only adds a stray "None" line.
spark.sql('SELECT count(*) as record_count FROM all_passengers').show(20, truncate=True)

+------------+
|record_count|
+------------+
|          12|
+------------+

None


In [20]:
# show() prints the table itself and returns None; wrapping it in print() only adds a stray "None" line.
spark.sql('SELECT * FROM all_passengers').show(20, truncate=True)

+-------------------+------------+-------------+------------------+------------------+------------------+------------------+------------+----------------+---------------------+----------+--------------+------------+-------------------+--------------+--------------------+-------------+---------+----------+-----------------+------------------+-------------+
|         request_id|passenger_id|    timestamp|   pickup_latitude|  pickup_longitude|  dropoff_latitude| dropoff_longitude|vehicle_type|music_preference|preferred_temperature|quiet_ride|payment_method|coupon_codes|loyalty_points_used|estimated_fare|       text_messages|driver_rating|   status| driver_id|request_timestamp|accepted_timestamp|ride_duration|
+-------------------+------------+-------------+------------------+------------------+------------------+------------------+------------+----------------+---------------------+----------+--------------+------------+-------------------+--------------+--------------------+-------------

In [21]:
# show() prints the table itself and returns None; wrapping it in print() only adds a stray "None" line.
spark.sql('SELECT * FROM all_drivers').show(20, truncate=True)

+----------+-------------+------------------+------------------+---------+
| driver_id|    timestamp|          latitude|         longitude|   status|
+----------+-------------+------------------+------------------+---------+
|driver_000|1745435062276|40.725089908091384|-74.02561436848974|  ON_RIDE|
|driver_001|1745435062278| 40.72652353684726|-73.95025208036424|  OFFLINE|
|driver_002|1745435062278| 40.67645920013038| -74.0434774049815|  OFFLINE|
|driver_003|1745435062278|40.879669765723904|-74.07841783926281|  OFFLINE|
|driver_004|1745435062278| 40.76182214782082|-73.87144994657183|  OFFLINE|
|driver_005|1745435062278| 40.72061256119842|-74.04169309515052|AVAILABLE|
|driver_006|1745435062278| 40.78862051110513| -74.0016071273915|  OFFLINE|
|driver_007|1745435062278|  40.6011054271701|-74.07100029862085|  OFFLINE|
|driver_008|1745435062278|40.661197085687405|-74.08892114637109|  OFFLINE|
|driver_009|1745435062278| 40.89448723897804|  -74.000468707782|  OFFLINE|
|driver_010|1745435062278

In [None]:
# Write df_passenger to CSV
query_passengers_to_csv = df_passenger.writeStream \
    .format("csv") \
    .option("path", "/tmp/passengers_stream_output") \
    .option("checkpointLocation", "/tmp/passengers_checkpoint") \
    .outputMode("append") \
    .start()

# Write df_driver to CSV
query_drivers_to_csv = df_driver.writeStream \
    .format("csv") \
    .option("path", "/tmp/drivers_stream_output") \
    .option("checkpointLocation", "/tmp/drivers_checkpoint") \
    .outputMode("append") \
    .start()

time.sleep(35) # PRODUCER SHOULD BE SEIDNING NOW!!!

query_passengers_to_csv.stop()
query_drivers_to_csv.stop()

import os
import glob
import shutil

# Collect from memory sink and drop unsupported columns
drivers_df = spark.sql("SELECT * FROM all_drivers")
passengers_df = spark.sql("SELECT * FROM all_passengers").drop("coupon_codes", "text_messages")

# Coalesce to 1 partition and write with header
drivers_df.coalesce(1).write.option("header", True).mode("overwrite").csv("/tmp/final_drivers_single")
passengers_df.coalesce(1).write.option("header", True).mode("overwrite").csv("/tmp/final_passengers_single")

# Rename single part file for drivers
driver_csv = glob.glob("/tmp/final_drivers_single/part-*.csv")[0]
os.rename(driver_csv, "/tmp/final_drivers_single/drivers.csv")

# Rename single part file for passengers
passenger_csv = glob.glob("/tmp/final_passengers_single/part-*.csv")[0]
os.rename(passenger_csv, "/tmp/final_passengers_single/passengers.csv")

# Zip folders
shutil.make_archive("/tmp/final_drivers_csv_single", 'zip', "/tmp/final_drivers_single")
shutil.make_archive("/tmp/final_passengers_csv_single", 'zip', "/tmp/final_passengers_single")

# Move for download
!cp /tmp/final_drivers_csv_single.zip /content/final_drivers_csv_single.zip
!cp /tmp/final_passengers_csv_single.zip /content/final_passengers_csv_single.zip

# Transformation 1: number of driver updates per status

In [67]:
from pyspark.sql.functions import from_unixtime, col, window

# The raw `timestamp` is divided by 1000 (it looks like epoch milliseconds in the
# sample output above) and converted into a proper timestamp column.
driver_event_seconds = col("timestamp") / 1000
driver_event_time = from_unixtime(driver_event_seconds).cast("timestamp")
df_driver_ts = df_driver.withColumn("timestamp_ts", driver_event_time)

# Expose the enriched stream to SQL.
df_driver_ts.createOrReplaceTempView("driver_events_view")

In [68]:
# Aggregation over driver_events_view: number of driver events per status within
# 1-hour tumbling windows of timestamp_ts. Each row carries a constant
# metric_type of 'ride_metrics', the status as `dimension`, the count as
# `metric_value`, and the window struct as `window_interval`.
ride_metrics_query = spark.sql("""
    SELECT
        'ride_metrics' AS metric_type,
        status AS dimension,
        COUNT(*) AS metric_value,
        window(timestamp_ts, '1 hour') AS window_interval
    FROM driver_events_view
    GROUP BY window(timestamp_ts, '1 hour'), status
""")

In [None]:
# `name=query_name` freezes the stream name when the lambda is created. A lambda
# that reads the global `query_name` directly would pick up whatever value the
# variable holds when a batch fires, i.e. the name set by a later cell.
query_name = 'driver_status_metrics_BLOB'
ride_metrics_stream = ride_metrics_query.writeStream \
    .foreachBatch(lambda df, batch_id, name=query_name: process_batch(df, batch_id, name)) \
    .outputMode("complete") \
    .trigger(processingTime='30 seconds') \
    .option("checkpointLocation", f"/tmp/stream-checkpoints/{query_name}") \
    .start()

In [72]:
ride_metrics_stream.status

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

# Transformation 2: Trips status

In [75]:
from pyspark.sql.functions import from_unixtime, col, window

# Add timestamp column (if not already present)
df_passenger_ts = df_passenger.withColumn(
    "timestamp_ts",
    from_unixtime(col("timestamp") / 1000).cast("timestamp")
)

# Create temporary view
df_passenger_ts.createOrReplaceTempView("passenger_events_view")

# Define the windowed trip status aggregation:
# number of passenger events per status within 1-hour windows.
trip_status_query = spark.sql("""
    SELECT
        'trip_status' AS metric_type,
        status AS dimension,
        COUNT(*) AS metric_value,
        window(timestamp_ts, '1 hour') AS window_interval
    FROM passenger_events_view
    GROUP BY window(timestamp_ts, '1 hour'), status
""")

# Stream to Azure Blob
# `name=query_name` freezes the stream name when the lambda is created. A lambda
# that reads the global `query_name` directly would pick up whatever value the
# variable holds when a batch fires, i.e. the name set by a later cell.
query_name = "trip_status_metrics_BLOB"
trip_status_stream = trip_status_query.writeStream \
    .foreachBatch(lambda df, batch_id, name=query_name: process_batch(df, batch_id, name)) \
    .outputMode("complete") \
    .trigger(processingTime="30 seconds") \
    .option("checkpointLocation", f"/tmp/stream-checkpoints/{query_name}") \
    .start()

Error processing batch 25: An error occurred while calling o3632.count.
: org.apache.spark.SparkException: Job 911 cancelled part of cancelled job group a3c3445a-dbd2-4770-b7ce-4b694caaa4c5
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:2731)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleJobGroupCancelled$4(DAGScheduler.scala:1198)
	at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
	at org.apache.spark.scheduler.DAGScheduler.handleJobGroupCancelled(DAGScheduler.scala:1197)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3016)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983

Traceback (most recent call last):
  File "<ipython-input-12-d03e644844f7>", line 11, in process_batch
    count = batch_df.count()
            ^^^^^^^^^^^^^^^^
  File "/content/spark-3.5.5-bin-hadoop3/python/pyspark/sql/dataframe.py", line 1240, in count
    return int(self._jdf.count())
               ^^^^^^^^^^^^^^^^^
  File "/content/spark-3.5.5-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
                   ^^^^^^^^^^^^^^^^^
  File "/content/spark-3.5.5-bin-hadoop3/python/pyspark/errors/exceptions/captured.py", line 179, in deco
    return f(*a, **kw)
           ^^^^^^^^^^^
  File "/content/spark-3.5.5-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o3632.count.
: org.apache.spark.SparkException: Job 911 cancelled part of cancelled job group a3c3445a-dbd2-4770-b7ce-4b69

In [None]:
trip_status_stream.status

# Transformation 3: average response time per vehicle type

In [43]:
from pyspark.sql.functions import col, from_unixtime, window, unix_timestamp

# (new timestamp column, source column divided by 1000 before conversion)
passenger_time_columns = (
    ("timestamp_ts", "timestamp"),
    ("request_ts", "request_timestamp"),
    ("accepted_ts", "accepted_timestamp"),
)

# Add the converted columns one after another, in the order listed above.
df_passenger_ts = df_passenger
for ts_column, source_column in passenger_time_columns:
    df_passenger_ts = df_passenger_ts.withColumn(
        ts_column, from_unixtime(col(source_column) / 1000).cast("timestamp")
    )

# Expose the enriched stream to SQL.
df_passenger_ts.createOrReplaceTempView("passenger_events_view")

Batch 1 for trip_status_metrics_BLOB uploaded to Azure Blob Storage (1 files)


In [44]:
# Aggregation over passenger_events_view, restricted to rows with status 'COMPLETED':
# average of (accepted_ts - request_ts), taken in seconds and divided by 60.0,
# per vehicle_type, within 15-minute windows that slide every 5 minutes.
# Only the end of each window is output, as `window_end`.
response_time_query = spark.sql("""
    SELECT
        'response_time' AS metric_type,
        vehicle_type AS dimension,
        AVG((unix_timestamp(accepted_ts) - unix_timestamp(request_ts)) / 60.0) AS metric_value,
        window(timestamp_ts, '15 minutes', '5 minutes').end AS window_end
    FROM passenger_events_view
    WHERE status = 'COMPLETED'
    GROUP BY window(timestamp_ts, '15 minutes', '5 minutes'), vehicle_type
""")

Batch 3 for trip_status_metrics_BLOB uploaded to Azure Blob Storage (1 files)


In [45]:
# `name=query_name` freezes the stream name when the lambda is created. A lambda
# that reads the global `query_name` directly would pick up whatever value the
# variable holds when a batch fires, i.e. the name set by a later cell.
# NOTE: this rebinds `response_time_query` from the aggregation DataFrame to the
# running StreamingQuery, so the cell that defines the DataFrame has to be re-run
# before this one can be run again.
query_name = 'response_time_BLOB'
response_time_query = response_time_query.writeStream \
    .foreachBatch(lambda df, batch_id, name=query_name: process_batch(df, batch_id, name)) \
    .outputMode("complete") \
    .trigger(processingTime='30 seconds') \
    .option("checkpointLocation", f"/tmp/stream-checkpoints/{query_name}") \
    .start()

In [None]:
response_time_query.status

# Transformation 4: Average Ride Duration per Hour of Day

In [60]:
from pyspark.sql.functions import from_unixtime, col, hour

# Enrich timestamps if not already present
df_passenger_ts = df_passenger.withColumn(
    "timestamp_ts", from_unixtime(col("timestamp") / 1000).cast("timestamp")
).withColumn(
    "request_ts", from_unixtime(col("request_timestamp") / 1000).cast("timestamp")
).withColumn(
    "accepted_ts", from_unixtime(col("accepted_timestamp") / 1000).cast("timestamp")
)

# Temporary view for SQL
df_passenger_ts.createOrReplaceTempView("passenger_events_view")

# Query: average ride_duration of COMPLETED rows per hour of day, within 60-minute windows
ride_duration_query = spark.sql("""
    SELECT
        'ride_duration' AS metric_type,
        CAST(HOUR(timestamp_ts) AS STRING) AS dimension,
        AVG(ride_duration) AS metric_value,
        window(timestamp_ts, '60 minutes', '60 minutes').end AS window_end
    FROM passenger_events_view
    WHERE status = 'COMPLETED'
    GROUP BY window(timestamp_ts, '60 minutes', '60 minutes'), HOUR(timestamp_ts)
""")

# Write to blob using foreachBatch
# `name=query_name` freezes the stream name when the lambda is created. A lambda
# that reads the global `query_name` directly would pick up whatever value the
# variable holds when a batch fires, i.e. the name set by a later cell.
query_name = "ride_duration_metrics_BLOB"
ride_duration_stream = ride_duration_query.writeStream \
    .foreachBatch(lambda df, batch_id, name=query_name: process_batch(df, batch_id, name)) \
    .outputMode("complete") \
    .trigger(processingTime="30 seconds") \
    .option("checkpointLocation", f"/tmp/stream-checkpoints/{query_name}") \
    .start()

Starting to process batch 80 for ride_duration_metrics_BLOB


In [None]:
ride_duration_stream.status

# Transformation 5: requested/accepted ratio per area

In [36]:
from pyspark.sql.functions import col, from_unixtime, when

# Replace the raw `timestamp` (divided by 1000 first) with a proper timestamp column.
passenger_event_time = from_unixtime(col("timestamp") / 1000).cast("timestamp")

# Split the pickup point into four quadrants around latitude 40.75 / longitude -73.95.
pickup_is_north = col("pickup_latitude") >= 40.75
pickup_is_south = col("pickup_latitude") < 40.75
pickup_is_east = col("pickup_longitude") >= -73.95
pickup_is_west = col("pickup_longitude") < -73.95

pickup_quadrant = (
    when(pickup_is_north & pickup_is_east, "NE")
    .when(pickup_is_north & pickup_is_west, "NW")
    .when(pickup_is_south & pickup_is_east, "SE")
    .otherwise("SW")
)

df_passenger_quadrant = (
    df_passenger
    .withColumn("timestamp", passenger_event_time)
    .withColumn("quadrant", pickup_quadrant)
)

# Expose the enriched stream to SQL.
df_passenger_quadrant.createOrReplaceTempView("passenger_quadrant_view")

In [37]:
# Aggregation over passenger_quadrant_view: fraction of rows whose
# accepted_timestamp is not NULL (rows with an acceptance / all rows), per
# quadrant, within 15-minute windows that slide every 5 minutes.
# The `* 1.0` forces a fractional rather than integer division.
ride_ratio_query = spark.sql("""
SELECT
  'request_acceptance_ratio' AS metric_type,
  quadrant AS dimension,
  SUM(CASE WHEN accepted_timestamp IS NOT NULL THEN 1 ELSE 0 END) * 1.0 /
  COUNT(*) AS metric_value,
  window(timestamp, '15 minutes', '5 minutes') AS window_interval
FROM passenger_quadrant_view
GROUP BY window(timestamp, '15 minutes', '5 minutes'), quadrant
""")

In [38]:
# `name=query_name` freezes the stream name when the lambda is created. A lambda
# that reads the global `query_name` directly would pick up whatever value the
# variable holds when a batch fires, i.e. the name set by a later cell.
# NOTE: this rebinds `ride_ratio_query` from the aggregation DataFrame to the
# running StreamingQuery, so the cell that defines the DataFrame has to be re-run
# before this one can be run again.
query_name = 'request_acceptance_ratio_BLOB'
ride_ratio_query = ride_ratio_query.writeStream \
    .foreachBatch(lambda df, batch_id, name=query_name: process_batch(df, batch_id, name)) \
    .outputMode("complete") \
    .trigger(processingTime='30 seconds') \
    .option("checkpointLocation", f"/tmp/stream-checkpoints/{query_name}") \
    .start()

In [None]:
ride_ratio_query.status

# Transformation 6: Average Response Time per Vehicle Type

In [None]:
from pyspark.sql.functions import col, from_unixtime

# Enrich with usable timestamps
df_passenger_ts = df_passenger.withColumn(
    "timestamp", from_unixtime(col("timestamp") / 1000).cast("timestamp")
).withColumn(
    "request_ts", from_unixtime(col("request_timestamp") / 1000).cast("timestamp")
).withColumn(
    "accepted_ts", from_unixtime(col("accepted_timestamp") / 1000).cast("timestamp")
)

# Create view
df_passenger_ts.createOrReplaceTempView("passenger_events_view")

# Response time query with window and dimension:
# average (accepted_ts - request_ts) in minutes per vehicle type for COMPLETED rows,
# within 15-minute windows that slide every 5 minutes.
response_time_query = spark.sql("""
SELECT
  'response_time' AS metric_type,
  vehicle_type AS dimension,
  AVG((unix_timestamp(accepted_ts) - unix_timestamp(request_ts)) / 60.0) AS metric_value,
  window(timestamp, '15 minutes', '5 minutes') AS window_interval
FROM passenger_events_view
WHERE status = 'COMPLETED'
GROUP BY window(timestamp, '15 minutes', '5 minutes'), vehicle_type
""")

# Write to blob
# `name=query_name` freezes the stream name when the lambda is created. A lambda
# that reads the global `query_name` directly would pick up whatever value the
# variable holds when a batch fires, i.e. the name set by a later cell.
query_name = "response_time_metrics_BLOB"
response_time_stream = response_time_query.writeStream \
    .foreachBatch(lambda df, batch_id, name=query_name: process_batch(df, batch_id, name)) \
    .outputMode("complete") \
    .trigger(processingTime="30 seconds") \
    .option("checkpointLocation", f"/tmp/stream-checkpoints/{query_name}") \
    .start()

In [None]:
response_time_stream.status

# Transformation 7: Active drivers per area

Quadrants:

NE: lat ≥ 40.75 and lon ≥ -73.95

NW: lat ≥ 40.75 and lon < -73.95

SE: lat < 40.75 and lon ≥ -73.95

SW: lat < 40.75 and lon < -73.95



In [79]:
# `col` and `from_unixtime` are used below, so they are imported here as well;
# importing only `when` made this cell depend on an earlier cell having run.
from pyspark.sql.functions import col, from_unixtime, when

# Replace the raw `timestamp` (divided by 1000 first) with a proper timestamp
# column and tag every driver event with the quadrant of its position, split at
# latitude 40.75 / longitude -73.95.
df_driver_quadrant = df_driver.withColumn(
    "timestamp", from_unixtime(col("timestamp") / 1000).cast("timestamp")
).withColumn(
    "quadrant", when((col("latitude") >= 40.75) & (col("longitude") >= -73.95), "NE")
                .when((col("latitude") >= 40.75) & (col("longitude") < -73.95), "NW")
                .when((col("latitude") < 40.75) & (col("longitude") >= -73.95), "SE")
                .otherwise("SW")
)

# Expose the enriched stream to SQL.
df_driver_quadrant.createOrReplaceTempView("driver_quadrant_view")

In [80]:
# Aggregation over driver_quadrant_view: number of rows with status 'AVAILABLE'
# or 'ON_RIDE' per quadrant, within 15-minute windows that slide every 5 minutes.
# NOTE(review): COUNT(*) counts driver events, not distinct drivers — a driver
# that reports several times inside a window is counted once per report. Confirm
# that this is the intended meaning of "online driver count".
online_drivers_query = spark.sql("""
SELECT
  'online_driver_count' AS metric_type,
  quadrant AS dimension,
  COUNT(*) AS metric_value,
  window(timestamp, '15 minutes', '5 minutes') AS window_interval
FROM driver_quadrant_view
WHERE status IN ('AVAILABLE', 'ON_RIDE')
GROUP BY window(timestamp, '15 minutes', '5 minutes'), quadrant
""")

In [81]:
# `name=query_name` freezes the stream name when the lambda is created. A lambda
# that reads the global `query_name` directly would pick up whatever value the
# variable holds when a batch fires, i.e. the name set by a later cell.
# NOTE: this rebinds `online_drivers_query` from the aggregation DataFrame to the
# running StreamingQuery, so the cell that defines the DataFrame has to be re-run
# before this one can be run again.
query_name = 'online_driver_count_BLOB'
online_drivers_query = online_drivers_query.writeStream \
    .foreachBatch(lambda df, batch_id, name=query_name: process_batch(df, batch_id, name)) \
    .outputMode("complete") \
    .trigger(processingTime='30 seconds') \
    .option("checkpointLocation", f"/tmp/stream-checkpoints/{query_name}") \
    .start()

Error writing batch 25 to Parquet: An error occurred while calling o4329.parquet.
: java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:242)
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:258)
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:187)
	at org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:342)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:980)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$4(FileFormatWriter.scala:307)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(Fi

In [None]:
online_drivers_query.status

# Transformation 8: Average Wait Time per Area

In [83]:
from pyspark.sql.functions import col, from_unixtime, when

# Enrich DataFrame with timestamps and area quadrant.
# The three raw time columns are divided by 1000 before conversion, i.e. they are
# treated as epoch milliseconds. The quadrant splits pickups at latitude 40.75 /
# longitude -73.95 (North = latitude >= 40.75, East = longitude >= -73.95).
df_wait_ts = df_passenger.withColumn(
    "timestamp", from_unixtime(col("timestamp") / 1000).cast("timestamp")
).withColumn(
    "request_ts", from_unixtime(col("request_timestamp") / 1000).cast("timestamp")
).withColumn(
    "accepted_ts", from_unixtime(col("accepted_timestamp") / 1000).cast("timestamp")
).withColumn(
    "quadrant", when((col("pickup_latitude") >= 40.75) & (col("pickup_longitude") >= -73.95), "NE")
               .when((col("pickup_latitude") >= 40.75) & (col("pickup_longitude") < -73.95), "NW")
               .when((col("pickup_latitude") < 40.75) & (col("pickup_longitude") >= -73.95), "SE")
               .otherwise("SW")
)

# Temp view
df_wait_ts.createOrReplaceTempView("wait_time_quadrant_view")

# Query: average wait (accepted - requested, in seconds) by quadrant and sliding window.
# Rows without a request or acceptance time are excluded.
wait_time_query = spark.sql("""
SELECT
  'average_wait_time' AS metric_type,
  quadrant AS dimension,
  AVG(unix_timestamp(accepted_ts) - unix_timestamp(request_ts)) AS metric_value,
  window(timestamp, '15 minutes', '5 minutes') AS window_interval
FROM wait_time_quadrant_view
WHERE accepted_ts IS NOT NULL AND request_ts IS NOT NULL
GROUP BY window(timestamp, '15 minutes', '5 minutes'), quadrant
""")

# Stream to Azure
query_name = "wait_time_metrics_BLOB"
# Bind query_name as a default argument: a plain closure would look up the global
# `query_name` on every micro-batch and pick up whatever a later cell assigned to it.
wait_time_stream = wait_time_query.writeStream \
    .foreachBatch(lambda batch_df, batch_id, name=query_name: process_batch(batch_df, batch_id, name)) \
    .outputMode("complete") \
    .trigger(processingTime="30 seconds") \
    .option("checkpointLocation", f"/tmp/stream-checkpoints/{query_name}") \
    .start()

Error writing batch 7 to Parquet: An error occurred while calling o4559.parquet.
: java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:242)
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:258)
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:187)
	at org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:342)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:980)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$4(FileFormatWriter.scala:307)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(Fil

In [None]:
# Show the current status of the wait-time streaming query started above.
wait_time_stream.status

# Transformation 9: Driver Utilization Rate

In [49]:
# Replace the raw `timestamp` column (divided by 1000 before conversion, i.e. treated
# as epoch milliseconds) with a timestamp-typed column, then expose the result to
# Spark SQL as `driver_events_view`.
df_driver_ts = df_driver.withColumn(
    "timestamp",
    from_unixtime(col("timestamp") / 1000).cast("timestamp"),
)
df_driver_ts.createOrReplaceTempView("driver_events_view")

In [50]:
# Streaming aggregation over driver_events_view: per sliding window (15 minutes,
# sliding every 5 minutes) the fraction of rows with status ON_RIDE among rows with
# status AVAILABLE or ON_RIDE, reported under the single dimension 'global'.
# The WHERE clause already restricts the input to those two statuses, so the
# denominator equals the number of rows in each window group.
utilization_query = spark.sql("""
SELECT
  'driver_utilization' AS metric_type,
  'global' AS dimension,
  SUM(CASE WHEN status = 'ON_RIDE' THEN 1 ELSE 0 END) * 1.0 /
  SUM(CASE WHEN status IN ('AVAILABLE', 'ON_RIDE') THEN 1 ELSE 0 END) AS metric_value,
  window(timestamp, '15 minutes', '5 minutes') AS window_interval
FROM driver_events_view
WHERE status IN ('AVAILABLE', 'ON_RIDE')
GROUP BY window(timestamp, '15 minutes', '5 minutes')
""")

In [51]:
query_name = 'driver_utilization_BLOB'
# Bind the current value of query_name as a default argument. A plain closure reads
# the notebook-global `query_name` each time a micro-batch fires, so as soon as a
# later cell reassigns it this stream would call process_batch with the wrong name.
# NOTE: this rebinds `utilization_query` from the DataFrame to the running
# StreamingQuery (the next cell reads `.status` from it), so the SQL cell above has
# to be re-run before this cell can be executed a second time.
utilization_query = utilization_query.writeStream \
    .foreachBatch(lambda batch_df, batch_id, name=query_name: process_batch(batch_df, batch_id, name)) \
    .outputMode("complete") \
    .trigger(processingTime='30 seconds') \
    .option("checkpointLocation", f"/tmp/stream-checkpoints/{query_name}") \
    .start()

In [None]:
# Show the current status of the streaming query started in the previous cell
# (at this point the name refers to the object returned by `.start()`).
utilization_query.status

# Transformation 10: Vehicle Type Demand Share

In [64]:
from pyspark.sql.functions import col, from_unixtime, window

# Prepare timestamp
df_passenger_ts = df_passenger.withColumn(
    "timestamp", from_unixtime(col("timestamp") / 1000).cast("timestamp")
)

# Function to compute vehicle type demand share per batch
def compute_vehicle_share(batch_df, epoch_id):
    vehicle_counts = batch_df.filter(col("status") == "COMPLETED") \
        .groupBy(
            window(col("timestamp"), "15 minutes", "5 minutes"),
            col("vehicle_type")
        ) \
        .count() \
        .withColumnRenamed("count", "vehicle_count")

    total_counts = batch_df.filter(col("status") == "COMPLETED") \
        .groupBy(
            window(col("timestamp"), "15 minutes", "5 minutes")
        ) \
        .count() \
        .withColumnRenamed("count", "total_count")

    joined = vehicle_counts.join(
        total_counts,
        on="window"
    ).withColumn(
        "metric_type", col("vehicle_type") * 0 + 1  # Dummy to include field
    ).selectExpr(
        "'vehicle_type_share' AS metric_type",
        "vehicle_type AS dimension",
        "window AS window_interval",
        "vehicle_count * 1.0 / total_count AS metric_value"
    )

    process_batch(joined, epoch_id, "vehicle_type_share_BLOB")

# WriteStream using foreachBatch
df_passenger_ts.writeStream \
    .foreachBatch(compute_vehicle_share) \
    .outputMode("append") \
    .trigger(processingTime="30 seconds") \
    .option("checkpointLocation", "/tmp/stream-checkpoints/vehicle_type_share_BLOB") \
    .start()

<pyspark.sql.streaming.query.StreamingQuery at 0x785bc2949850>

In [None]:
df_passenger_ts.status

# Transformation 11: Cancellation Rate

In [56]:
# Fraction of rows with status CANCELLED among all rows of passenger_events_view,
# per sliding window (15 minutes, sliding every 5 minutes), dimension 'global'.
cancellation_query = spark.sql("""
SELECT
  'cancellation_rate' AS metric_type,
  'global' AS dimension,
  window(timestamp_ts, '15 minutes', '5 minutes') AS window_interval,
  SUM(CASE WHEN status = 'CANCELLED' THEN 1 ELSE 0 END) * 1.0 /
  COUNT(*) AS metric_value
FROM passenger_events_view
GROUP BY window(timestamp_ts, '15 minutes', '5 minutes')
""")


query_name = 'cancellation_rate_BLOB'
# Bind query_name as a default argument: a plain closure would look up the global
# `query_name` on every micro-batch and pick up whatever a later cell assigned to it.
cancellation_stream = cancellation_query.writeStream \
    .foreachBatch(lambda batch_df, batch_id, name=query_name: process_batch(batch_df, batch_id, name)) \
    .outputMode("complete") \
    .trigger(processingTime='30 seconds') \
    .option("checkpointLocation", f"/tmp/stream-checkpoints/{query_name}") \
    .start()

In [None]:
# Show the current status of the cancellation-rate streaming query started above.
cancellation_stream.status

# Transformation 12: Ride Matching Delay by Area

In [65]:
from pyspark.sql.functions import col, from_unixtime, when

# Enrich with timestamps and quadrant.
# The three raw time columns are divided by 1000 before conversion, i.e. they are
# treated as epoch milliseconds. The quadrant splits pickups at latitude 40.75 /
# longitude -73.95 (North = latitude >= 40.75, East = longitude >= -73.95).
df_match_ts = df_passenger.withColumn(
    "timestamp", from_unixtime(col("timestamp") / 1000).cast("timestamp")
).withColumn(
    "request_ts", from_unixtime(col("request_timestamp") / 1000).cast("timestamp")
).withColumn(
    "accepted_ts", from_unixtime(col("accepted_timestamp") / 1000).cast("timestamp")
).withColumn(
    "quadrant", when((col("pickup_latitude") >= 40.75) & (col("pickup_longitude") >= -73.95), "NE")
               .when((col("pickup_latitude") >= 40.75) & (col("pickup_longitude") < -73.95), "NW")
               .when((col("pickup_latitude") < 40.75) & (col("pickup_longitude") >= -73.95), "SE")
               .otherwise("SW")
)

# Temp view
df_match_ts.createOrReplaceTempView("matching_delay_view")

# SQL query for match delay by quadrant: average (accepted - requested) in minutes
# per quadrant and sliding window; rows missing either time are excluded.
match_delay_query = spark.sql("""
SELECT
  'match_delay' AS metric_type,
  quadrant AS dimension,
  AVG((unix_timestamp(accepted_ts) - unix_timestamp(request_ts)) / 60.0) AS metric_value,
  window(timestamp, '15 minutes', '5 minutes') AS window_interval
FROM matching_delay_view
WHERE accepted_ts IS NOT NULL AND request_ts IS NOT NULL
GROUP BY window(timestamp, '15 minutes', '5 minutes'), quadrant
""")

# Stream to Azure
query_name = "match_delay_metrics_BLOB"
# Bind query_name as a default argument: a plain closure would look up the global
# `query_name` on every micro-batch and pick up whatever a later cell assigned to it.
match_delay_stream = match_delay_query.writeStream \
    .foreachBatch(lambda batch_df, batch_id, name=query_name: process_batch(batch_df, batch_id, name)) \
    .outputMode("complete") \
    .trigger(processingTime="30 seconds") \
    .option("checkpointLocation", f"/tmp/stream-checkpoints/{query_name}") \
    .start()

In [None]:
# Show the current status of the match-delay streaming query started above.
match_delay_stream.status

# Transformation 13: Drive Efficiency (high-level)

**Measures the distance and sees if drivers are being efficient**

Efficiency ratio = straight-line distance (km) / ride duration (minutes), i.e. the average km per minute that the query below computes.

We assume Haversine distance (straight-line) is “ideal”

Duration = ride_duration in seconds

In [57]:
from pyspark.sql.functions import col, radians, sin, cos, atan2, sqrt, lit

R = 6371.0  # Earth radius in kilometers

# Haversine distance between pickup and dropoff, built up column by column:
# coordinates in radians -> deltas -> haversine term `a` -> central angle `c`
# -> distance_km = R * c. Finally the raw `timestamp` (divided by 1000, i.e.
# treated as epoch milliseconds) is replaced by a timestamp-typed column.
df_efficiency = (
    df_passenger
    .withColumn("pickup_lat_rad", radians(col("pickup_latitude")))
    .withColumn("pickup_lon_rad", radians(col("pickup_longitude")))
    .withColumn("dropoff_lat_rad", radians(col("dropoff_latitude")))
    .withColumn("dropoff_lon_rad", radians(col("dropoff_longitude")))
    .withColumn("dlat", col("dropoff_lat_rad") - col("pickup_lat_rad"))
    .withColumn("dlon", col("dropoff_lon_rad") - col("pickup_lon_rad"))
    .withColumn(
        "a",
        sin(col("dlat") / 2) ** 2
        + cos(col("pickup_lat_rad")) * cos(col("dropoff_lat_rad")) * sin(col("dlon") / 2) ** 2,
    )
    .withColumn("c", 2 * atan2(sqrt(col("a")), sqrt(1 - col("a"))))
    .withColumn("distance_km", R * col("c"))
    .withColumn("timestamp", from_unixtime(col("timestamp") / 1000).cast("timestamp"))
)

# Expose the enriched stream to Spark SQL.
df_efficiency.createOrReplaceTempView("route_efficiency_view")

Starting to process batch 45 for cancellation_rate_BLOB
Starting to process batch 9 for cancellation_rate_BLOB
Starting to process batch 7 for cancellation_rate_BLOB
Starting to process batch 4 for cancellation_rate_BLOB
Starting to process batch 10 for cancellation_rate_BLOB


In [58]:
# Streaming aggregation over route_efficiency_view: per vehicle_type and sliding
# window (15 minutes, sliding every 5 minutes) the average of
# distance_km / (ride_duration / 60.0). With ride_duration in seconds (as stated in
# the notebook text above) this is km per minute.
# Rows with ride_duration <= 0 or a NULL distance are excluded, which also avoids
# dividing by zero.
efficiency_query = spark.sql("""
SELECT
  'route_efficiency' AS metric_type,
  vehicle_type AS dimension,
  AVG(distance_km / (ride_duration / 60.0)) AS metric_value,  -- km per minute
  window(timestamp, '15 minutes', '5 minutes') AS window_interval
FROM route_efficiency_view
WHERE ride_duration > 0 AND distance_km IS NOT NULL
GROUP BY window(timestamp, '15 minutes', '5 minutes'), vehicle_type
""")

In [59]:
query_name = 'route_efficiency_BLOB'
efficiency_query = efficiency_query.writeStream \
    .foreachBatch(lambda df, id: process_batch(df, id, query_name)) \
    .outputMode("complete") \
    .trigger(processingTime='30 seconds') \
    .option("checkpointLocation", f"/tmp/stream-checkpoints/{query_name}") \
    .start()

In [None]:
# Show the current status of the streaming query started in the previous cell
# (at this point the name refers to the object returned by `.start()`).
efficiency_query.status

# Transformation 14: Pricing Analytics Anomalies (High Level)

In [84]:
from pyspark.sql.functions import col, radians, sin, cos, atan2, sqrt, lit, when, unix_timestamp, window

R = 6371.0  # Earth radius in km

# Compute anomaly flags and required fields:
#   1. Haversine pickup->dropoff distance (distance_km) and ride duration in minutes.
#   2. expected_fare = base + per-km * distance + per-minute * duration, with
#      coefficients per vehicle type. Vehicle types other than ECONOMY / LUXURY / SUV
#      get a NULL expected_fare and therefore are never flagged.
#   3. is_anomaly is true only when estimated_fare exceeds expected_fare by more
#      than 30%; fares below the expectation are not flagged.
#   4. `timestamp` (divided by 1000, i.e. treated as epoch milliseconds) becomes a
#      timestamp-typed column.
df_anomaly = df_passenger.withColumn("pickup_lat_rad", radians(col("pickup_latitude"))) \
    .withColumn("pickup_lon_rad", radians(col("pickup_longitude"))) \
    .withColumn("dropoff_lat_rad", radians(col("dropoff_latitude"))) \
    .withColumn("dropoff_lon_rad", radians(col("dropoff_longitude"))) \
    .withColumn("dlat", col("dropoff_lat_rad") - col("pickup_lat_rad")) \
    .withColumn("dlon", col("dropoff_lon_rad") - col("pickup_lon_rad")) \
    .withColumn("a", sin(col("dlat") / 2) ** 2 + cos(col("pickup_lat_rad")) * cos(col("dropoff_lat_rad")) * sin(col("dlon") / 2) ** 2) \
    .withColumn("c", 2 * atan2(sqrt(col("a")), sqrt(1 - col("a")))) \
    .withColumn("distance_km", R * col("c")) \
    .withColumn("duration_min", col("ride_duration") / 60.0) \
    .withColumn("expected_fare",
        when(col("vehicle_type") == "ECONOMY", 2.5 + 1.2 * col("distance_km") + 0.3 * col("duration_min"))
        .when(col("vehicle_type") == "LUXURY", 5.0 + 2.0 * col("distance_km") + 0.6 * col("duration_min"))
        .when(col("vehicle_type") == "SUV", 4.0 + 1.5 * col("distance_km") + 0.4 * col("duration_min"))
    ) \
    .withColumn("relative_error", (col("estimated_fare") - col("expected_fare")) / col("expected_fare")) \
    .withColumn("is_anomaly", col("relative_error") > 0.3) \
    .withColumn("timestamp", (col("timestamp") / 1000).cast("timestamp"))

# View for SQL
df_anomaly.createOrReplaceTempView("fare_anomalies_view")

# Query: count anomalies per vehicle type per 5-minute window
anomaly_count_query = spark.sql("""
    SELECT
        'pricing_anomaly' AS metric_type,
        vehicle_type AS dimension,
        COUNT(*) AS metric_value,
        window(timestamp, '5 minutes') AS window_interval
    FROM fare_anomalies_view
    WHERE is_anomaly = true
    GROUP BY window(timestamp, '5 minutes'), vehicle_type
""")

# Write to blob
query_name = "pricing_anomaly_metrics_BLOB"
# Bind query_name as a default argument: a plain closure would look up the global
# `query_name` on every micro-batch and pick up whatever a later cell assigned to it.
anomaly_stream = anomaly_count_query.writeStream \
    .foreachBatch(lambda batch_df, batch_id, name=query_name: process_batch(batch_df, batch_id, name)) \
    .outputMode("complete") \
    .trigger(processingTime="30 seconds") \
    .option("checkpointLocation", f"/tmp/stream-checkpoints/{query_name}") \
    .start()

Error writing batch 7 to Parquet: An error occurred while calling o4771.parquet.
: java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:242)
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:258)
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:187)
	at org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:342)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:980)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$4(FileFormatWriter.scala:307)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(Fil

In [None]:
# Show the current status of the pricing-anomaly streaming query started above.
anomaly_stream.status

# Transformation 15: Pricing analytics

In [None]:
# df_anomaly.timestamp is already timestamp-typed (it was converted from epoch
# milliseconds when df_anomaly was built), so it must not be divided by 1000 and
# converted a second time. timestamp_ts is kept as an alias because the SQL below
# groups on that name.
# Quadrants use the same convention as the other quadrant metrics in this notebook:
# North = latitude >= 40.75, East = longitude >= -73.95 (a larger, less negative
# longitude lies further east).
df_anomaly_area = df_anomaly.withColumn(
    "timestamp_ts",
    col("timestamp")
).withColumn(
    "quadrant",
    when((col("pickup_latitude") >= 40.75) & (col("pickup_longitude") >= -73.95), "NE")
    .when((col("pickup_latitude") >= 40.75) & (col("pickup_longitude") < -73.95), "NW")
    .when((col("pickup_latitude") < 40.75) & (col("pickup_longitude") >= -73.95), "SE")
    .otherwise("SW")
)

df_anomaly_area.createOrReplaceTempView("fare_anomalies_area_view")

# Count flagged fares per pickup quadrant per 5-minute window
anomaly_area_query = spark.sql("""
    SELECT
        'pricing_anomaly_area' AS metric_type,
        quadrant AS dimension,
        COUNT(*) AS metric_value,
        window(timestamp_ts, '5 minutes') AS window_interval
    FROM fare_anomalies_area_view
    WHERE is_anomaly = true
    GROUP BY window(timestamp_ts, '5 minutes'), quadrant
""")

# Add blob storage integration
query_name = 'pricing_anomaly_area_BLOB'
# Bind query_name as a default argument: a plain closure would look up the global
# `query_name` on every micro-batch and pick up whatever a later cell assigned to it.
anomaly_area_stream = anomaly_area_query.writeStream \
    .foreachBatch(lambda batch_df, batch_id, name=query_name: process_batch(batch_df, batch_id, name)) \
    .outputMode("complete") \
    .trigger(processingTime='30 seconds') \
    .option("checkpointLocation", f"/tmp/stream-checkpoints/{query_name}") \
    .start()

In [None]:
# Show the current status of the per-quadrant pricing-anomaly streaming query started above.
anomaly_area_stream.status

# Stop your queries and your spark job

In [None]:
# Set to True and run cell when you want to stop your queries and Spark job.
if True:
    try:
        # Stop every active streaming query and print its details after the stop
        for query in spark.streams.active:
            query.stop()
            print(f"Query Name: {query.name}")
            print(f"Query ID: {query.id}")
            print(f"Query Status: {query.status}")
            print(f"Is Query Active: {query.isActive}")
            print("-" * 50)
    finally:
        # Always shut the session down, even if stopping one of the queries failed.
        # spark.stop() also stops the underlying SparkContext, so no separate
        # spark.sparkContext.stop() call is needed.
        spark.stop()