# In [None]:

# NYC Taxi Trip Data Analysis using PySpark

# 1. Import Libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, hour, avg, count, when

# 2. Initialize Spark Session
# The builder chain is wrapped in parentheses instead of trailing
# backslashes: a blank line after a `\` continuation ends the statement,
# which turns the following indented `.appName(...)` line into an
# IndentationError.
spark = (
    SparkSession.builder
    .appName("NYC Taxi Big Data Analysis")
    .getOrCreate()
)

# 3. Load Dataset
# Replace the path below with your actual CSV path or cloud storage link.
# header=True takes column names from the first row; inferSchema=True asks
# Spark to derive column types from the data rather than reading all as text.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("yellow_tripdata_2023-01.csv")
)

# 4. Display Sample Data
# Print the first five rows, then the (inferred) schema.
print("Sample Records:")
df.show(n=5)
df.printSchema()

# 5. Data Cleaning
# Discard rows missing any field used by the analyses below, then keep only
# rows whose passenger count, trip distance and total amount are all > 0.
df_clean = (
    df.dropna(subset=[
        "tpep_pickup_datetime", "tpep_dropoff_datetime",
        "passenger_count", "trip_distance", "total_amount",
    ])
    .where(
        (col("passenger_count") > 0)
        & (col("trip_distance") > 0)
        & (col("total_amount") > 0)
    )
)

# 6. Feature Engineering
# Hour-of-day of each pickup, used for the peak-hour analysis.
df_clean = df_clean.withColumn("pickup_hour", hour("tpep_pickup_datetime"))

# 7. Analysis 1: Peak Hours for Pickup
# Count trips per pickup hour and rank hours from busiest to quietest.
# Parentheses (not trailing backslashes) keep the chain one statement even
# with blank lines or comments between the calls.
print("Most Common Pickup Hours:")
peak_hours = (
    df_clean.groupBy("pickup_hour")
    .agg(count("*").alias("trip_count"))
    .orderBy("trip_count", ascending=False)
)
# hour() yields up to 24 distinct values; show() defaults to 20 rows, which
# would silently drop the quietest hours from the ranking.
peak_hours.show(24)

# 8. Analysis 2: Average Fare Based on Distance Category
# Bucket trips by trip_distance: <= 2 -> "Short", (2, 5] -> "Medium",
# otherwise "Long". `when` branches are tested in order and the first match
# wins, so the "Medium" branch only needs its upper bound.
df_clean = df_clean.withColumn(
    "distance_range",
    when(col("trip_distance") <= 2, "Short")
    .when(col("trip_distance") <= 5, "Medium")
    .otherwise("Long"),
)

# Mean of total_amount per distance bucket. Ordering by the bucket label is
# lexicographic, so rows appear as Long, Medium, Short.
print("Average Fare by Trip Distance Category:")
avg_fare = (
    df_clean.groupBy("distance_range")
    .agg(avg("total_amount").alias("avg_fare"))
    .orderBy("distance_range")
)
avg_fare.show()

# 9. Analysis 3: Total Trips by Passenger Count
# Count trips per passenger_count value, most frequent first. Parentheses
# (not trailing backslashes) keep the chain one valid statement.
print("Trip Count by Number of Passengers:")
trip_by_passengers = (
    df_clean.groupBy("passenger_count")
    .agg(count("*").alias("trip_count"))
    .orderBy("trip_count", ascending=False)
)
trip_by_passengers.show()

# 10. Stop Spark Session
spark.stop()
