# Task 3

In [1]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType, IntegerType
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, window, when, unix_timestamp, format_string
from pyspark.sql import functions as f
import pyspark.sql.functions as F

inputPath = "file:///root/lab3/taxi-data"
outputPath = "file:///root/lab3/output"

# Initialize Spark Session
spark = SparkSession.builder.master("local")\
    .appName("Introduction to Spark")\
    .config("spark.some.config.option", "some-value")\
    .getOrCreate()

spark.conf.set("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")
spark.conf.set("spark.sql.streaming.statefulOperator.checkCorrectness.enabled", "false")

24/05/21 15:45:11 WARN Utils: Your hostname, DESKTOP-J1VUCON resolves to a loopback address: 127.0.1.1; using 172.30.215.130 instead (on interface eth0)
24/05/21 15:45:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/21 15:45:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Define the schema for yellow taxi rides
schema_yellow = StructType() \
    .add("type", StringType()) \
    .add("VendorID", StringType()) \
    .add("tpep_pickup_datetime", TimestampType()) \
    .add("tpep_dropoff_datetime", TimestampType()) \
    .add("passenger_count", StringType()) \
    .add("trip_distance", StringType()) \
    .add("pickup_longitude", DoubleType()) \
    .add("pickup_latitude", DoubleType()) \
    .add("RatecodeID", StringType()) \
    .add("store_and_fwd_flag", StringType()) \
    .add("dropoff_longitude", DoubleType()) \
    .add("dropoff_latitude", DoubleType()) \
    .add("payment_type", StringType()) \
    .add("fare_amount", StringType()) \
    .add("extra", StringType()) \
    .add("mta_tax", StringType()) \
    .add("tip_amount", StringType()) \
    .add("tolls_amount", StringType()) \
    .add("improvement_surcharge", StringType()) \
    .add("total_amount", StringType())

# Define the schema for green taxi rides
schema_green = StructType() \
    .add("type", StringType()) \
    .add("VendorID", StringType()) \
    .add("lpep_pickup_datetime", TimestampType()) \
    .add("lpep_dropoff_datetime", TimestampType()) \
    .add("store_and_fwd_flag", StringType()) \
    .add("RatecodeID", StringType()) \
    .add("pickup_longitude", DoubleType()) \
    .add("pickup_latitude", DoubleType()) \
    .add("dropoff_longitude", DoubleType()) \
    .add("dropoff_latitude", DoubleType()) \
    .add("passenger_count", StringType()) \
    .add("trip_distance", StringType()) \
    .add("fare_amount", StringType()) \
    .add("extra", StringType()) \
    .add("mta_tax", StringType()) \
    .add("tip_amount", StringType()) \
    .add("tolls_amount", StringType()) \
    .add("ehail_fee", StringType()) \
    .add("improvement_surcharge", StringType()) \
    .add("total_amount", StringType()) \
    .add("payment_type", StringType()) \
    .add("trip_type", StringType())

default_schema = StructType([
    StructField("_c0", StringType(), True),
    StructField("_c1", StringType(), True),
    StructField("_c2", StringType(), True),
    StructField("_c3", StringType(), True),
    StructField("_c4", StringType(), True),
    StructField("_c5", StringType(), True),
    StructField("_c6", StringType(), True),
    StructField("_c7", StringType(), True),
    StructField("_c8", StringType(), True),
    StructField("_c9", StringType(), True),
    StructField("_c10", StringType(), True),
    StructField("_c11", StringType(), True),
    StructField("_c12", StringType(), True),
    StructField("_c13", StringType(), True),
    StructField("_c14", StringType(), True),
    StructField("_c15", StringType(), True),
    StructField("_c16", StringType(), True),
    StructField("_c17", StringType(), True),
    StructField("_c18", StringType(), True),
    StructField("_c19", StringType(), True),
    StructField("_c20", StringType(), True),
    StructField("_c21", StringType(), True)
])

In [3]:
default_df = (
    spark.readStream
    .format("csv")
    .schema(default_schema)
    .option("header", "false")
    .load(inputPath)
)

In [4]:
# Filter and apply schema for yellow taxi trips
yellow_trips = default_df.filter(col("_c0") == "yellow") \
                           .drop("_c20","_c21") \
                           .toDF(*schema_yellow.names)

# Convert the columns as per the new schema
yellow_df = yellow_trips.withColumn("tpep_pickup_datetime", col("tpep_pickup_datetime").cast(TimestampType())) \
    .withColumn("tpep_dropoff_datetime", col("tpep_dropoff_datetime").cast(TimestampType())) \
    .withColumn("pickup_longitude", col("pickup_longitude").cast(DoubleType())) \
    .withColumn("pickup_latitude", col("pickup_latitude").cast(DoubleType())) \
    .withColumn("dropoff_longitude", col("dropoff_longitude").cast(DoubleType())) \
    .withColumn("dropoff_latitude", col("dropoff_latitude").cast(DoubleType()))

yellow_df.printSchema()

root
 |-- type: string (nullable = true)
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)



In [5]:
# Filter and apply schema for green taxi trips
green_trips = default_df.filter(col("_c0") == "green") \
                           .toDF(*schema_green.names)

# Convert the columns as per the new schema
green_df = green_trips.withColumn("lpep_pickup_datetime", col("lpep_pickup_datetime").cast(TimestampType())) \
    .withColumn("lpep_dropoff_datetime", col("lpep_dropoff_datetime").cast(TimestampType())) \
    .withColumn("pickup_longitude", col("pickup_longitude").cast(DoubleType())) \
    .withColumn("pickup_latitude", col("pickup_latitude").cast(DoubleType())) \
    .withColumn("dropoff_longitude", col("dropoff_longitude").cast(DoubleType())) \
    .withColumn("dropoff_latitude", col("dropoff_latitude").cast(DoubleType()))

