In [1]:
import pyspark
from pyspark.sql import SparkSession

!spark-shell --version

# Create SparkSession
spark = SparkSession.builder.master("local[1]") \
                    .appName('test-spark') \
                    .getOrCreate()

print(f'The PySpark {spark.version} version is running...')

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.4
      /_/
                        
Using Scala version 2.12.18, OpenJDK 64-Bit Server VM, 11.0.26
Branch HEAD
Compiled by user yangjie01 on 2024-12-17T04:51:46Z
Revision a6f220d951742f4074b37772485ee0ec7a774e7d
Url https://github.com/apache/spark
Type --help for more information.


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/09 23:04:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


The PySpark 3.5.4 version is running...


In [2]:
import urllib.request

url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet"
file_name = "yellow_tripdata_2024-10.parquet"

urllib.request.urlretrieve(url, file_name)

print("File downloaded successfully.")


File downloaded successfully.


In [3]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Yellow Trip Data") \
    .getOrCreate()

25/03/10 23:38:50 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
# Load the Parquet file into a DataFrame
df = spark.read.parquet("yellow_tripdata_2024-10.parquet")
# Repartition the DataFrame into 4 partitions
df_repartitioned = df.repartition(4)
# Save the DataFrame to Parquet
df_repartitioned.write.parquet("yellow_tripdata_repartitioned.parquet")


                                                                                

In [9]:
import os

# Define the path where your Parquet files are saved
parquet_dir = "yellow_tripdata_repartitioned.parquet"

# List all files in the directory, excluding auxiliary files (crc and _SUCCESS)
files = [f for f in os.listdir(parquet_dir) 
         if os.path.isfile(os.path.join(parquet_dir, f)) and f.endswith('.parquet')]

# Check if any Parquet files were found
if not files:
    print("No Parquet files found in the directory.")
else:
    # Get the total size of all Parquet files in the directory
    folder_size = sum(os.path.getsize(os.path.join(parquet_dir, f)) for f in files)
    
    # Convert size to MB
    folder_size_mb = folder_size / (1024 * 1024)
    
    # Calculate the average file size
    average_file_size_mb = folder_size_mb / len(files)
    
    print(f"Average size of Parquet files: {average_file_size_mb:.2f} MB")

Average size of Parquet files: 23.79 MB


In [11]:
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [17]:
from pyspark.sql.functions import col, to_date
df = spark.read.parquet("yellow_tripdata_2024-10.parquet")
df = df.withColumn("pickup_date", to_date(col("tpep_pickup_datetime")))

oct_15th_trips = df.filter(col("pickup_date") == "2024-10-15")

num_trips_oct_15 = oct_15th_trips.count()

print(f"Number of taxi trips on the 15th of October: {num_trips_oct_15}")

Number of taxi trips on the 15th of October: 128893


In [18]:
from pyspark.sql.functions import col, unix_timestamp
df = spark.read.parquet("yellow_tripdata_2024-10.parquet")
df = df.withColumn("trip_duration_seconds", 
                  unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime"))

df = df.withColumn("trip_duration_hours", col("trip_duration_seconds") / 3600)

longest_trip_duration = df.agg({"trip_duration_hours": "max"}).collect()[0][0]

print(f"The longest trip duration is {longest_trip_duration:.2f} hours.")


The longest trip duration is 162.62 hours.


In [20]:
url = "https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv"
zone_lookup_file = "taxi_zone_lookup.csv"

urllib.request.urlretrieve(url, zone_lookup_file)

print("File downloaded succedd")

File downloaded succedd


In [21]:
# Load the Yellow Trip Data (Parquet)
yellow_trip_df = spark.read.parquet("yellow_tripdata_2024-10.parquet")

# Register Yellow Trip Data as a temp view
yellow_trip_df.createOrReplaceTempView("yellow_trip_data")


zone_lookup_df = spark.read.option("header", "true").csv(zone_lookup_file)
zone_lookup_df.createOrReplaceTempView("zone_lookup")


yellow_trip_df.printSchema()
zone_lookup_df.printSchema()


root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)

root
 |-- LocationID: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-

In [22]:
from pyspark.sql.functions import col

joined_df = spark.sql("""
    SELECT 
        z.Zone AS pickup_zone, 
        COUNT(*) AS pickup_count
    FROM 
        yellow_trip_data y
    JOIN 
        zone_lookup z
    ON 
        y.PULocationID = z.LocationID
    GROUP BY 
        z.Zone
    ORDER BY 
        pickup_count ASC
    LIMIT 1
""")

joined_df.show()

+--------------------+------------+
|         pickup_zone|pickup_count|
+--------------------+------------+
|Governor's Island...|           1|
+--------------------+------------+



                                                                                