In [5]:
import os

import findspark

# Locate the Spark installation. Honour SPARK_HOME when it is set so the
# notebook also runs on machines where Spark does not live at /spark; the
# original hard-coded location remains the fallback.
findspark.init(os.environ.get("SPARK_HOME") or "/spark")

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType


# Start (or reuse) a local session; local[*] runs Spark in-process on all cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("MySparkApp")
    .getOrCreate()
)
print(spark.version)


3.5.1


In [6]:
# getOrCreate() hands back the session that is already running when there is
# one (Spark logs a warning in that case) instead of starting a second session.
spark = (
    SparkSession.builder
    .appName("FHVDataProcessing")
    .getOrCreate()
)


24/03/01 17:05:07 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [11]:
# Explicit schema for the FHV trip CSV; every column is declared nullable.
schema = (
    StructType()
    .add("dispatching_base_num", StringType(), True)
    .add("pickup_datetime", TimestampType(), True)
    .add("dropOff_datetime", TimestampType(), True)
    .add("PUlocationID", IntegerType(), True)
    .add("DOlocationID", IntegerType(), True)
    .add("SR_Flag", IntegerType(), True)
    .add("Affiliated_base_number", StringType(), True)
)

In [13]:
# Read the gzipped trip CSV using the declared schema (no inference);
# the first line of the file is treated as the header row.
df = (
    spark.read
    .schema(schema)
    .option("header", True)
    .csv('fhv_tripdata_2019-10.csv.gz')
)
df

DataFrame[dispatching_base_num: string, pickup_datetime: timestamp, dropOff_datetime: timestamp, PUlocationID: int, DOlocationID: int, SR_Flag: int, Affiliated_base_number: string]

In [14]:
# Redistribute the rows across six partitions before writing them out.
df_repartitioned = df.repartition(numPartitions=6)

In [16]:
# Q2: write the repartitioned trips out as Parquet.
# mode("overwrite") replaces output left behind by an earlier run. With the
# default save mode the write raises when the target path already exists,
# which made this cell fail on every re-run of the notebook.
df_repartitioned.write.mode("overwrite").parquet('fhv_tripdata_2019-10_partitioned.parquet')

                                                                                

In [18]:
# Question 3: count records.
# How many taxi trips were there on the 15th of October?
from pyspark.sql.functions import col, dayofmonth, month, year

# A trip qualifies when its pickup timestamp falls on 2019-10-15.
pickup_ts = col("pickup_datetime")
picked_up_on_oct_15 = (
    (year(pickup_ts) == 2019)
    & (month(pickup_ts) == 10)
    & (dayofmonth(pickup_ts) == 15)
)
df_filtered = df.where(picked_up_on_oct_15)

trip_count = df_filtered.count()
print(trip_count)

                                                                                

In [19]:
# Echo the trip count as this cell's output value.
trip_count


62610

In [21]:
# Preview the filtered trips: first 20 rows, long cell values truncated.
df_filtered.show(n=20, truncate=True)

                                                                                

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-15 00:08:00|2019-10-15 00:13:00|         264|         264|   NULL|                B00009|
|              B00009|2019-10-15 00:13:00|2019-10-15 00:41:00|         264|         264|   NULL|                B00009|
|              B00013|2019-10-15 00:57:07|2019-10-15 00:57:49|         264|         264|   NULL|                B00013|
|              B00013|2019-10-15 00:57:54|2019-10-15 00:58:36|         264|         264|   NULL|                B00013|
|              B00013|2019-10-15 00:58:50|2019-10-15 00:59:52|         264|         264|   NULL|                B00013|
|              B00013|2019-10-15 00:03:0

In [27]:
# Question 4: longest trip for each day.
# What is the length of the longest trip in the dataset in hours?

# Spark's max is reached through the module alias F rather than imported by
# name: `from pyspark.sql.functions import max` replaces Python's builtin
# max() for every cell that runs afterwards.
from pyspark.sql import functions as F
# hour, datediff and to_date are not used in this cell; they stay imported in
# case other cells rely on them being in the notebook namespace.
from pyspark.sql.functions import col, dayofyear, hour, datediff, to_date

