In [1]:
!pip install pyspark



In [2]:
from pyspark.sql import SparkSession

In [3]:
from pyspark.sql import SparkSession

# Create (or reuse) the SparkSession for this notebook. The catalog is kept
# in-memory, so no Hive metastore is needed.
spark = (
    SparkSession.builder
    .appName("TaxiDataProcessing")
    .config("spark.sql.catalogImplementation", "in-memory")
    .getOrCreate()
)

In [4]:
# Record which Spark release produced the results that follow.
print(f"{spark.version}")

3.5.4


In [5]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet

--2025-03-02 00:10:03--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 18.154.99.47, 18.154.99.220, 18.154.99.225, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|18.154.99.47|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 64346071 (61M) [binary/octet-stream]
Saving to: ‘yellow_tripdata_2024-10.parquet’


2025-03-02 00:10:04 (166 MB/s) - ‘yellow_tripdata_2024-10.parquet’ saved [64346071/64346071]



In [6]:
# Load the downloaded October 2024 yellow-taxi Parquet file into a DataFrame.
df = spark.read.format("parquet").load("yellow_tripdata_2024-10.parquet")

In [None]:
# Redistribute the trips into 4 partitions. repartition() is lazy.
# NOTE(review): df_repartitioned is not used by any later cell shown here, and
# this cell was never executed — either finish the step (e.g. write it out) or drop it.
df_repartitioned = df.repartition(numPartitions=4)

In [17]:
# Column names of the trip data, in schema order.
df.schema.names

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'Airport_fee']

In [7]:
from pyspark.sql.functions import col, to_date

# Keep trips whose pickup falls on 15 October 2024. The pickup timestamp is
# truncated to a calendar date before comparing, so any time of day matches.
oct_15_trips = df.where(
    to_date(col("tpep_pickup_datetime")) == "2024-10-15"
)

# count() is the action that actually runs the Spark job.
oct_15_count = oct_15_trips.count()

print(f"Number of taxi trips on 15th October 2024: {oct_15_count}")


Number of taxi trips on 15th October 2024: 128893


In [8]:
from pyspark.sql import functions as F

# Trip duration: dropoff minus pickup as whole seconds (via epoch timestamps),
# plus the same value expressed in hours.
df_with_duration = (
    df
    .withColumn(
        "trip_duration_seconds",
        F.unix_timestamp("tpep_dropoff_datetime") - F.unix_timestamp("tpep_pickup_datetime"),
    )
    .withColumn("trip_duration_hours", F.col("trip_duration_seconds") / 3600)
)

# The aggregate yields a single row; first() brings it to the driver.
# NOTE(review): a maximum of ~162 h suggests bad records in the source data;
# consider filtering implausible durations before relying on this figure.
max_trip_duration = df_with_duration.agg(F.max("trip_duration_hours")).first()[0]

print(f"The length of the longest trip in hours is: {max_trip_duration}")


The length of the longest trip in hours is: 162.61777777777777


In [9]:
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv

--2025-03-02 00:11:09--  https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 54.230.209.140, 54.230.209.126, 54.230.209.72, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|54.230.209.140|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12331 (12K) [text/csv]
Saving to: ‘taxi_zone_lookup.csv’


2025-03-02 00:11:09 (255 MB/s) - ‘taxi_zone_lookup.csv’ saved [12331/12331]



In [10]:
# Zone lookup table; its first CSV row is the header. Schema inference is not
# requested, so every column (including LocationID) is read as a string.
zone_df = spark.read.csv("taxi_zone_lookup.csv", header=True)

# Also make the table queryable from Spark SQL as "zone_lookup".
zone_df.createOrReplaceTempView("zone_lookup")

In [11]:
# Attach each trip's pickup zone. The lookup CSV is read without schema
# inference, so LocationID is a string; cast it explicitly instead of relying
# on Spark's implicit string/int coercion inside the join condition.
joined_df = df.join(
    zone_df,
    df.PULocationID == zone_df.LocationID.cast("int"),
    "inner",
)

# Number of pickups per zone (zones with no pickups drop out of the inner join).
zone_counts = joined_df.groupBy("Zone").count()

# Lowest count first; ties on count are broken by zone name so the answer is
# deterministic instead of depending on partition order.
least_frequent_zone = zone_counts.orderBy("count", "Zone").first()

if least_frequent_zone is None:
    raise ValueError("No trips matched any zone in taxi_zone_lookup.csv")

least_frequent_zone_name = least_frequent_zone["Zone"]
print(f"The least frequent pickup location zone is: {least_frequent_zone_name}")

The least frequent pickup location zone is: Governor's Island/Ellis Island/Liberty Island


In [12]:
from pyspark.sql import SparkSession

# Stop the session this notebook is using, if one is active.
# getActiveSession() returns None when there is none, so re-running this cell
# does not spin up a brand-new SparkSession only to shut it down again
# (which builder.getOrCreate().stop() would do).
active_session = SparkSession.getActiveSession()
if active_session is not None:
    active_session.stop()
