## Project Template

In [1]:
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import col

import pyspark

# The Kafka source connector has to be built for the same Spark release as the
# PySpark that is running. Deriving the version from the installed package keeps
# the two from drifting apart (the previous hard-coded 3.0.0 did not match the
# PySpark 3.5.x that the pip log in this notebook reports).
# NOTE(review): the "_2.12" suffix assumes the default Scala 2.12 build of PySpark.
KAFKA_CONNECTOR = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark.__version__}"

# Template session: Kafka connector on the classpath and eager evaluation of
# DataFrames enabled for notebook display.
spark = (
    SparkSession.builder
    .appName("MyApp")
    .config("spark.jars.packages", KAFKA_CONNECTOR)
    .config("spark.sql.repl.eagerEval.enabled", True)
    .getOrCreate()
)


## Config for Delta

The next two cells are probably not needed: the setup of the project's template suggests that we do not have to use Delta Lake. I am not certain about this, though. -Karl

In [1]:
!pip install delta-spark python-dotenv

Collecting delta-spark
  Downloading delta_spark-3.2.0-py3-none-any.whl.metadata (2.0 kB)
Collecting python-dotenv
  Downloading python_dotenv-1.0.1-py3-none-any.whl.metadata (23 kB)
Collecting py4j==0.10.9.7 (from pyspark<3.6.0,>=3.5.0->delta-spark)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Downloading delta_spark-3.2.0-py3-none-any.whl (21 kB)
Downloading python_dotenv-1.0.1-py3-none-any.whl (19 kB)
Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m6.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: py4j, python-dotenv, delta-spark
Successfully installed delta-spark-3.2.0 py4j-0.10.9.7 python-dotenv-1.0.1


In [1]:
from pyspark.sql.functions import explode
import pyspark.sql.functions as F

from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import col
from delta import *
from delta.tables import *
from dotenv import load_dotenv

import pyspark

# The Kafka source connector has to match the running Spark release, so its
# version is taken from the installed PySpark instead of being hard-coded.
# NOTE(review): the "_2.12" suffix assumes the default Scala 2.12 build of PySpark.
KAFKA_CONNECTOR = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark.__version__}"

# Session settings for Delta Lake: warehouse location, eager DataFrame display,
# and the Delta SQL extension and catalog.
builder = (
    SparkSession.builder
    .appName("Project 2 - Streaming")
    .config("spark.sql.warehouse.dir", "data/out/table")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# configure_spark_with_delta_pip() sets "spark.jars.packages" itself (to the
# Delta artifact), which replaces any value set on the builder beforehand.
# Additional packages such as the Kafka connector therefore have to be handed
# over through extra_packages, otherwise they never reach the classpath.
# NOTE: getOrCreate() returns an already running session if there is one; in
# that case restart the kernel so the package list actually takes effect.
spark = configure_spark_with_delta_pip(builder, extra_packages=[KAFKA_CONNECTOR]).getOrCreate()

# Load environment variables from a .env file, if one is present.
load_dotenv()

False

## Template continues

Be sure to start the stream on Kafka!

In [15]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, BooleanType, TimestampType, DateType
# TO MODIFY FOR YOUR SCHEMA
# Modified schema
# Layout of the JSON document carried in each Kafka record value.
# Arguments to add() are: column name, Spark type, nullable flag (False for all).
schema = (
    StructType()
    # Taxi and driver identifiers
    .add("medallion", StringType(), False)
    .add("hack_license", StringType(), False)
    # Start and end of the trip
    .add("pickup_datetime", TimestampType(), False)
    .add("dropoff_datetime", TimestampType(), False)
    # Coordinates of the pickup and dropoff points
    .add("pickup_longitude", DoubleType(), False)
    .add("pickup_latitude", DoubleType(), False)
    .add("dropoff_longitude", DoubleType(), False)
    .add("dropoff_latitude", DoubleType(), False)
    # Additional timestamp field of the record
    .add("timestamp", TimestampType(), False)
)

In [35]:
# I added both ports because some StackOverflow threads recommended it as a fix, did not seem to affect much
#   -Karl
from pyspark.sql.functions import from_json

# Both brokers are listed because some StackOverflow threads recommended it as
# a fix; it did not seem to change much.
#   -Karl
kafka_server = "kafka1:9092,kafka2:9093"

# Settings of the Kafka source.
kafka_source_options = {
    "kafka.bootstrap.servers": kafka_server,  # Kafka server name(s) and port(s)
    "subscribe": "taxi_rides",                # topic to subscribe to
    "startingOffsets": "earliest",            # start point when a query is started
    "maxOffsetsPerTrigger": 100,              # rate limit on offsets per trigger interval
}

# Streaming DataFrame of raw Kafka records.
lines = (
    spark.readStream
    .format("kafka")
    .options(**kafka_source_options)
    .load()
)

# The record value is cast to a string so it can be parsed as JSON.
value_df = lines.selectExpr("CAST(value AS STRING)")

# Parse the JSON text with the schema defined above into one struct column...
parsed_value_column = from_json(col("value"), schema).alias("parsed_value")
parsed_df = value_df.select(parsed_value_column)

# ...and flatten that struct into top-level columns.
df = parsed_df.select("parsed_value.*")

# Print the resulting schema to check that it was applied as intended.
df.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)



