In [20]:
# STEP 1: Install PySpark (if running in Colab)
# NOTE: the leading "!" is IPython/Jupyter shell-escape syntax, so this line
# runs `pip install pyspark` as a shell command and is only valid inside a
# notebook cell (it is a syntax error in a plain .py script).
!pip install pyspark

# STEP 2: Import Required Libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import hour, avg, count, sum, to_date, round

# STEP 3: Start Spark Session
# Reuses an already-running session if there is one, otherwise creates it.
spark = (
    SparkSession.builder
    .appName("Big Data Analysis - Internship Task")
    .getOrCreate()
)

# STEP 4: Load Dataset (Update with your actual file path in Colab/Drive)
data_path = "/content/yellow_tripdata_2015-01.csv"  # Change if needed
# First line of the CSV is the header; column types are inferred from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(data_path)
)

# STEP 5: Basic Overview
# Print the inferred schema, then preview the first five rows.
df.printSchema()
df.show(n=5)

# STEP 6: Data Cleaning
# Discard every row that has a null in any column.
df = df.na.drop()

# STEP 7: Add Pickup Hour Column
# Hour-of-day extracted from the pickup timestamp.
df = df.withColumn("pickup_hour", hour("tpep_pickup_datetime"))

# STEP 8: ANALYSIS

# 1. Average Trip Distance by Payment Type
print("1️⃣ Average Trip Distance by Payment Type:")
# One row per payment_type with the mean of trip_distance.
(
    df.groupBy("payment_type")
    .agg(avg("trip_distance").alias("avg_distance"))
    .show()
)

# 2. Busiest Pickup Hours
print("2️⃣ Busiest Pickup Hours:")
# Ride count per pickup hour, busiest hour first.
rides_per_hour = df.groupBy("pickup_hour").agg(count("*").alias("total_rides"))
rides_per_hour.sort("total_rides", ascending=False).show()

# 3. Top 20 Pickup Locations (using lat/long since PULocationID is not available)
print("3️⃣ Top 20 Pickup Locations:")
# Imported here so this step stays self-contained; concat_ws is not among the
# names imported at the top of the cell.
from pyspark.sql.functions import concat_ws

# Build a "lat,lon" text key from coordinates rounded to 3 decimal places.
# The `+` operator on Spark Columns is arithmetic addition (even for string
# columns), so `string_col + ","` does not concatenate: it evaluates to NULL
# (or raises under ANSI mode) and every row lands in one NULL group.
# concat_ws joins the pieces as text with "," as the separator.
df = df.withColumn(
    "pickup_location",
    concat_ws(
        ",",
        round(df["pickup_latitude"], 3).cast("string"),
        round(df["pickup_longitude"], 3).cast("string"),
    ),
)

# Count rides per location key and show the 20 most frequent, untruncated.
df.groupBy("pickup_location").count().orderBy("count", ascending=False).limit(20).show(truncate=False)

# 4. Passenger Count Trend
print("4️⃣ Passenger Count Trend:")
# Number of rides for each passenger_count value, most common first.
(
    df.groupBy("passenger_count")
    .count()
    .sort("count", ascending=False)
    .show()
)

# 5. Daily Revenue
print("5️⃣ Revenue by Day:")
# Calendar date of the pickup timestamp, used as the grouping key.
df = df.withColumn("date", to_date("tpep_pickup_datetime"))
# `sum` here is the Spark aggregate imported at the top, not the Python builtin.
revenue_per_day = df.groupBy("date").agg(sum("total_amount").alias("daily_revenue"))
revenue_per_day.orderBy("date").show()

# STEP 9: Stop Spark
# Shut down the session once all analyses have been displayed.
spark.stop()


root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- RateCodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)

+--------+--------------------+---------------------+---------------+-------------+------------------+---------------