
# Setup PySpark in Google Colab

In [None]:
# Install Java, Spark, and required Python packages
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
# Extract quietly; the verbose flag would print every file in the archive
!tar -xzf spark-3.1.2-bin-hadoop3.2.tgz
!pip install -q findspark pyspark


In [None]:
import os
import findspark

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("NYC Taxi Analysis").getOrCreate()


# Introduction
📌 Project Overview:

This project analyzes the New York City Taxi Trip Duration dataset with PySpark to demonstrate how a large dataset can be processed efficiently in a scalable, distributed way.

We explore trip duration trends, peak hours, and vendor patterns, then perform a simple machine learning task to predict trip durations.

📌 Dataset Used:

    Name: NYC Taxi Trip Duration Dataset

    Files: train.csv, test.csv, sample_submission.csv

    Size: Large (about 1.46 million rows in train.csv)

    Source: [Kaggle dataset or internship provided]

It contains details of taxi rides in NYC, including timestamps, pickup/dropoff locations, passenger count, and trip duration.

📌 Tools & Technologies Used:

    Google Colab: Cloud environment for Python & Spark

    PySpark: Python API for Apache Spark, used for big data processing

    Pandas + Seaborn: For visualization (optional)

    MLlib: PySpark's built-in machine learning library

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import matplotlib.ticker as ticker
from datetime import datetime
from tqdm import tqdm
import folium
from random import randint

# Optional: Show plots inline in Colab
%matplotlib inline

# Set seaborn style
sns.set(style="whitegrid")


#  Upload and Load Data

In [None]:
from google.colab import files
uploaded = files.upload()


Saving train.csv to train.csv


In [None]:
from google.colab import files
uploaded = files.upload()

Saving test.csv to test.csv
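
The cells that follow use `train_df`, but the notebook does not show the cell that creates it. A minimal sketch of that loading step, assuming the uploaded files have a header row and that letting Spark infer column types is acceptable. The helper name `load_taxi_csv` is hypothetical.

```python
def load_taxi_csv(spark, path):
    """Read one uploaded CSV file into a Spark DataFrame.

    `spark` is the SparkSession created in the setup cell.
    header=True uses the first row as column names; inferSchema=True
    makes Spark scan the file to guess column types.
    """
    return spark.read.csv(path, header=True, inferSchema=True)


# In the notebook, once both uploads have finished:
# train_df = load_taxi_csv(spark, "train.csv")
# test_df = load_taxi_csv(spark, "test.csv")
```

Schema inference costs an extra pass over the file. For repeated runs, passing an explicit schema to `spark.read.csv` would avoid that pass.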


# Basic Info

In [None]:
print(f"Total Rows: {train_df.count()}")
train_df.printSchema()
train_df.describe(["trip_duration", "passenger_count"]).show()


Total Rows: 1456521
root
 |-- id: string (nullable = true)
 |-- vendor_id: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- trip_duration: integer (nullable = true)
 |-- pickup_hour: integer (nullable = true)
 |-- pickup_day: integer (nullable = true)

+-------+----------------+------------------+
|summary|   trip_duration|   passenger_count|
+-------+----------------+------------------+
|  count|         1456521|           1456521|
|   mean|836.728321802432|1.6639334413990599|
| stddev|657.959152248759|1.3136075020572946|
|    min|               1|                 0|
|    max|            9984|                 9|
+-------+----------------+------------------+

In [None]:
train_df.columns


['id',
 'vendor_id',
 'pickup_datetime',
 'dropoff_datetime',
 'passenger_count',
 'pickup_longitude',
 'pickup_latitude',
 'dropoff_longitude',
 'dropoff_latitude',
 'store_and_fwd_flag',
 'trip_duration',
 'pickup_hour',
 'pickup_day']

In [None]:
for col_name in train_df.columns:
    print(col_name)


id
vendor_id
pickup_datetime
dropoff_datetime
passenger_count
pickup_longitude
pickup_latitude
dropoff_longitude
dropoff_latitude
store_and_fwd_flag
trip_duration
pickup_hour
pickup_day


# Data Cleaning

In [None]:
# Remove nulls
train_df = train_df.na.drop()

# Remove outliers (e.g., very long/short trips)
train_df = train_df.filter((train_df.trip_duration > 0) & (train_df.trip_duration < 10000))



# Filter Out Invalid Data

In [None]:
train_df = train_df.filter((train_df.trip_duration > 0) & (train_df.passenger_count > 0))


#  Remove Outliers

In [None]:
train_df = train_df.filter(train_df.trip_duration < 10000)
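
The three cleaning cells above repeat some conditions. As a sketch, the same rules can be applied in one pass with a single SQL filter string. The helper name `clean_trips` and the `max_duration` parameter are hypothetical; the thresholds are the ones used in the cells above.

```python
def clean_trips(df, max_duration=10000):
    """Drop rows containing nulls, then keep trips that have a positive
    duration below `max_duration` seconds and at least one passenger."""
    condition = (
        f"trip_duration > 0 AND trip_duration < {max_duration} "
        "AND passenger_count > 0"
    )
    # DataFrame.filter accepts a SQL expression given as a string
    return df.na.drop().filter(condition)


# In the notebook:
# train_df = clean_trips(train_df)
```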


# Top 5 Pickup Locations

In [None]:
from pyspark.sql.functions import col  # needed for col("count") below
train_df.groupBy("pickup_longitude", "pickup_latitude") \
    .count().orderBy(col("count").desc()).show(5)


+------------------+-----------------+-----+
|  pickup_longitude|  pickup_latitude|count|
+------------------+-----------------+-----+
|-73.95466613769531|40.82100296020508|   39|
|-73.87093353271484|40.77378845214844|   15|
|-73.87093353271484|40.77376937866211|   14|
| -73.8708724975586|40.77376174926758|   14|
|-73.87303924560547| 40.7741584777832|   14|
+------------------+-----------------+-----+
only showing top 5 rows
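
Grouping on raw floating-point coordinates splits nearby pickups into separate groups, which is why the largest count above is only 39. One possible refinement is to round each coordinate to three decimal places (0.001 degrees of latitude is roughly 111 m) before counting. The plain-Python sketch below illustrates the idea on four of the coordinates from the table; the function name `bin_coordinates` is hypothetical, and in Spark the same rounding could be done with the SQL `round` function.

```python
from collections import Counter


def bin_coordinates(points, decimals=3):
    """Count points per grid cell by rounding (longitude, latitude)."""
    return Counter(
        (round(lon, decimals), round(lat, decimals)) for lon, lat in points
    )


# Four pickup coordinates copied from the output above
sample = [
    (-73.95466613769531, 40.82100296020508),
    (-73.87093353271484, 40.77378845214844),
    (-73.87093353271484, 40.77376937866211),
    (-73.8708724975586, 40.77376174926758),
]

cells = bin_coordinates(sample)
# The three nearby points fall into the same rounded cell
print(cells.most_common(1))
```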



# Trip Duration vs Passenger Count

In [None]:
train_df.groupBy("passenger_count") \
    .avg("trip_duration") \
    .orderBy("passenger_count").show()


+---------------+------------------+
|passenger_count|avg(trip_duration)|
+---------------+------------------+
|              0| 291.6101694915254|
|              1| 824.7672712266025|
|              2| 877.4751240464376|
|              3| 867.2555875267666|
|              4| 887.9940734469257|
|              5| 843.7667916677369|
|              6| 836.7202904564315|
|              7|19.666666666666668|
|              8|             104.0|
|              9|             560.0|
+---------------+------------------+



# Trip Duration vs Vendor

In [None]:
train_df.groupBy("vendor_id") \
    .avg("trip_duration") \
    .show()


+---------+------------------+
|vendor_id|avg(trip_duration)|
+---------+------------------+
|        1| 830.6093460451499|
|        2| 842.0619209408354|
+---------+------------------+



# Add Pickup Hour + Day

In [None]:
from pyspark.sql.functions import col, dayofweek, hour

train_df = train_df.withColumn("pickup_hour", hour(col("pickup_datetime")))
train_df = train_df.withColumn("pickup_day", dayofweek(col("pickup_datetime")))


# Avg Trip Duration by Hour

In [None]:
train_df.groupBy("pickup_hour") \
    .avg("trip_duration") \
    .orderBy("pickup_hour").show()


+-----------+------------------+
|pickup_hour|avg(trip_duration)|
+-----------+------------------+
|          0| 780.3124635609637|
|          1| 739.7432393817379|
|          2| 701.2981117202336|
|          3|  702.156233510817|
|          4| 733.8808707793856|
|          5| 711.5812136991789|
|          6|  670.651755072551|
|          7| 758.4973085855222|
|          8| 834.5546663879724|
|          9|  843.551600159794|
|         10| 847.7965946119601|
|         11| 875.4807082181967|
|         12| 877.3847815182254|
|         13| 897.0239390031956|
|         14| 947.6334196192633|
|         15| 965.3514539264386|
|         16| 964.9512263489839|
|         17| 931.3946899874319|
|         18| 860.2830169907473|
|         19| 789.7865846222074|
+-----------+------------------+
only showing top 20 rows



# Trip Count by Day of Week

In [None]:
train_df.groupBy("pickup_day") \
    .count().orderBy("pickup_day").show()


+----------+------+
|pickup_day| count|
+----------+------+
|         1|195031|
|         2|187203|
|         3|202467|
|         4|209882|
|         5|218276|
|         6|223189|
|         7|220473|
+----------+------+
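
Spark's `dayofweek` numbers days from 1 (Sunday) to 7 (Saturday), so the `pickup_day` values above need a lookup to be read as weekday names. The sketch below uses the counts copied from the table; the names `DAY_NAMES` and `trips_per_day` are illustrative.

```python
# Spark's dayofweek(): 1 = Sunday, 2 = Monday, ..., 7 = Saturday
DAY_NAMES = {
    1: "Sunday", 2: "Monday", 3: "Tuesday", 4: "Wednesday",
    5: "Thursday", 6: "Friday", 7: "Saturday",
}

# Trip counts per pickup_day, copied from the output above
trips_per_day = {
    1: 195031, 2: 187203, 3: 202467, 4: 209882,
    5: 218276, 6: 223189, 7: 220473,
}

busiest = max(trips_per_day, key=trips_per_day.get)
weekend_share = (trips_per_day[1] + trips_per_day[7]) / sum(trips_per_day.values())

print(f"Busiest day: {DAY_NAMES[busiest]}")
print(f"Share of trips on Saturday and Sunday: {weekend_share:.1%}")
```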



#  Feature Engineering

In [None]:
from pyspark.sql.functions import hour, dayofweek, col

# Convert pickup_datetime to timestamp
train_df = train_df.withColumn("pickup_datetime", col("pickup_datetime").cast("timestamp"))

# Extract hour and day of week
train_df = train_df.withColumn("pickup_hour", hour("pickup_datetime"))
train_df = train_df.withColumn("pickup_day", dayofweek("pickup_datetime"))


# Select Features + Label

In [None]:
# Select required columns and drop nulls
ml_df = train_df.select(
    "vendor_id", "passenger_count", "pickup_hour", "pickup_day", "trip_duration"
).na.drop()


# Vector Assembler

✔ Combine features into a single column (features)

In [None]:
from pyspark.ml.feature import VectorAssembler

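# vendor_id is already an integer column, so it can be assembled directly;
# ml_df has no "vendor_index" column.
assembler = VectorAssembler(
    inputCols=["vendor_id", "passenger_count", "pickup_hour", "pickup_day"],
    outputCol="features"
)
final_df = assembler.transform(ml_df).select("features", "trip_duration")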



# Train-Test Split

In [None]:
train_data, test_data = final_df.randomSplit([0.8, 0.2], seed=42)
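
The notebook does not show the cell that trains a model on this split. One possible way to do it is sketched below, assuming a plain linear regression from MLlib and RMSE as the metric. The choice of `LinearRegression` and the helper name `fit_and_score` are assumptions, not the author's confirmed method.

```python
def fit_and_score(train_data, test_data, label_col="trip_duration"):
    """Fit a linear regression on train_data and return (model, test RMSE).

    Both DataFrames are expected to have a "features" vector column and
    a numeric label column, as produced by the VectorAssembler step.
    """
    # Imported inside the function so it can be defined before Spark is set up
    from pyspark.ml.evaluation import RegressionEvaluator
    from pyspark.ml.regression import LinearRegression

    lr = LinearRegression(featuresCol="features", labelCol=label_col)
    model = lr.fit(train_data)

    # transform() adds a "prediction" column to the test rows
    predictions = model.transform(test_data)
    evaluator = RegressionEvaluator(
        labelCol=label_col, predictionCol="prediction", metricName="rmse"
    )
    return model, evaluator.evaluate(predictions)


# In the notebook:
# model, test_rmse = fit_and_score(train_data, test_data)
```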


#  Aggregation and Insights

In [None]:
# Average trip duration by hour
train_df.groupBy("pickup_hour").avg("trip_duration").orderBy("pickup_hour").show()

# Average trip duration by vendor
train_df.groupBy("vendor_id").avg("trip_duration").show()

# Number of trips by passenger count
train_df.groupBy("passenger_count").count().orderBy("passenger_count").show()



+-----------+------------------+
|pickup_hour|avg(trip_duration)|
+-----------+------------------+
|          0| 780.3124635609637|
|          1| 739.7432393817379|
|          2| 701.2981117202336|
|          3|  702.156233510817|
|          4| 733.8808707793856|
|          5| 711.5812136991789|
|          6|  670.651755072551|
|          7| 758.4973085855222|
|          8| 834.5546663879724|
|          9|  843.551600159794|
|         10| 847.7965946119601|
|         11| 875.4807082181967|
|         12| 877.3847815182254|
|         13| 897.0239390031956|
|         14| 947.6334196192633|
|         15| 965.3514539264386|
|         16| 964.9512263489839|
|         17| 931.3946899874319|
|         18| 860.2830169907473|
|         19| 789.7865846222074|
+-----------+------------------+
only showing top 20 rows

+---------+------------------+
|vendor_id|avg(trip_duration)|
+---------+------------------+
|        1| 830.6093460451499|
|        2| 842.0619209408354|
+---------+------------------+

# Conclusion & Insights

    Most trips occur between 5 PM and 8 PM.

    Trips with 1–2 passengers are the most common and take less time.

    Trip duration increases slightly during weekends.

    The ML model gives a decent prediction of trip_duration, evaluated with RMSE.
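
For reference, RMSE is the square root of the mean squared difference between predicted and actual values, so it is expressed in the same unit as `trip_duration` (seconds). The plain-Python sketch below shows the calculation on made-up numbers; the function name `rmse` and the sample values are illustrative only and are not results from this notebook.

```python
import math


def rmse(actual, predicted):
    """Root mean squared error between two equal-length sequences."""
    if len(actual) != len(predicted) or len(actual) == 0:
        raise ValueError("inputs must be non-empty and of equal length")
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Made-up durations in seconds, for illustration only
example = rmse([600, 900], [630, 860])
print(round(example, 2))
```

A useful baseline: a model that always predicts the mean duration has an RMSE close to the standard deviation of `trip_duration`, which the summary table earlier reports as about 658 seconds.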