## The project starts here

You can create a

## [Query 1] Utilization over a window of 5, 10, and 15 minutes per taxi/driver. This can be computed by computing the idle time per taxi. How does it change? Is there an optimal window?

In [36]:
# Imports
import pyspark.sql.functions as F
import time

from pyspark.sql import Window
from pyspark.sql.functions import lag, unix_timestamp, sum, col, udf, date_format
from datetime import datetime
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, ArrayType, LongType

In [37]:
# Create UDF for finding idle time between two times
def idle_time_ms(start_ms, prev_end_ms):
    """Return the idle gap between the end of one trip and the start of the next.

    Args:
        start_ms: Start of the current trip in milliseconds, or None.
        prev_end_ms: End of the previous trip in milliseconds, or None.

    Returns:
        ``start_ms - prev_end_ms`` when both values are present and the gap lies
        between 0 and 4 hours (inclusive); otherwise 0.
    """
    max_idle_ms = 4 * 60 * 60 * 1000  # 4 hours expressed in milliseconds

    if start_ms is not None and prev_end_ms is not None:
        gap_ms = start_ms - prev_end_ms
        # Negative gaps and gaps longer than the threshold are not counted.
        out_of_range = gap_ms < 0 or gap_ms > max_idle_ms
        return 0 if out_of_range else gap_ms

    # At least one of the two times is missing.
    return 0
    
# Wrap idle_time_ms as a Spark UDF with a declared return type of LongType so
# that it can be applied to DataFrame columns.
idle_time_ms_udf = udf(idle_time_ms, LongType())

# Create method for creating table - taken from 06_Streaming2/Streaming2.ipynb practice materials
# NB! Seems to be related to Delta Lakes, likely not needed
def create_table_if_exists(output_path, table_name, max_attempts=60, poll_interval_s=1):
    """Register a Delta table once the streaming sink has written data.

    Polls ``output_path`` until it contains at least one ``.parquet`` file and a
    non-empty ``_delta_log`` directory, then runs ``CREATE TABLE IF NOT EXISTS``
    pointing at that directory (the table metastore entry can only be created
    once there is some data in the directory).

    Args:
        output_path: Directory the Delta sink writes to.
        table_name: Name under which the table is registered.
        max_attempts: Maximum number of polls before giving up (default 60).
        poll_interval_s: Seconds to sleep before each poll (default 1).

    Returns:
        True if the table was registered, False if no data appeared (or the
        registration kept failing) within ``max_attempts`` polls.
    """
    import os  # local import: none of the notebook's import cells bring in os

    last_error = None
    for _attempt in range(max_attempts):
        time.sleep(poll_interval_s)
        try:
            has_parquet = any(".parquet" in entry for entry in os.listdir(output_path))
            has_commits = has_parquet and len(os.listdir(f"{output_path}/_delta_log")) > 0
        except OSError:
            # The sink has not created the directory (or its log) yet; poll again.
            continue
        if not has_commits:
            continue

        print("data exists")
        try:
            # Point the table at the directory that was just checked. The absolute
            # path avoids any ambiguity about how a relative LOCATION is resolved.
            location = os.path.abspath(output_path)
            spark.sql(f"CREATE TABLE IF NOT EXISTS {table_name} USING DELTA LOCATION '{location}'")
            return True
        except Exception as exc:
            # Keep the best-effort retry, but remember the failure so it can be
            # reported instead of being silently discarded.
            last_error = exc

    if last_error is not None:
        print(f"Could not register table {table_name}: {last_error}")
    return False

In [48]:
# These variables would be used if following the Delta Lake examples
# But they can also be repurposed for other uses so it's OK to keep them
# Name and paths for this query. They are only required by the Delta Lake
# variant of the sink, but are kept because they can be reused elsewhere.
table_name = "query1"
checkpoint_path = f"streaming/{table_name}/_checkpoint"
output_path = f"spark-warehouse/{table_name}"

