In [3]:
# Install PySpark before it is imported below. The leading "!" is an
# IPython/Jupyter shell escape, so this line only works inside a notebook
# kernel; it is not valid syntax in a plain Python script.
!pip install pyspark

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, desc, hour

# Start Spark. getOrCreate() hands back the session that is already active in
# this kernel, if any, instead of building a second one.
spark = SparkSession.builder.appName("Big Data Analysis - NYC Taxi Trips").getOrCreate()

# 1. Download the real dataset (Parquet format)
# The download is done from Python rather than through a `!wget -q` shell
# escape: a shell escape does not raise when the command fails, and `wget -O`
# still leaves an output file behind, so a failed download used to surface
# only later as a confusing Parquet read error. urlopen raises
# urllib.error.HTTPError / URLError right here instead.
import shutil
import urllib.request

DATA_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-01.parquet"
LOCAL_PARQUET_PATH = "taxi_data.parquet"

with urllib.request.urlopen(DATA_URL, timeout=60) as response:
    with open(LOCAL_PARQUET_PATH, "wb") as out_file:
        # Stream in chunks so the whole file is never held in memory at once.
        shutil.copyfileobj(response, out_file)

# 2. Load into Spark
df = spark.read.parquet(LOCAL_PARQUET_PATH)

print("Schema of dataset:")
df.printSchema()

print("First 5 rows:")
df.show(5)

# count() is an action, so this line runs a Spark job over the dataset.
print(f"Total records: {df.count()}")

# 3. Data cleaning
# A row is discarded when any of these columns is null.
required_columns = [
    "passenger_count",
    "trip_distance",
    "total_amount",
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
]

# Only trips with a strictly positive distance and a strictly positive total
# amount are kept.
has_positive_distance = col("trip_distance") > 0
has_positive_total = col("total_amount") > 0

df_clean = df.na.drop(subset=required_columns).where(
    has_positive_distance & has_positive_total
)

# 4. Analysis Examples
# The same grouping by passenger count feeds both the frequency table and the
# average-amount table further down.
trips_by_passenger_count = df_clean.groupBy("passenger_count")

# Number of trips for each passenger count, most frequent first.
top_passenger_counts = trips_by_passenger_count.count().sort(col("count").desc())
print("Top Passenger Counts:")
top_passenger_counts.show(10)

# Single-row result: the mean of trip_distance over all cleaned trips.
avg_distance = df_clean.select(
    avg("trip_distance").alias("Average_Distance_Miles")
)
print("Average Trip Distance:")
avg_distance.show()

# Number of trips per hour of day of the pickup timestamp, busiest hour first.
df_with_hour = df_clean.withColumn("pickup_hour", hour("tpep_pickup_datetime"))
trips_by_hour = df_with_hour.groupBy("pickup_hour")
peak_hours = trips_by_hour.count().sort(col("count").desc())
print("Peak Pickup Hours:")
# NOTE(review): show() without an argument prints at most 20 rows by default,
# so not every hour of the day may appear in the printed table.
peak_hours.show()

# Mean of total_amount per passenger count, in ascending passenger-count
# order. Note that "Avg_Fare" is computed from the total_amount column, not
# from fare_amount.
avg_fare_by_passenger = trips_by_passenger_count.agg(
    avg(col("total_amount")).alias("Avg_Fare")
).sort("passenger_count")
print("Average Fare per Passenger Count:")
avg_fare_by_passenger.show()

# 5. Save results
# output_path is a common prefix, not a directory: each result is written as
# CSV (with a header row, replacing any previous output) to the location
# formed by appending its suffix to that prefix.
output_path = "/content/big_data_analysis_results"

results_by_suffix = {
    "_passenger_counts": top_passenger_counts,
    "_avg_distance": avg_distance,
    "_peak_hours": peak_hours,
    "_avg_fare": avg_fare_by_passenger,
}

for suffix, result_df in results_by_suffix.items():
    result_df.write.csv(output_path + suffix, header=True, mode="overwrite")

print(f"Results saved in {output_path}...")


Schema of dataset:
root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)

First 5 rows:
+--------+--------------------+---------------------+---------------+-------------+----------+---