# Analyzing New York City Taxi Data with Spark Structured Streaming

## Setup

In [149]:
import os
from delta import configure_spark_with_delta_pip

import time
import uuid
from pyspark.sql import Window
from pyspark.sql.functions import lag, col, avg, unix_timestamp, explode, lead, sum, split
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, BooleanType, TimestampType, DateType

In [2]:
builder = SparkSession.builder.appName("NYTaxiTrips") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

spark.conf.set("spark.sql.shuffle.partitions", spark._sc.defaultParallelism)

In [11]:
BOOTSTRAP_SERVERS = os.environ.get('BOOTSTRAP_SERVERS')
assert BOOTSTRAP_SERVERS is not None, 'BOOTSTRAP_SERVERS must be set'

NYC_BOROUGHS_GEOJSON = "nyc-boroughs.geojson"
assert os.path.exists(NYC_BOROUGHS_GEOJSON), f'{NYC_BOROUGHS_GEOJSON} not found'

TRIP_TOPIC = 'trips'
FARE_TOPIC = 'fares'

Be sure to start the stream on Kafka!

In [4]:
plain_trip_schema = StructType(
    [
        StructField("medallion", StringType(), False),
        StructField("hack_license", StringType(), False),
        StructField("vendor_id", StringType(), False),
        StructField("rate_code", StringType(), False),
        StructField("store_and_fwd_flag", StringType(), False),
        StructField("pickup_datetime", TimestampType(), False),
        StructField("dropoff_datetime", TimestampType(), False),
        StructField("passenger_count", StringType(), False),
        StructField("trip_time_in_secs", StringType(), False),
        StructField("trip_distance", StringType(), False),
        StructField("pickup_longitude", StringType(), False),
        StructField("pickup_latitude", StringType(), False),
        StructField("dropoff_longitude", StringType(), False),
        StructField("dropoff_latitude", StringType(), False),
        StructField("timestamp", TimestampType(), False),
    ]
)

casted_trip_schema = StructType(
    [
        StructField("medallion", StringType(), False),
        StructField("hack_license", StringType(), False),
        StructField("vendor_id", StringType(), False),
        StructField("rate_code", IntegerType(), False),
        StructField("store_and_fwd_flag", StringType(), False),
        StructField("pickup_datetime", TimestampType(), False),
        StructField("dropoff_datetime", TimestampType(), False),
        StructField("passenger_count", IntegerType(), False),
        StructField("trip_time_in_secs", DoubleType(), False),
        StructField("trip_distance", DoubleType(), False),
        StructField("pickup_longitude", DoubleType(), False),
        StructField("pickup_latitude", DoubleType(), False),
        StructField("dropoff_longitude", DoubleType(), False),
        StructField("dropoff_latitude", DoubleType(), False),
        StructField("timestamp", TimestampType(), False),

    ]
)

fare_schema = StructType(
    [
        StructField("medallion", StringType(), False),
        StructField("hack_license", StringType(), False),
        StructField("vendor_id", StringType(), False),
        StructField("pickup_datetime", TimestampType(), False),
        StructField("payment_type", StringType(), False),
        StructField("fare_amount", DoubleType(), False),
        StructField("surcharge", DoubleType(), False),
        StructField("mta_tax", DoubleType(), False),
        StructField("tip_amount", DoubleType(), False),
        StructField("tolls_amount", DoubleType(), False),
        StructField("total_amount", DoubleType(), False),
        StructField("timestamp", TimestampType(), False),

    ]
)

In [5]:
from pyspark.sql.functions import from_json

lines = (spark.readStream                        # Get the DataStreamReader
  .format("kafka")                                 # Specify the source format as "kafka"
  .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) # Configure the Kafka server name and port
  .option("subscribe", TRIP_TOPIC)                       # Subscribe to the "en" Kafka topic 
  .option("startingOffsets", "earliest")           # The start point when a query is started
  .option("maxOffsetsPerTrigger", 100)             # Rate limit on max offsets per trigger interval
  .load()
)

## Utils

This section contains  utlitity functions that will be used throughout the notebook.


In [155]:
def active_streams():
    return [a.name for a in spark.streams.active]

