# Reading NYC taxi rides from Kafka using Spark Streaming

## PySpark setup

In [1]:
import pyspark
from pyspark.sql.session import SparkSession

from delta import *
from delta.tables import *


# Kafka source for Structured Streaming. The connector must match the running Spark
# version, so it is derived from the installed PySpark rather than hard-coded.
# NOTE(review): assumes a Scala 2.12 build of Spark — confirm for your install.
kafka_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark.__version__}"

builder = (
    SparkSession.builder.appName("NYC_taxi_kafka")
    # Enable Delta Lake SQL support and use Delta's catalog as the session catalog.
    .config("spark.sql.extensions", 
            "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", 
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Render DataFrames eagerly (as tables) when they are the last expression of a cell.
    .config("spark.sql.repl.eagerEval.enabled", True)
)

# configure_spark_with_delta_pip() sets spark.jars.packages itself (to the Delta jar),
# replacing any value configured on the builder. Extra Maven packages such as the
# Kafka connector must therefore go through `extra_packages` to actually be loaded.
spark = configure_spark_with_delta_pip(
    builder, extra_packages=[kafka_package]
).getOrCreate()

## Define schema

In [2]:
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, 
    DoubleType, BooleanType, TimestampType, DateType
)

# (column name, Spark type) for every field of one taxi-ride JSON message, in order.
# NOTE(review): names must match the JSON keys emitted by kafka_producer.ipynb exactly.
# "hack_licence" is an unusual spelling, but the sample output below shows it populated,
# so the producer evidently uses it — confirm before "fixing" it.
# Pickup/dropoff datetimes are kept as strings; convert them (e.g. to_timestamp) before
# doing time arithmetic. "timestamp" is presumably stamped by the producer when sending.
taxi_ride_columns = [
    ("medallion",          StringType()),
    ("hack_licence",       StringType()),
    ("vendor_id",          StringType()),
    ("rate_code",          IntegerType()),
    ("store_and_fwd_flag", StringType()),
    ("pickup_datetime",    StringType()),
    ("dropoff_datetime",   StringType()),
    ("passenger_count",    IntegerType()),
    ("trip_time_in_secs",  IntegerType()),
    ("trip_distance",      DoubleType()),
    ("pickup_longitude",   DoubleType()),
    ("pickup_latitude",    DoubleType()),
    ("dropoff_longitude",  DoubleType()),
    ("dropoff_latitude",   DoubleType()),
    ("timestamp",          TimestampType()),
]

# Every field is declared non-nullable (third StructField argument).
schema = StructType([
    StructField(column_name, spark_type, False)
    for column_name, spark_type in taxi_ride_columns
])

## Read Kafka stream and parse JSON

Start the producer in `kafka_producer.ipynb` before running the cells below; otherwise the Kafka topic has no messages to read.

In [25]:
from pyspark.sql.functions import from_json, col

kafka_server = "kafka1:9092"

# Source options for the Kafka reader.
# NOTE(review): the topic is named "stock" although it carries taxi rides; it must
# match the topic kafka_producer.ipynb publishes to.
kafka_options = {
    "kafka.bootstrap.servers": kafka_server,  # Kafka broker host:port
    "subscribe": "stock",                     # topic to consume
    "startingOffsets": "earliest",            # where a fresh query starts reading
    "maxOffsetsPerTrigger": 100,              # rate limit: max offsets per trigger interval
}
kafka_df = spark.readStream.format("kafka").options(**kafka_options).load()

# Each Kafka record's `value` holds one JSON ride; decode it to a string, parse it with
# `schema`, then flatten the resulting struct into top-level columns.
json_payload = col("value").cast("string")
parsed_df = (
    kafka_df
    .select(from_json(json_payload, schema).alias("parsed_value"))
    .select("parsed_value.*")
)

## Write data into a Delta table

In [30]:
import time, os

table_name = "taxi_rides"
output_path = f"spark-warehouse/{table_name}"  # directory the Delta table is written to
# Checkpoint directory for this streaming query.
# NOTE(review): "orders" looks copied from another example, and another query using the
# same directory would clash with this one. It is left as-is because changing it would
# presumably make this query re-read the topic from `startingOffsets` and re-append rows.
checkpoint_path = "streaming/orders/_checkpoint" 

delta_writer = (
    parsed_df.writeStream
    .queryName(f"{table_name}_query")
    .format("delta")
    .outputMode("append")                       # only newly arrived rows are written
    .option("checkpointLocation", checkpoint_path)
    .trigger(processingTime="5 second")         # fire a micro-batch every 5 seconds
)
query = delta_writer.start(output_path)

# Registering the table in the metastore before any data exists causes the stream to
# fail, because the table is created with an empty schema. So wait for the stream's
# first commit (parquet data + a non-empty _delta_log) before running CREATE TABLE.
def create_table_if_exists(output_path, table_name, timeout_s=60, poll_interval_s=1):
    """Register `table_name` in the metastore once `output_path` contains Delta data.

    Polls `output_path` until it holds at least one `.parquet` file and a non-empty
    `_delta_log` directory, then runs `CREATE TABLE IF NOT EXISTS` with its LOCATION
    pointing at `output_path`.

    Args:
        output_path: Directory the streaming query writes the Delta table to.
        table_name: Name under which to register the table.
        timeout_s: Approximate number of seconds to wait before giving up.
        poll_interval_s: Seconds to sleep between checks.

    Returns:
        True if CREATE TABLE was run (the table now exists), False on timeout.
    """
    # LOCATION must be the directory the stream writes to. An absolute path is used
    # because Spark may resolve a relative LOCATION against the warehouse directory
    # rather than the current working directory.
    table_location = os.path.abspath(output_path)
    attempts = max(1, int(timeout_s / poll_interval_s))

    for _ in range(attempts):
        time.sleep(poll_interval_s)
        try:
            has_parquet = any(".parquet" in name for name in os.listdir(output_path))
            has_commit = has_parquet and len(
                os.listdir(os.path.join(output_path, "_delta_log"))
            ) > 0
        except FileNotFoundError:
            # The stream has not created the output / log directory yet; keep waiting.
            continue

        if has_commit:
            print("data exists")
            spark.sql(
                f"CREATE TABLE IF NOT EXISTS {table_name} "
                f"USING DELTA LOCATION '{table_location}'"
            )
            return True

    print(f"no data in {output_path} after ~{timeout_s}s; table {table_name} was not created")
    return False

table_registered = create_table_if_exists(output_path, table_name)

data exists


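To restart the streaming query (for example after editing the cell above), stop the running one first by uncommenting and running `query.stop()` in the next cell. Spark will not start a second query with the same name while the first one is still active.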

In [29]:
# query.stop()

In [31]:
# Peek at a few rows of the registered Delta table to confirm the stream is landing data.
taxi_rides_preview = spark.table(table_name).limit(3)
display(taxi_rides_preview)

medallion,hack_licence,vendor_id,rate_code,store_and_fwd_flag,pickup_datetime,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,timestamp
89D227B655E5C82AE...,BA96DE419E711691B...,CMT,1,N,2013-01-01 15:11:48,2013-01-01 15:18:10,4,382,1.0,-73.978165,40.757977,-73.989838,40.751171,2024-05-23 12:11:54
0BD7C8F5BA12B88E0...,9FD8F69F0804BDB55...,CMT,1,N,2013-01-06 00:18:35,2013-01-06 00:22:54,1,259,1.5,-74.006683,40.731781,-73.994499,40.75066,2024-05-23 12:11:54
0BD7C8F5BA12B88E0...,9FD8F69F0804BDB55...,CMT,1,N,2013-01-05 18:49:41,2013-01-05 18:54:23,1,282,1.1,-74.004707,40.73777,-74.009834,40.726002,2024-05-23 12:11:55


## The project starts here

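You can create a separate streaming query (or register another stream on `parsed_df`) for each of the questions below.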

## [Query 1] Utilization per taxi/driver over windows of 5, 10, and 15 minutes. Utilization can be derived from each taxi's idle time. How does it change with the window size? Is there an optimal window?

## [Query 2] The average time it takes for a taxi to find its next fare (trip), per destination borough. This can be computed as the time difference (e.g. in seconds) between a trip's drop-off and the same taxi's next pick-up, within a given unit of time.

In [None]:
# remember you can register another stream


## [Query 3] The number of trips that started and ended within the same borough in the last hour

In [None]:
# remember you can register another stream


## [Query 4] The number of trips that started in one borough and ended in another one in the last hour