5.1 - Setup and Data Loading

In [None]:
# Install PySpark
!pip install pyspark



In [None]:
# Import required libraries
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

# Start a Spark session, or reuse an already-running one.
# Every later cell uses this session through the `spark` name.
spark = (
    SparkSession.builder
    .appName("NYCTaxiDataAnalysis")
    .getOrCreate()
)

# Record which Spark version produced the results below.
print(f"Spark Version: {spark.version}")

Spark Version: 3.5.5


In [None]:
# Download the NYC Taxi Zone Lookup dataset
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

# Download the For-Hire Vehicle trip data
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-01.csv.gz


--2025-03-06 06:41:01--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 140.82.116.3
Connecting to github.com (github.com)|140.82.116.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=releaseassetproduction%2F20250306%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250306T064101Z&X-Amz-Expires=300&X-Amz-Signature=410f1255213cc00b60f3db8d5f12edfc7d6c403a26359efb75294a9245286edf&X-Amz-SignedHeaders=host&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2025-03-06 06:41:01--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-

In [None]:
# Read the Taxi Zone Lookup CSV.
# The header row supplies the column names; Spark infers the column types
# (the table is small, so the extra inference pass is cheap).
zones_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('taxi_zone_lookup.csv')
)
zones_df.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [None]:
# Read the FHV trip data (gzip-compressed CSV).
# An explicit schema replaces `inferSchema`, for two reasons:
#   - inference needs an extra full pass over the file (~12M rows);
#   - gzip is not splittable, so that pass runs as a single task.
# The types below are the ones inference produced for this file.
from pyspark.sql import types as T

FHVHV_SCHEMA = T.StructType([
    T.StructField("hvfhs_license_num", T.StringType(), True),
    T.StructField("dispatching_base_num", T.StringType(), True),
    T.StructField("pickup_datetime", T.TimestampType(), True),
    T.StructField("dropoff_datetime", T.TimestampType(), True),
    T.StructField("PULocationID", T.IntegerType(), True),
    T.StructField("DOLocationID", T.IntegerType(), True),
    T.StructField("SR_Flag", T.IntegerType(), True),
])

trips_df = (
    spark.read
    .option("header", "true")  # skip the header line; names come from the schema
    .schema(FHVHV_SCHEMA)
    .csv("fhvhv_tripdata_2021-01.csv.gz")
)


In [None]:
# Basic exploration: total row count, column types, then a sample of rows.
trip_total = trips_df.count()
print("Total number of trips:", trip_total)
trips_df.printSchema()
trips_df.show(n=25)

Total number of trips: 11908468
root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: integer (nullable = true)

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02682|2021-01-01 00:33:44|2021-01-01 00:49:07|         230|         166|   NULL|
|           HV0003|              B02682|2021-01-01 00:55:19|2021-01-01 01:18:21|         152|         167|   NULL|
|           HV0003|              B02764|2021-01-01 00:2

5.2 - Spark SQL and DataFrames

In [None]:
# Spark column functions used here and in later cells.
# `max`/`min` are deliberately NOT imported by name: that would shadow Python's
# builtin max()/min() for the rest of the notebook. Use `F.max` / `F.min` from
# the `import pyspark.sql.functions as F` in the setup cell instead.
from pyspark.sql.functions import col, to_date, hour, count, avg, expr, unix_timestamp

# Trips strictly shorter than this many minutes count as "short".
SHORT_TRIP_MAX_MINUTES = 30

# 1. Trip duration and filter
# Duration in minutes = (dropoff - pickup) in epoch seconds, divided by 60.
trips_with_duration = trips_df.withColumn(
    "trip_duration_minutes",
    (unix_timestamp(col("dropoff_datetime")) - unix_timestamp(col("pickup_datetime"))) / 60,
)

short_trips = trips_with_duration.filter(col("trip_duration_minutes") < SHORT_TRIP_MAX_MINUTES)
print("Number of short trips:", short_trips.count())


# 2. Date and Time Transformations
# Add the pickup calendar date and hour of day; trip_duration_minutes is kept.
trips_with_details = (
    trips_with_duration
    .withColumn("pickup_date", to_date(col("pickup_datetime")))
    .withColumn("pickup_hour", hour(col("pickup_datetime")))
)


# 3. Aggregate Operations
# Trip count and duration statistics per pickup hour. The orderBy gives a
# stable, readable hour order; groupBy output order is otherwise arbitrary.
hourly_trip_stats = (
    trips_with_details
    .groupBy("pickup_hour")
    .agg(
        count("*").alias("total_trips"),
        avg("trip_duration_minutes").alias("avg_duration_minutes"),
        F.max("trip_duration_minutes").alias("max_duration_minutes"),
        F.min("trip_duration_minutes").alias("min_duration_minutes"),
    )
    .orderBy("pickup_hour")
)

hourly_trip_stats.show()

Number of short trips: 10836690
+-----------+-----------+--------------------+--------------------+--------------------+
|pickup_hour|total_trips|avg_duration_minutes|max_duration_minutes|min_duration_minutes|
+-----------+-----------+--------------------+--------------------+--------------------+
|         12|     572737|  16.122295893810506|   577.1166666666667|0.016666666666666666|
|         22|     533375|  14.589516537769915|   666.1166666666667|                 0.0|
|          1|     267724|  14.089426361974674|               750.2|                 0.0|
|         13|     618757|  16.411899070771383|              447.25|                 0.0|
|          6|     317002|   16.04994558393957|   287.3333333333333|                0.05|
|         16|     708869|   17.63605762606823|  404.18333333333334|                 0.0|
|          3|     150020|  14.279034795360552|  251.98333333333332|0.016666666666666666|
|         20|     595136|  14.785201505762359|               211.4|           