green_df.printSchema()

root
 |-- type: string (nullable = true)
 |-- VendorID: string (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- ehail_fee: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- trip_type: string (nullable = true)



In [6]:
# Bounding box coordinates for Goldman Sachs and Citigroup headquarters
goldman_coords = [
    (-74.0141012, 40.7152191),
    (-74.013777, 40.7152275),
    (-74.0141027, 40.7138745),
    (-74.0144185, 40.7140753)
]

citigroup_coords = [
    (-74.011869, 40.7217236),
    (-74.009867, 40.721493),
    (-74.010140, 40.720053),
    (-74.012083, 40.720267)
]

In [7]:
from pyspark.sql.functions import udf
from shapely.geometry import Point, Polygon
from pyspark.sql.types import BooleanType

# Create polygons from coordinates
goldman_polygon = Polygon(goldman_coords)
citigroup_polygon = Polygon(citigroup_coords)

# Define a UDF to check if a point is inside the Goldman Sachs or Citigroup polygon
def inside_polygon(longitude, latitude, polygon):
    point = Point(longitude, latitude)
    return point.within(polygon)

# Register UDFs for both locations
inside_goldman = udf(lambda lon, lat: inside_polygon(lon, lat, goldman_polygon), BooleanType())
inside_citigroup = udf(lambda lon, lat: inside_polygon(lon, lat, citigroup_polygon), BooleanType())

# Adding a column to indicate the drop-off location
def get_dropoff_location(longitude, latitude):
    point = Point(longitude, latitude)
    if point.within(goldman_polygon):
        return "Goldman Sachs"
    elif point.within(citigroup_polygon):
        return "Citigroup"
    else:
        return None

get_dropoff_location_udf = udf(get_dropoff_location, StringType())

# Filter dataframe for trips that end within the specified bounding boxes
yellow_trips = yellow_df.filter(
    inside_goldman(col("dropoff_longitude"), col("dropoff_latitude")) | 
    inside_citigroup(col("dropoff_longitude"), col("dropoff_latitude"))
)

yellow_trips = yellow_trips.withColumn("dropoff_location", get_dropoff_location_udf(col("dropoff_longitude"), col("dropoff_latitude")))

green_trips = green_df.filter(
    inside_goldman(col("dropoff_longitude"), col("dropoff_latitude")) | 
    inside_citigroup(col("dropoff_longitude"), col("dropoff_latitude"))
)

green_trips = green_trips.withColumn("dropoff_location", get_dropoff_location_udf(col("dropoff_longitude"), col("dropoff_latitude")))

In [8]:
# Assuming dropoff_datetime is the column based on which windowed aggregation is done
yellow_agg_df = (
    yellow_trips
        .groupBy(
            yellow_trips.dropoff_location,
            window(col("tpep_dropoff_datetime"), "1 hour"),
        )
        .count()
        .orderBy("window")
)

In [9]:
# Assuming dropoff_datetime is the column based on which windowed aggregation is done
green_agg_df = (
    green_trips
        .groupBy(
            green_trips.dropoff_location,
            window(col("lpep_dropoff_datetime"), "1 hour"),
        )
        .count()
        .orderBy("window")
)

In [10]:
# Cast the 'count' column to integer if necessary
yellow_agg_df = yellow_agg_df.withColumn("count", when(col("count").cast("integer").isNull(), 0).otherwise(col("count").cast("integer")))
green_agg_df = green_agg_df.withColumn("count", when(col("count").cast("integer").isNull(), 0).otherwise(col("count").cast("integer")))

In [11]:
combined_df = yellow_agg_df.union(green_agg_df).orderBy("window")
final_agg_df = combined_df.groupBy("dropoff_location", "window").sum("count")
final_agg_df = final_agg_df.withColumnRenamed("sum(count)", "count")

# Format the path string to include the timestamp as milliseconds
final_agg_df = final_agg_df.withColumn("path", format_string("output-%d", (f.hour(col("window.start")) + 1) * 360000))

In [12]:
def write_to_files(batch_df, epoch_id):
    def write_partition(partition):
        import os
        for row in partition:
            path = f"output/{row['path']}"
            os.makedirs(path, exist_ok=True)
            with open(os.path.join(path, f"part-{epoch_id}.txt"), 'a') as file:
                file.write(f"({row['dropoff_location']}, {row['count']})\n")

    batch_df.foreachPartition(write_partition)

In [13]:
query = final_agg_df.writeStream\
    .outputMode("complete")\
    .foreachBatch(write_to_files)\
    .option("checkpointLocation", "file:///root/checkpoint")\
    .start()

query.awaitTermination()

24/05/20 22:47:06 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/05/20 22:47:06 WARN UnsupportedOperationChecker: Detected pattern of possible 'correctness' issue due to global watermark. The query contains stateful operation which can emit rows older than the current watermark plus allowed late record delay, which are "late rows" in downstream stateful operations and these rows can be discarded. Please refer the programming guide doc for more details. If you understand the possible risk of correctness issue and still need to run the query, you can disable this check by setting the config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled` to false.;
Project [dropoff_location#416, window#465, count#542L, format_string(output-%d, ((hour(window#465.start, Some(Asia/Bangkok)) + 1) * 360000)) AS path#546]
+- Project [dropoff_location#416, window#465, sum(count)#538L AS count#542L]
   +- Aggregate 

KeyboardInterrupt: 