# Big Data Analysis with PySpark – Internship Task 1
## Objective
Analyze a large dataset using PySpark to demonstrate big data handling and aggregation, and to derive useful business insights.

## Dataset
Taxi trip data (`taxi_trip_data.csv`) with 491,431 trip records and 17 columns, including trip distance, passenger count, fare components, and pickup/drop-off location IDs.


In [None]:
# Install PySpark into the notebook's environment, then import the Spark entry point
%pip install pyspark
from pyspark.sql import SparkSession




In [None]:
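# Start a Spark session, load the CSV, and show the first 5 rows
spark = SparkSession.builder.appName("TaxiTripAnalysis").getOrCreate()
# header=True takes column names from the first line; inferSchema=True scans the data to choose column types
df = spark.read.csv("taxi_trip_data.csv", header=True, inferSchema=True)
df.show(5)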


+---------+-------------------+-------------------+---------------+-------------+---------+------------------+------------+-----------+-----+-------+----------+------------+-------------+------------+------------------+-------------------+
|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|rate_code|store_and_fwd_flag|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|imp_surcharge|total_amount|pickup_location_id|dropoff_location_id|
+---------+-------------------+-------------------+---------------+-------------+---------+------------------+------------+-----------+-----+-------+----------+------------+-------------+------------+------------------+-------------------+
|        2|2018-03-29 13:37:13|2018-03-29 14:17:01|              1|        18.15|        3|                 N|           1|       70.0|  0.0|    0.0|     16.16|        10.5|          0.3|       96.96|               161|                  1|
|        2|2018-03-29 13:37:18|2018-03-2

In [None]:
# Show up to 10 rows from a random ~0.1% sample (the second argument is a fraction, not a row count)
df.sample(False, 0.001).show(10, truncate=False)


+---------+-------------------+-------------------+---------------+-------------+---------+------------------+------------+-----------+-----+-------+----------+------------+-------------+------------+------------------+-------------------+---------------------+
|vendor_id|pickup_datetime    |dropoff_datetime   |passenger_count|trip_distance|rate_code|store_and_fwd_flag|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|imp_surcharge|total_amount|pickup_location_id|dropoff_location_id|trip_duration_minutes|
+---------+-------------------+-------------------+---------------+-------------+---------+------------------+------------+-----------+-----+-------+----------+------------+-------------+------------+------------------+-------------------+---------------------+
|2        |2018-09-29 00:44:07|2018-09-29 01:07:34|2              |11.93        |1        |N                 |1           |33.5       |0.5  |0.5    |8.11      |5.76        |0.3          |48.67       |233           
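`df.sample(False, 0.001)` keeps each row independently with probability 0.001 (Bernoulli sampling without replacement), so it returns roughly 491 of the 491,431 rows and `show(10)` prints the first 10 of them. The sample size varies between runs unless a `seed` argument is passed. The plain-Python sketch below illustrates the idea on stand-in integer rows, not the real data; `bernoulli_sample` is a hypothetical helper, not a Spark API.

```python
import random

def bernoulli_sample(rows, fraction, seed=None):
    """Keep each row independently with probability `fraction` (no replacement)."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]

# Stand-in rows: one integer per trip record (hypothetical, not the taxi data)
rows = range(491_431)
sample = bernoulli_sample(rows, 0.001, seed=42)

print(len(sample))   # about 0.001 * 491,431, i.e. close to 491; depends on the seed
print(sample[:10])   # analogous to what show(10) prints first
```

Because the result size is random, a fixed number of rows is better obtained by capping the sample, for example with `.limit(10)`.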

In [None]:
# Average fare per mile: mean of fare_amount / trip_distance across all trips
df.selectExpr("avg(fare_amount / trip_distance)").show()


+----------------------------------+
|avg((fare_amount / trip_distance))|
+----------------------------------+
|                 8.057220300882245|
+----------------------------------+
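The figure above is a mean of per-trip ratios, which weights a 0.3-mile trip the same as an 18-mile one; if fares include a fixed starting charge, short trips can pull it up. Dividing total fare by total distance gives a distance-weighted alternative. A trip with `trip_distance = 0` makes `fare_amount / trip_distance` null when Spark's ANSI mode (`spark.sql.ansi.enabled`) is off, and `avg` skips nulls; with ANSI mode on, the division may raise an error instead. The plain-Python sketch below compares both definitions; the first two pairs come from rows printed earlier, the other two are hypothetical.

```python
def mean_of_ratios(trips):
    """Mean of per-trip fare/distance, skipping zero-distance trips
    (like avg() skipping the nulls that x / 0 yields with ANSI mode off)."""
    ratios = [fare / dist for fare, dist in trips if dist > 0]
    return sum(ratios) / len(ratios)

def ratio_of_totals(trips):
    """Total fare divided by total distance, over trips with distance > 0."""
    valid = [(fare, dist) for fare, dist in trips if dist > 0]
    return sum(f for f, _ in valid) / sum(d for _, d in valid)

# (fare_amount, trip_distance): first two from rows shown earlier, last two hypothetical
trips = [(70.0, 18.15), (33.5, 11.93), (5.0, 0.3), (2.5, 0.0)]

per_trip = mean_of_ratios(trips)
overall = ratio_of_totals(trips)
print(round(per_trip, 2), round(overall, 2))
```

In Spark, the distance-weighted version could be written as `df.filter("trip_distance > 0").selectExpr("sum(fare_amount) / sum(trip_distance)")`.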



In [None]:
df.printSchema()
df.sample(False, 0.001).show(10, truncate=False)


root
 |-- vendor_id: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- imp_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)

+---------+-------------------+-------------------+---------------+-------------+---------+------------------+------------+-----------+-----+-------+----------+------------+-------------+------------+------------------+-----

In [None]:
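# Total trips (printed explicitly, since a notebook cell only displays its last expression)
print("Total trips:", df.count())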

# Most common passenger counts
df.groupBy("passenger_count").count().orderBy("count", ascending=False).show()

# Average trip distance
df.selectExpr("avg(trip_distance)").show()


+---------------+------+
|passenger_count| count|
+---------------+------+
|              1|348251|
|              2| 73245|
|              5| 22684|
|              3| 20154|
|              6| 13457|
|              4|  9334|
|              0|  4270|
|              7|    13|
|              8|    12|
|              9|    11|
+---------------+------+

+------------------+
|avg(trip_distance)|
+------------------+
| 8.846192344398553|
+------------------+
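Converting the passenger-count frequencies to shares makes the pattern easier to read: single-passenger trips are about 71% of all trips, and the 4,270 trips recorded with 0 passengers (about 0.9%) are worth treating as suspect. The counts also add up to 491,431, the total trip count. A plain-Python sketch using the counts printed above:

```python
# Passenger-count frequencies copied from the groupBy output above
counts = {1: 348251, 2: 73245, 5: 22684, 3: 20154, 6: 13457,
          4: 9334, 0: 4270, 7: 13, 8: 12, 9: 11}

total = sum(counts.values())                  # equals the total trip count
shares = {k: v / total for k, v in counts.items()}

print(total)
for k, v in sorted(counts.items(), key=lambda kv: kv[1], reverse=True):
    print(k, v, f"{shares[k]:.2%}")
```

In Spark, a similar share column could be added to the grouped result with `withColumn("share", col("count") / df.count())`, using `col` from `pyspark.sql.functions`.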



In [None]:
# Top 10 pickup locations by number of trips
df.groupBy("pickup_location_id").count().orderBy("count", ascending=False).show(10)


+------------------+------+
|pickup_location_id| count|
+------------------+------+
|               138|120569|
|               132| 34171|
|               230| 16309|
|               161| 14472|
|               162| 12953|
|               186| 11226|
|                48| 10881|
|               163|  9982|
|               170|  9846|
|               231|  9799|
+------------------+------+
only showing top 10 rows
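Pickups are highly concentrated: location 138 alone accounts for roughly a quarter of all trips. The running share below, computed in plain Python from the printed counts, shows that the top 10 pickup locations together cover about half of the 491,431 trips.

```python
# Top-10 pickup_location_id counts copied from the output above
top_pickups = [(138, 120569), (132, 34171), (230, 16309), (161, 14472),
               (162, 12953), (186, 11226), (48, 10881), (163, 9982),
               (170, 9846), (231, 9799)]
TOTAL_TRIPS = 491431  # total trip count reported in this notebook

running = 0
for location_id, n in top_pickups:
    running += n
    print(f"{location_id:>4} {n:>7} cumulative share {running / TOTAL_TRIPS:.1%}")
```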



In [None]:
from pyspark.sql.functions import unix_timestamp

# Trip duration in minutes: difference of the two timestamps in epoch seconds, divided by 60
df = df.withColumn("trip_duration_minutes",
    (unix_timestamp("dropoff_datetime") - unix_timestamp("pickup_datetime")) / 60)

# Average trip duration
df.selectExpr("avg(trip_duration_minutes)").show()


+--------------------------+
|avg(trip_duration_minutes)|
+--------------------------+
|         36.08743187412456|
+--------------------------+
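The new column is the difference between the two timestamps in seconds, divided by 60. The plain-Python sketch below applies the same arithmetic to two trips printed elsewhere in this notebook: the first row of `df.show(5)` (39.8 minutes) and the highest-`total_amount` trip, whose pickup and drop-off times are identical (0 minutes). Because multi-day trips also appear in the data, the 36-minute mean is sensitive to outliers; a median (for example via Spark's `percentile_approx`) may describe a typical trip better. `duration_minutes` is a hypothetical helper.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"

def duration_minutes(pickup: str, dropoff: str) -> float:
    """(dropoff - pickup) in seconds / 60, mirroring the unix_timestamp arithmetic."""
    delta = datetime.strptime(dropoff, FMT) - datetime.strptime(pickup, FMT)
    return delta.total_seconds() / 60

first_row = duration_minutes("2018-03-29 13:37:13", "2018-03-29 14:17:01")
max_total = duration_minutes("2018-12-27 10:39:47", "2018-12-27 10:39:47")
print(first_row, max_total)
```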



In [None]:

# Total fare revenue: sum of fare_amount over all trips (excludes tips, tolls, and surcharges)
df.selectExpr("sum(fare_amount)").show()



+--------------------+
|    sum(fare_amount)|
+--------------------+
|1.5610735689999796E7|
+--------------------+



In [None]:
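# Number of rows and columns (printed explicitly, since a cell only displays its last expression)
print("Rows:", df.count())
print("Columns:", len(df.columns))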


17

In [None]:
# find the top 10 most frequently used drop-off locations in the dataset.
df.groupBy("dropoff_location_id").count().orderBy("count", ascending=False).show(10)


+-------------------+-----+
|dropoff_location_id|count|
+-------------------+-----+
|                138|52674|
|                230|12058|
|                161|10676|
|                162|10206|
|                181| 9171|
|                  1| 9118|
|                231| 8888|
|                265| 8784|
|                 13| 8749|
|                236| 8049|
+-------------------+-----+
only showing top 10 rows
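Comparing the pickup and drop-off top-10 lists suggests that some locations are net sources of trips. For the five locations that appear in both lists, the plain-Python sketch below subtracts drop-offs from pickups using the printed counts; location 138 has about 68,000 more pickups than drop-offs. A complete comparison over every location would join the two grouped DataFrames on the location ID in Spark rather than rely on the top-10 outputs.

```python
# Counts copied from the pickup and drop-off top-10 outputs in this notebook
pickups = {138: 120569, 132: 34171, 230: 16309, 161: 14472, 162: 12953,
           186: 11226, 48: 10881, 163: 9982, 170: 9846, 231: 9799}
dropoffs = {138: 52674, 230: 12058, 161: 10676, 162: 10206, 181: 9171,
            1: 9118, 231: 8888, 265: 8784, 13: 8749, 236: 8049}

# Locations present in both top-10 lists, with pickups minus drop-offs
common = sorted(pickups.keys() & dropoffs.keys())
net = {loc: pickups[loc] - dropoffs[loc] for loc in common}
print(net)
```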



In [None]:
# Trips by store-and-forward flag (Y = trip record held in the vehicle before being sent)
df.groupBy("store_and_fwd_flag").count().show()


+------------------+------+
|store_and_fwd_flag| count|
+------------------+------+
|                 Y|  2503|
|                 N|488928|
+------------------+------+
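The two flag counts add up to 491,431, the total trip count, so no trip has a null `store_and_fwd_flag` (a null value would appear as its own group in `groupBy`). Store-and-forward trips are rare, about 0.5% of the total:

```python
flag_counts = {"Y": 2503, "N": 488928}   # from the groupBy output above
TOTAL_TRIPS = 491431

covered = sum(flag_counts.values())
y_share = flag_counts["Y"] / TOTAL_TRIPS
print(covered == TOTAL_TRIPS, f"{y_share:.2%}")
```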



In [None]:
# Trip with the highest total_amount
df.orderBy("total_amount", ascending=False).select("pickup_datetime", "dropoff_datetime", "total_amount").show(1)


+-------------------+-------------------+------------+
|    pickup_datetime|   dropoff_datetime|total_amount|
+-------------------+-------------------+------------+
|2018-12-27 10:39:47|2018-12-27 10:39:47|    19269.65|
+-------------------+-------------------+------------+
only showing top 1 row



In [None]:
# Trips with a fare of exactly 0 (rows with a null fare are not matched by == 0)
df.filter(df["fare_amount"] == 0).count()


526
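In Spark, `df["fare_amount"] == 0` evaluates to null (not false) for a null fare, and `filter` drops such rows, so 526 counts zero fares only. Counting zero *or* missing fares needs an explicit null test, for example `df.filter((df["fare_amount"] == 0) | df["fare_amount"].isNull()).count()`. The plain-Python sketch below shows the difference on hypothetical values, with `None` standing in for a SQL null.

```python
# Hypothetical fare_amount values; None plays the role of a SQL null
fares = [70.0, 0.0, None, 33.5, 0.0, None]

zero_only = sum(1 for f in fares if f is not None and f == 0)
zero_or_missing = sum(1 for f in fares if f is None or f == 0)
print(zero_only, zero_or_missing)
```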

In [None]:
# Average extra charges: extra, tolls, and improvement surcharge
df.selectExpr("avg(extra)", "avg(tolls_amount)", "avg(imp_surcharge)").show()


+-------------------+-----------------+------------------+
|         avg(extra)|avg(tolls_amount)|avg(imp_surcharge)|
+-------------------+-----------------+------------------+
|0.31620854199267034|2.195315374897011|0.2982681393739385|
+-------------------+-----------------+------------------+



In [None]:
# Average total amount per passenger: mean of total_amount / passenger_count per trip
df.selectExpr("avg(total_amount / passenger_count)").show()


+-------------------------------------+
|avg((total_amount / passenger_count))|
+-------------------------------------+
|                   33.833158040230494|
+-------------------------------------+
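Per-trip `total_amount / passenger_count` divides by zero for the 4,270 trips recorded with 0 passengers. With ANSI mode off, Spark returns null for those rows and `avg` ignores them, so the 33.83 figure is effectively computed over trips with at least one passenger; with ANSI mode on, the query may fail instead. The plain-Python sketch below mirrors that behavior; the first two pairs come from rows printed earlier, the others are hypothetical.

```python
# (total_amount, passenger_count): first two from rows shown earlier, rest hypothetical
trips = [(96.96, 1), (48.67, 2), (20.0, 0), (30.0, 5)]

# Drop passenger_count == 0, as avg() drops the nulls that x / 0 yields (ANSI mode off)
per_passenger = [total / n for total, n in trips if n > 0]
avg_per_passenger = sum(per_passenger) / len(per_passenger)
print(round(avg_per_passenger, 2), len(per_passenger))
```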



In [None]:

# Longest trips by distance (top 5)
df.orderBy("trip_distance", ascending=False).select("trip_distance", "pickup_datetime", "dropoff_datetime").show(5)


+-------------+-------------------+-------------------+
|trip_distance|    pickup_datetime|   dropoff_datetime|
+-------------+-------------------+-------------------+
|      7655.76|2018-07-19 18:00:25|2018-07-23 10:54:16|
|        301.9|2018-03-10 15:10:29|2018-03-10 21:28:55|
|       245.06|2018-08-11 17:14:29|2018-08-11 21:37:54|
|        211.6|2018-03-02 22:43:03|2018-03-02 22:43:36|
|       204.54|2018-03-22 06:43:13|2018-03-22 09:59:34|
+-------------+-------------------+-------------------+
only showing top 5 rows
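Dividing distance by duration is a quick plausibility check for the longest trips. The plain-Python sketch below does this for the five trips printed above; the 80 mph cutoff is an arbitrary assumption, not a rule from the data. It flags the 7,655.76-mile trip (about 86 mph sustained for nearly 89 hours) and the 211.6-mile trip recorded as lasting 33 seconds.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"
MAX_MPH = 80  # hypothetical plausibility threshold, not from the source

# The five longest trips from the output above: (miles, pickup, dropoff)
longest = [
    (7655.76, "2018-07-19 18:00:25", "2018-07-23 10:54:16"),
    (301.9,   "2018-03-10 15:10:29", "2018-03-10 21:28:55"),
    (245.06,  "2018-08-11 17:14:29", "2018-08-11 21:37:54"),
    (211.6,   "2018-03-02 22:43:03", "2018-03-02 22:43:36"),
    (204.54,  "2018-03-22 06:43:13", "2018-03-22 09:59:34"),
]

flagged = []
for miles, pickup, dropoff in longest:
    hours = (datetime.strptime(dropoff, FMT)
             - datetime.strptime(pickup, FMT)).total_seconds() / 3600
    # A zero or negative duration cannot produce a real speed, so treat it as infinite
    mph = miles / hours if hours > 0 else float("inf")
    print(f"{miles:>8} miles  {hours:7.2f} h  {mph:10.1f} mph")
    if mph > MAX_MPH:
        flagged.append(miles)

print(flagged)
```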



In [None]:

# Trip count by rate code (the pricing rule applied); show(1) displays only the most common code
df.groupBy("rate_code").count().orderBy("count", ascending=False).show(1)


+---------+------+
|rate_code| count|
+---------+------+
|        1|466990|
+---------+------+
only showing top 1 row
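`show(1)` prints only the most frequent rate code. Relative to the 491,431 total trips, rate code 1 covers about 95% of trips, leaving 24,441 trips spread over all other codes (including any null rate code); calling `show()` without an argument would list them. A quick plain-Python check of the arithmetic:

```python
TOTAL_TRIPS = 491431
rate_code_1 = 466990  # count for rate_code 1 from the output above

other = TOTAL_TRIPS - rate_code_1
share_1 = rate_code_1 / TOTAL_TRIPS
print(other, f"{share_1:.1%}")
```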



In [None]:
# Average Tip Percentage
df = df.withColumn("tip_percentage", (df["tip_amount"] / df["fare_amount"]) * 100)
df.selectExpr("avg(tip_percentage)").show()


+-------------------+
|avg(tip_percentage)|
+-------------------+
|  33.64052399221934|
+-------------------+



## Summary of Findings

1- Total trips analyzed: 491,431

2- Most common passenger count: 1 passenger (348,251 trips)

3- Average trip distance: 8.846 miles

4- Average fare per mile (mean of per-trip fare_amount / trip_distance): 8.06

5- Total fare revenue (sum of fare_amount): 15,610,735.69

6- Average trip duration: 36.09 minutes

7- Trips with a zero fare (free rides or data errors): 526

8- Store-and-forward flag (trip record held in the taxi before being sent, e.g. because of signal loss): Y = 2,503, N = 488,928

9- Average extra charges: extra = 0.316, tolls_amount = 2.195, imp_surcharge = 0.298

10- Average total amount per passenger: 33.83

11- Longest trip by distance: 7,655.76 miles (pickup 2018-07-19 18:00:25, drop-off 2018-07-23 10:54:16), an implausible value that likely reflects a data error

12- Most common rate code: 1 (466,990 trips)

13- Average tip percentage (tip_amount as a percentage of fare_amount): 33.64%