In [1]:
from pyspark.sql import SparkSession



In [2]:
# Build (or reuse, via getOrCreate) the notebook's SparkSession with the
# Google Cloud Storage Hadoop connector settings applied.
spark = (
    SparkSession.builder
    .appName("YellowTaxiDataProcessing")
    # Hadoop FileSystem implementation class to use for the "gs" scheme.
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    # Turn on service-account authentication for the GCS connector.
    .config("spark.hadoop.fs.gs.auth.service.account.enable", "true")
    .getOrCreate()
)



Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/06 20:36:45 INFO SparkEnv: Registering MapOutputTracker
25/03/06 20:36:45 INFO SparkEnv: Registering BlockManagerMaster
25/03/06 20:36:45 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
25/03/06 20:36:45 INFO SparkEnv: Registering OutputCommitCoordinator


In [3]:
# Define input and output paths
# input_path: Parquet file that the read cell loads into `df`
#   (file name suggests yellow-taxi trip data for 2024-10).
# output_path: Parquet destination; not referenced by any visible cell —
#   presumably written by a later step, TODO confirm.
input_path = "gs://de-arjunr/notebooks/jupyter/yellow_tripdata_2024-10.parquet"
output_path = "gs://de-arjunr/notebooks/jupyter/op/op.parquet"





In [4]:
# Load the trip data at input_path into a DataFrame using the Parquet source.
df = spark.read.format("parquet").load(input_path)



                                                                                

In [15]:
from pyspark.sql.functions import col, to_date, unix_timestamp

# Tag every trip with the calendar date of its pickup timestamp.
# (Despite its name, filtered_df still holds all rows at this point.)
filtered_df = df.withColumn("pickup_date", to_date(df["tpep_pickup_datetime"]))

# Keep the trips picked up on 2024-10-15 and count them; count() runs the Spark job.
oct_15_trips = filtered_df.where(filtered_df["pickup_date"] == "2024-10-15")
trips_oct_15 = oct_15_trips.count()

print(f"Number of taxi trips on October 15th: {trips_oct_15}")


Number of taxi trips on October 15th: 128893


In [16]:
# Trip duration in hours: dropoff minus pickup in epoch seconds, divided by 3600.
pickup_secs = unix_timestamp(col("tpep_pickup_datetime"))
dropoff_secs = unix_timestamp(col("tpep_dropoff_datetime"))
duration_df = df.withColumn("trip_duration_hours", (dropoff_secs - pickup_secs) / 3600)

# Reduce to the single maximum duration; an ungrouped aggregate yields exactly one row.
longest_trip = duration_df.agg({"trip_duration_hours": "max"}).first()[0]

print(f"Longest trip duration in hours: {longest_trip}")

[Stage 21:>                                                         (0 + 2) / 2]

Longest trip duration in hours: 162.61777777777777


                                                                                

In [17]:
# GCS location of the taxi zone lookup CSV; read with a header row to build the zone table.
zone_lookup_path = "gs://de-arjunr/notebooks/jupyter/taxi_zone_lookup.csv"

In [18]:
# Load the zone lookup table (CSV with a header row); it supplies the Zone
# name for each LocationID.
zone_df = spark.read.option("header", "true").csv(zone_lookup_path)

# Aggregate trip counts by pickup location
pickup_counts = df.groupBy("PULocationID").count()

# Find the least frequent pickup location.
# Several locations can share the minimum count, and sorting on "count" alone
# lets Spark return any one of them, so the answer could change between runs.
# PULocationID is added as a tie-breaker to make the result reproducible.
least_frequent_pickup = pickup_counts.orderBy("count", "PULocationID").limit(1)

# Join with zone lookup to get the zone name.
# The condition references the DataFrame actually being joined
# (least_frequent_pickup) rather than its parent. The left join keeps the row
# even when the ID is missing from the lookup, in which case Zone is null.
least_frequent_zone = least_frequent_pickup.join(
    zone_df,
    least_frequent_pickup["PULocationID"] == zone_df["LocationID"],
    "left",
)

# Get the zone name
zone_name = least_frequent_zone.select("Zone").collect()[0][0]

print(f"Least frequent pickup location zone: {zone_name}")

[Stage 26:>                                                         (0 + 2) / 2]

Least frequent pickup location zone: Governor's Island/Ellis Island/Liberty Island


                                                                                