def stop_stream(name):
    for stream in spark.streams.active:
        if stream.name == name:
            stream.stop()
            return True
    return False

def inmemory_stream(df, wait_seconds=5, output_mode="append"):
    temp_table_name = f"inmemory_{uuid.uuid4().hex}"
    query = (
        df.writeStream.outputMode(output_mode)
        .format("memory")
        .queryName(temp_table_name)
        .start()
    )

    time.sleep(wait_seconds)
    result = spark.sql(f"SELECT * FROM {temp_table_name}")
    stop_stream(query.name)
    return result

def is_coordinate_in_polygon(longitude, latitude, polygon):
    num = len(polygon)
    i = 0
    j = num - 1
    is_inside = False
    for i in range(num):
        if ((polygon[i][1] > latitude) != (polygon[j][1] > latitude)) and (
            longitude
            < polygon[i][0]
            + (polygon[j][0] - polygon[i][0])
            * (latitude - polygon[i][1])
            / (polygon[j][1] - polygon[i][1])
        ):
            is_inside = not is_inside
        j = i
    return is_inside

def find_borough(longitude, latitude, nyc_boroughs):
    for _, item in nyc_boroughs.value.items():
        if item is None or item[0] is None:
            continue
        geometry = item[0]
        polygons = geometry["coordinates"]

        properties = item[1]
        borough = properties["borough"]
        borough_code = properties["boroughCode"]

        if any([is_coordinate_in_polygon(longitude, latitude, polygon) for polygon in polygons]):
            return borough
    return "Other"

## Tasks

### QUERY-1

Utilization over a window of 5, 10, and 15 minutes per taxi/driver. This can be computed by computing the idle time per taxi. How does it change? Is there an optimal window?

In [7]:
trips_df = lines.select(from_json(col("value").cast("string"), plain_trip_schema).alias("parsed_value"))
trips_df = trips_df.select("parsed_value.*")

for field in casted_trip_schema.fields:
    trips_df = trips_df.withColumn(field.name, col(field.name).cast(field.dataType))

trips_df.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_time_in_secs: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [None]:
from pyspark.sql.functions import window, col, max, min

window_durations = [5 * 60, 10 * 60, 15 * 60]