In [None]:
# 4. License Number Analysis
# Trip count and mean trip duration (minutes) per hvfhs_license_num, with the
# busiest license first.
# Reuses the `trip_duration_minutes` column of `trips_with_duration` (step 1
# above), so the duration formula is defined in exactly one place.
license_trip_analysis = (
    trips_with_duration
    .groupBy("hvfhs_license_num")
    .agg(
        count("*").alias("total_trips"),
        avg("trip_duration_minutes").alias("avg_trip_duration"),
    )
    .orderBy("total_trips", ascending=False)
)

license_trip_analysis.show()

+-----------------+-----------+------------------+
|hvfhs_license_num|total_trips| avg_trip_duration|
+-----------------+-----------+------------------+
|           HV0003|    8704128|15.748788492464636|
|           HV0005|    3094325|16.219013823694745|
|           HV0004|     110015| 19.40503991879851|
+-----------------+-----------+------------------+



In [None]:
# 5. Location Analysis
# Number of trips for each (pickup, dropoff) location-ID pair, most frequent first.
pair_keys = ["PULocationID", "DOLocationID"]
location_trips = (
    trips_df
    .groupBy(*pair_keys)
    .agg(count("*").alias("trip_count"))
    .orderBy("trip_count", ascending=False)
)

print("Top 10 location pair trips:")
location_trips.show(n=10)


Top 10 location pair trips:
+------------+------------+----------+
|PULocationID|DOLocationID|trip_count|
+------------+------------+----------+
|          76|          76|     47637|
|          26|          26|     30920|
|          39|          39|     29897|
|          61|          61|     28851|
|          42|          42|     17379|
|          14|          14|     15723|
|          37|          37|     15345|
|          89|          89|     14994|
|         129|         129|     14720|
|         132|         265|     14656|
+------------+------------+----------+
only showing top 10 rows



In [None]:
# 6. SQL View and Query
# Register the trips DataFrame as the temporary SQL view `taxi_trips`.
trips_df.createOrReplaceTempView("taxi_trips")

# The five pickup hours with the most trips
busiest_hours_query = """
    SELECT
        HOUR(pickup_datetime) AS hour,
        COUNT(*) as trip_amount
    FROM taxi_trips
    GROUP BY hour
    ORDER BY trip_amount DESC
    LIMIT 5
    """
busiest_hours_sql = spark.sql(busiest_hours_query)

print("\nBusiest Hours:")
busiest_hours_sql.show()



Busiest Hours:
+----+-----------+
|hour|trip_amount|
+----+-----------+
|  18|     765787|
|  17|     765136|
|  19|     711875|
|  16|     708869|
|  15|     698975|
+----+-----------+



In [None]:
# Zones join: the 10 most frequent (license, pickup zone, dropoff zone) combinations.
# The cell needs `zones_df` from the Taxi Zone Lookup cell. Everything that uses
# the query result stays inside the guard, so a missing lookup table is
# reported instead of raising NameError (or showing a stale `trips_with_zones`
# left over from an earlier run).
if "zones_df" in globals():
    zones_df.createOrReplaceTempView("taxi_zones")

    # LEFT JOINs keep trips whose location IDs have no lookup row (zone is NULL).
    trips_with_zones = spark.sql("""
    SELECT
        t.hvfhs_license_num,
        z1.Zone as pickup_zone,
        z2.Zone as dropoff_zone,
        COUNT(*) as trip_count
    FROM taxi_trips t
    LEFT JOIN taxi_zones z1 ON t.PULocationID = z1.LocationID
    LEFT JOIN taxi_zones z2 ON t.DOLocationID = z2.LocationID
    GROUP BY t.hvfhs_license_num, z1.Zone, z2.Zone
    ORDER BY trip_count DESC
    LIMIT 10
    """)

    print("\nTrips by Licence and Zones:")
    trips_with_zones.show()
else:
    print("zones_df is not defined - run the Taxi Zone Lookup cell first.")


Trips by Licence and Zones:
+-----------------+--------------------+--------------------+----------+
|hvfhs_license_num|         pickup_zone|        dropoff_zone|trip_count|
+-----------------+--------------------+--------------------+----------+
|           HV0003|       East New York|       East New York|     33753|
|           HV0003|        Borough Park|        Borough Park|     25009|
|           HV0003|            Canarsie|            Canarsie|     22248|
|           HV0003| Crown Heights North| Crown Heights North|     19488|
|           HV0005|       East New York|       East New York|     13806|
|           HV0003|           Bay Ridge|           Bay Ridge|     12557|
|           HV0003|Central Harlem North|Central Harlem North|     12363|
|           HV0003|     Jackson Heights|     Jackson Heights|     11489|
|           HV0003|Flatbush/Ditmas Park|Flatbush/Ditmas Park|     10747|
|           HV0003|         JFK Airport|                  NA|     10345|
+-----------------+---