# cast("long") turns each timestamp into whole seconds, so the difference is
# the trip length in seconds; dividing by 3600 expresses it in hours.
df_enriched = df.withColumn(
    "trip_duration_hours",
    (col("dropOff_datetime").cast("long") - col("pickup_datetime").cast("long")) / 3600,
)

# Group by the pickup day of the year and keep the longest duration per day.
df_grouped = (
    df_enriched
    .groupBy(dayofyear("pickup_datetime").alias("day_of_year"))
    .agg(F.max("trip_duration_hours").alias("longest_trip_hours"))
)

# Longest first, so the top row answers the question for the whole dataset.
df_grouped.orderBy(col("longest_trip_hours").desc()).show()

[Stage 17:>                                                         (0 + 1) / 1]

+-----------+------------------+
|day_of_year|longest_trip_hours|
+-----------+------------------+
|        301|          631152.5|
|        284|          631152.5|
|        304| 87672.44083333333|
|        274| 70128.02805555555|
|        290|            8794.0|
|        299| 8784.166666666666|
|        303|1464.5344444444445|
|        298|1056.8266666666666|
|        275| 769.2313888888889|
|        296| 745.6166666666667|
|        276|          745.3825|
|        277| 744.6166666666667|
|        280| 744.1666666666666|
|        278| 697.1808333333333|
|        279| 674.0077777777777|
|        281| 625.0822222222222|
|        289| 604.0666666666667|
|        282| 601.3102777777777|
|        283| 577.3888888888889|
|        285|          528.9125|
+-----------+------------------+
only showing top 20 rows



                                                                                

In [28]:
# Explicit schema for the taxi zone lookup CSV; every column is nullable.
zone_schema = (
    StructType()
    .add("LocationID", IntegerType(), True)
    .add("Borough", StringType(), True)
    .add("Zone", StringType(), True)
    .add("service_zone", StringType(), True)
)

# Load the lookup table with that schema; the first line is the header row.
zone_df = (
    spark.read
    .schema(zone_schema)
    .option("header", True)
    .csv('taxi_zone_lookup.csv')
)


In [31]:
# Attach zone details to each trip by matching its pickup location id against
# the lookup table. This is an inner join, so trips without a matching id
# (including null pickup ids) are dropped.
pickup_matches_zone = df["PUlocationID"] == zone_df["LocationID"]
joined_df = df.join(zone_df, on=pickup_matches_zone, how="inner")

# Number of pickups recorded for each zone name.
zone_counts = joined_df.groupBy("Zone").count()

# Sorting ascending by count puts the rarest zone first; head() returns that Row.
least_freq_zone = zone_counts.sort("count").head()

print(f"The least frequent pickup location zone is: {least_freq_zone['Zone']}")

[Stage 21:>                                                         (0 + 1) / 1]

The least frequent pickup location zone is: Jamaica Bay


                                                                                

In [36]:
# List zones from fewest to most pickups (first 20 rows).
zone_counts.sort("count").show()

[Stage 37:>                                                         (0 + 1) / 1]

+--------------------+-----+
|                Zone|count|
+--------------------+-----+
|         Jamaica Bay|    1|
|Governor's Island...|    2|
| Green-Wood Cemetery|    5|
|       Broad Channel|    8|
|     Highbridge Park|   14|
|        Battery Park|   15|
|Saint Michaels Ce...|   23|
|Breezy Point/Fort...|   25|
|Marine Park/Floyd...|   26|
|        Astoria Park|   29|
|    Inwood Hill Park|   39|
|       Willets Point|   47|
|Forest Park/Highl...|   53|
|  Brooklyn Navy Yard|   57|
|        Crotona Park|   62|
|        Country Club|   77|
|     Freshkills Park|   89|
|       Prospect Park|   98|
|     Columbia Street|  105|
|  South Williamsburg|  110|
+--------------------+-----+
only showing top 20 rows



                                                                                