for window_duration in window_durations:
    window_col = window("pickup_datetime", "{} seconds".format(window_duration))
    df_windowed = trips_df.groupBy("medallion", window_col).agg((max("dropoff_datetime").cast("long") - min("pickup_datetime").cast("long")).alias("busy_time"))
    
    df_windowed = df_windowed.withColumn("idle_time", window_duration - col("busy_time"))
    
    df_windowed = df_windowed.withColumn("utilization", col("busy_time") / window_duration)
    
    query = (df_windowed.writeStream
      .outputMode("complete")
      .format("memory")
      .queryName("utilization_{}_minutes".format(window_duration // 60))
      .trigger(processingTime="5 second")
      .start())

spark.sql("SELECT * FROM utilization_5_minutes").show()
spark.sql("SELECT * FROM utilization_10_minutes").show()
spark.sql("SELECT * FROM utilization_15_minutes").show()

### QUERY-2 

The average time it takes for a taxi to find its next fare(trip) per destination borough. This can be computed by finding the time difference, e.g. in seconds, between the trip's drop off and the next trip's pick up within a given unit of time

In [78]:
nyc_boroghs_df = spark.read.json(NYC_BOROUGHS_GEOJSON)
nyc_boroghs_df.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- geometry: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: array (containsNull = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- id: long (nullable = true)
 |-- properties: struct (nullable = true)
 |    |-- @id: string (nullable = true)
 |    |-- borough: string (nullable = true)
 |    |-- boroughCode: long (nullable = true)
 |-- type: string (nullable = true)



In [131]:
from pyspark.sql.functions import struct

nyc_borogh_clean_df = nyc_boroghs_df.select("id", struct("geometry", "properties", "type").alias("value"))
nyc_borough_dict = nyc_borogh_clean_df.rdd.collectAsMap()
nyc_borough_broadcast = spark.sparkContext.broadcast(nyc_borough_dict)

def find_nyc_borough(longitude, latitude):
    global nyc_borough_broadcast
    return find_borough(longitude, latitude, nyc_borough_broadcast)

assert find_nyc_borough(-73.978165,40.757977) == "Manhattan"

In [199]:
from pyspark.sql.functions import udf

find_borough_udf = udf(find_nyc_borough, StringType())

borough_df = (
    trips_df.select("dropoff_longitude", "dropoff_latitude", "pickup_longitude", "pickup_latitude", "pickup_datetime", "dropoff_datetime", "passenger_count", "timestamp")
    .withColumn("stop_borough", find_borough_udf("dropoff_longitude", "dropoff_latitude"))
    .withColumn("start_borough", find_borough_udf("pickup_longitude", "pickup_latitude"))
)

borough_df.printSchema()

root
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- stop_borough: string (nullable = true)
 |-- start_borough: string (nullable = true)



In [151]:
inmemory_stream(borough_df).show()

+-----------------+----------------+----------------+---------------+-------------------+-------------------+---------------+------------+-------------+
|dropoff_longitude|dropoff_latitude|pickup_longitude|pickup_latitude|    pickup_datetime|   dropoff_datetime|passenger_count|stop_borough|start_borough|
+-----------------+----------------+----------------+---------------+-------------------+-------------------+---------------+------------+-------------+
|       -73.989838|       40.751171|      -73.978165|      40.757977|2013-01-01 15:11:48|2013-01-01 15:18:10|              4|   Manhattan|    Manhattan|
|       -73.994499|        40.75066|      -74.006683|      40.731781|2013-01-06 00:18:35|2013-01-06 00:22:54|              1|   Manhattan|    Manhattan|
|       -74.009834|       40.726002|      -74.004707|       40.73777|2013-01-05 18:49:41|2013-01-05 18:54:23|              1|   Manhattan|    Manhattan|
|       -73.984734|       40.759388|      -73.974602|      40.759945|2013-01-07 23

In [152]:
passenger_count_avg = borough_df.groupBy("stop_borough").agg(avg("passenger_count"))

In [156]:
inmemory_stream(passenger_count_avg, output_mode="update").show()

+------------+--------------------+
|stop_borough|avg(passenger_count)|
+------------+--------------------+
|   Manhattan|  1.3636363636363635|
|      Queens|                 3.0|
+------------+--------------------+



In [None]:
window = Window.orderBy("pickup_datetime")

average_time_to_next_trip = borough_df.withColumn("next_pickup_datetime", lag(borough_df["pickup_datetime"]).over(window))
average_time_to_next_trip = borough_df.withColumn("time_to_next_fare", unix_timestamp("next_pickup_datetime") - unix_timestamp("dropoff_datetime"))
average_time_to_next_trip = borough_df.groupBy("stop_borough").agg(avg("time_to_next_fare"))

inmemory_stream(average_time_to_next_trip)

### QUERY-3

The number of trips that started and ended within the same borough in the last hour

In [185]:
from current timestamp reduce 11 years and 5 months
from pyspark.sql.functions import current_timestamp, expr

current_time = current_timestamp()
eleven_years_ago = current_time - expr("INTERVAL 11 YEARS")
five_months_ago = eleven_years_ago - expr("INTERVAL 5 MONTHS")


(Column<'(current_timestamp() - INTERVAL '11' YEAR)'>,
 Column<'((current_timestamp() - INTERVAL '11' YEAR) - INTERVAL '5' MONTH)[100]'>)

In [187]:
current_time = spark.range(1).select(current_timestamp().alias("current_time"))
eleven_years_ago = current_time.select((current_time["current_time"] - expr("INTERVAL 11 YEARS")).alias("eleven_years_ago"))
five_months_ago = eleven_years_ago.select((eleven_years_ago["eleven_years_ago"] - expr("INTERVAL 5 MONTHS")).alias("five_months_ago"))

current_time.show(truncate=False)
eleven_years_ago.show(truncate=False)
five_months_ago.show(truncate=False)

+--------------------------+
|current_time              |
+--------------------------+
|2024-06-06 09:50:51.405526|
+--------------------------+

+-------------------------+
|eleven_years_ago         |
+-------------------------+
|2013-06-06 09:50:51.93668|
+-------------------------+

+--------------------------+
|five_months_ago           |
+--------------------------+
|2013-01-06 09:50:52.093186|
+--------------------------+



In [206]:
from pyspark.sql.functions import col, current_timestamp, unix_timestamp, lit, hour, expr, window, count

current_time = current_timestamp()
eleven_years_ago = current_time - expr("INTERVAL 11 YEARS")
# current_time = eleven_years_ago - expr("INTERVAL 5 MONTHS")

# one_hour_ago = (unix_timestamp(current_time) - lit(60*360_000)).cast("timestamp")
one_hour_ago = (unix_timestamp(eleven_years_ago) - lit(60*216000)).cast("timestamp")

same_borough_trips =(
    borough_df
    .filter(col("start_borough") == col("stop_borough"))
    .groupBy(
        window("timestamp", "1440 minutes", "1440 minutes"), "start_borough" , "stop_borough"
    )
    .agg(
        count("*").alias("trip_count")
    )
)

inmemory_stream(same_borough_trips, output_mode="update").show()

+--------------------+-------------+------------+----------+
|              window|start_borough|stop_borough|trip_count|
+--------------------+-------------+------------+----------+
|{2024-06-04 00:00...|    Manhattan|   Manhattan|        21|
+--------------------+-------------+------------+----------+



In [197]:
from pyspark.sql.functions import col, current_timestamp, unix_timestamp, lit, hour

current_time = current_timestamp()
eleven_years_ago = current_time - expr("INTERVAL 11 YEARS")
# current_time = eleven_years_ago - expr("INTERVAL 5 MONTHS")

# one_hour_ago = (unix_timestamp(current_time) - lit(60*360_000)).cast("timestamp")
one_hour_ago = (unix_timestamp(eleven_years_ago) - lit(60*216000)).cast("timestamp")

same_borough_trips = borough_df.filter(
    (col("start_borough") == col("stop_borough")) & 
    (col("pickup_datetime") >= one_hour_ago) & 
    (col("pickup_datetime") <= current_time)
)

inmemory_stream(same_borough_trips).show()

+-----------------+----------------+----------------+---------------+-------------------+-------------------+---------------+------------+-------------+
|dropoff_longitude|dropoff_latitude|pickup_longitude|pickup_latitude|    pickup_datetime|   dropoff_datetime|passenger_count|stop_borough|start_borough|
+-----------------+----------------+----------------+---------------+-------------------+-------------------+---------------+------------+-------------+
|       -73.984734|       40.759388|      -73.974602|      40.759945|2013-01-07 23:54:15|2013-01-07 23:58:20|              2|   Manhattan|    Manhattan|
|       -74.002586|       40.747868|       -73.97625|      40.748528|2013-01-07 23:25:03|2013-01-07 23:34:24|              1|   Manhattan|    Manhattan|
|       -73.983322|       40.743763|      -73.966743|      40.764252|2013-01-07 15:27:48|2013-01-07 15:38:37|              1|   Manhattan|    Manhattan|
|       -74.007416|       40.744343|      -73.995804|      40.743977|2013-01-08 11

### QUERY-4 

The number of trips that started in one borough and ended in another one in the last hour

In [209]:
from pyspark.sql.functions import col, current_timestamp, unix_timestamp, lit, hour, expr, window, count

current_time = current_timestamp()
eleven_years_ago = current_time - expr("INTERVAL 11 YEARS")

one_hour_ago = (unix_timestamp(eleven_years_ago) - lit(60*216000)).cast("timestamp")

same_borough_trips =(
    borough_df
    .filter(col("start_borough") != col("stop_borough"))
    .groupBy(
        window("timestamp", "1440 minutes", "1440 minutes"), "start_borough" , "stop_borough"
    )
    .agg(
        count("*").alias("trip_count")
    )
)

inmemory_stream(same_borough_trips, output_mode="update").show()

+--------------------+-------------+------------+----------+
|              window|start_borough|stop_borough|trip_count|
+--------------------+-------------+------------+----------+
|{2024-06-04 00:00...|       Queens|   Manhattan|         1|
|{2024-06-04 00:00...|    Manhattan|      Queens|         1|
+--------------------+-------------+------------+----------+