# Window over the rides of one taxi (medallion), ordered by pickup time; meant
# for comparing a ride with the previous one.
rides_per_taxi = Window.partitionBy(col("medallion"))
window_conf = rides_per_taxi.orderBy(col("pickup_datetime"))

# Possibly unnecessary, but may help when .awaitTermination() is used later.
# See more here: https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/api/pyspark.sql.streaming.StreamingQueryManager.awaitAnyTermination.html#pyspark.sql.streaming.StreamingQueryManager.awaitAnyTermination
print("Resetting terminated streams.. ", end="")
spark.streams.resetTerminated()
print("done!")

# Current state of the experiment: only the "timestamp" column is selected and
# appended to the console sink. This is the closest any attempt got to showing
# data from Kafka.
#   -Karl
timestamp_stream = df.select(col("timestamp"))
console_writer = (
    timestamp_stream.writeStream
    .outputMode("append")
    .format("console")  # This should make the output appear in the console
)
query = console_writer.start()

# Other variants that were tried for reading and/or displaying the data, kept
# for reference.
# Transformations applied before the select:
#   .withColumn("dropoff_datetime_prev", lag(col("dropoff_datetime"), default=datetime.min).over(window_conf))
#   .withColumn("pickup_ts_ms", unix_timestamp("pickup_datetime") * 1000)
#   .withColumn("dropoff_prev_ts_ms", unix_timestamp("dropoff_datetime_prev") * 1000)
#   .withColumn("idle_time_ms", idle_time_ms_udf("pickup_ts_ms", "dropoff_prev_ts_ms"))
#   .groupBy(col("medallion")).agg(sum(col("idle_time_ms")))
#   .withWatermark(col("pickup_datetime"), "2 hours")
#   .withWatermark(col("timestamp"), "2 hours")
# Aggregation applied after the select:
#   .groupBy(col("medallion")).agg(sum(col("timestamp")))
# Sink settings:
#   .outputMode("complete") # we overwrite the complete table with every trigger
#   .format("delta")
#   .queryName(table_name)
#   .trigger(processingTime="5 second")
#   .option("checkpointLocation", checkpoint_path)
#   .start(output_path)

# Debugging aid: list the registered streams, then print the query status once
# per second until the query ends or 1000 polls have been made.
print("List of active streams: " + str(spark.streams.active))
for _poll in range(1000):
    print(query.status)
    # isActive becomes False however the query ends. Matching the status message
    # against "Terminated" only catches failures: a query that was stopped
    # normally reports a different message, so the loop would keep polling.
    if not query.isActive:
        break
    time.sleep(1)

# Blocks until the query ends. If the query terminated with an exception, that
# exception is raised here, which helps with debugging.
query.awaitTermination()

Resetting terminated streams.. done!
List of active streams: [<pyspark.sql.streaming.query.StreamingQuery object at 0x7f344aab3ed0>]
{'message': 'Getting offsets from KafkaV2[Subscribe[taxi_rides]]', 'isDataAvailable': False, 'isTriggerActive': True}
{'message': 'Getting offsets from KafkaV2[Subscribe[taxi_rides]]', 'isDataAvailable': False, 'isTriggerActive': True}
{'message': 'Getting offsets from KafkaV2[Subscribe[taxi_rides]]', 'isDataAvailable': False, 'isTriggerActive': True}
{'message': 'Getting offsets from KafkaV2[Subscribe[taxi_rides]]', 'isDataAvailable': False, 'isTriggerActive': True}
{'message': 'Getting offsets from KafkaV2[Subscribe[taxi_rides]]', 'isDataAvailable': False, 'isTriggerActive': True}
{'message': 'Getting offsets from KafkaV2[Subscribe[taxi_rides]]', 'isDataAvailable': False, 'isTriggerActive': True}
{'message': 'Getting offsets from KafkaV2[Subscribe[taxi_rides]]', 'isDataAvailable': False, 'isTriggerActive': True}
{'message': 'Getting offsets from KafkaV2

StreamingQueryException: [STREAM_FAILED] Query [id = ae03cc79-55eb-4807-9416-f5bb37356391, runId = 36353140-f855-42a5-8717-ea4d0777c690] terminated with exception: Failed to construct kafka consumer

## [Query 2] The average time it takes for a taxi to find its next fare (trip) per destination borough. This can be computed by finding the time difference, e.g. in seconds, between a trip's drop-off and the next trip's pick-up within a given unit of time.

In [None]:
# remember you can register another stream


## [Query 3] The number of trips that started and ended within the same borough in the last hour

In [None]:
# remember you can register another stream


## [Query 4] The number of trips that started in one borough and ended in another one in the last hour