In [0]:
from pyspark.sql.functions import mean, median, mode, hour, col, count, dayofweek, corr, when
from pyspark.sql.types import DoubleType

In [0]:
df = spark.read.csv("dbfs:/FileStore/tables/taxi_tripdata.csv", header= True, sep=",")

df.toPandas()

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,1,2021-07-01 00:30:52,2021-07-01 00:35:36,N,1,74,168,1,1.20,6,0.5,0.5,0,0,,0.3,7.3,2,1,0
1,2,2021-07-01 00:25:36,2021-07-01 01:01:31,N,1,116,265,2,13.69,42,0.5,0.5,0,0,,0.3,43.3,2,1,0
2,2,2021-07-01 00:05:58,2021-07-01 00:12:00,N,1,97,33,1,.95,6.5,0.5,0.5,2.34,0,,0.3,10.14,1,1,0
3,2,2021-07-01 00:41:40,2021-07-01 00:47:23,N,1,74,42,1,1.24,6.5,0.5,0.5,0,0,,0.3,7.8,2,1,0
4,2,2021-07-01 00:51:32,2021-07-01 00:58:46,N,1,42,244,1,1.10,7,0.5,0.5,0,0,,0.3,8.3,2,1,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
83686,,2021-07-02 07:59:00,2021-07-02 08:33:00,,,218,169,,18.04,50.24,2.75,0,0,6.55,,0.3,59.84,,,
83687,,2021-07-02 07:02:00,2021-07-02 07:18:00,,,74,137,,5.56,19.16,0,0,3.66,0,,0.3,25.87,,,
83688,,2021-07-02 07:53:00,2021-07-02 08:15:00,,,69,75,,5.13,22.45,0,0,0,0,,0.3,22.75,,,
83689,,2021-07-02 07:58:00,2021-07-02 08:30:00,,,117,82,,12.58,48.62,2.75,0,0,2.45,,0.3,54.12,,,


In [0]:
#Preprocess the data
# Count and print the number of null values for each column

for column in df.columns:
    null_count = df.filter(col(column).isNull()).count()
    print(f"{column}: {null_count}")

VendorID: 32518
lpep_pickup_datetime: 0
lpep_dropoff_datetime: 0
store_and_fwd_flag: 32518
RatecodeID: 32518
PULocationID: 0
DOLocationID: 0
passenger_count: 32518
trip_distance: 0
fare_amount: 0
extra: 0
mta_tax: 0
tip_amount: 0
tolls_amount: 0
ehail_fee: 83691
improvement_surcharge: 0
total_amount: 0
payment_type: 32518
trip_type: 32518
congestion_surcharge: 32518


In [0]:
#Drop columns with 90% null values

n = df.count()

columns_to_drop = []

for column in df.columns:
    null_count = df.filter(col(column).isNull()).count()
    null_percentage = (null_count / n) * 100
    if null_percentage > 90:  
        columns_to_drop.append(column)

df = df.drop(*columns_to_drop)

print("Remaining columns:", df.columns)



Remaining columns: ['VendorID', 'lpep_pickup_datetime', 'lpep_dropoff_datetime', 'store_and_fwd_flag', 'RatecodeID', 'PULocationID', 'DOLocationID', 'passenger_count', 'trip_distance', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'payment_type', 'trip_type', 'congestion_surcharge']


In [0]:
# Loop through each column to fill nulls with mode

for i in df.columns:
    x = df.select(mode(i).alias("nn")).head()[0]
    if df.filter(col(i).isNull()).count()>0:
        df = df.fillna({i:x})

df.show(5)


+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+---------+--------------------+
|       1| 2021-07-01 00:30:52|  2021-07-01 00:35:36|                 N|         1|          74|         168|              1|         1.20|          6|  0.5|    0.5|         0|           0|    

In [0]:
# Loop through each column to count null values
for i in df.columns:
    null_count = df.filter(col(i).isNull()).count()  # Count null values in each column
    print(f"{i}: {null_count}")

VendorID: 0
lpep_pickup_datetime: 0
lpep_dropoff_datetime: 0
store_and_fwd_flag: 0
RatecodeID: 0
PULocationID: 0
DOLocationID: 0
passenger_count: 0
trip_distance: 0
fare_amount: 0
extra: 0
mta_tax: 0
tip_amount: 0
tolls_amount: 0
improvement_surcharge: 0
total_amount: 0
payment_type: 0
trip_type: 0
congestion_surcharge: 0


Data Analysis

In [0]:
#1. How many trips Recorded

total_trips = df.count()
print(f"Total number of trips recorded: {total_trips}")

Total number of trips recorded: 83691


In [0]:
#2. What is the Average Trip Distance

df.select(mean("trip_distance").alias("Average_Trip_Distance")).show()

+---------------------+
|Average_Trip_Distance|
+---------------------+
|   194.35469931055877|
+---------------------+



In [0]:
#3. What are the top 5 PULocationID where trips start from?

df.groupBy("PULocationID") \
    .count() \
    .orderBy(col("count").desc()) \
    .limit(5) \
    .show()

+------------+-----+
|PULocationID|count|
+------------+-----+
|          74| 8770|
|          75| 7713|
|          41| 4761|
|          42| 3229|
|          95| 2486|
+------------+-----+



In [0]:
#4. What are the top 5 DOLocationID where trips end?

df.groupBy("DOLocationID") \
    .count() \
    .orderBy(col("count").desc()) \
    .limit(5) \
    .show()

+------------+-----+
|DOLocationID|count|
+------------+-----+
|          74| 3666|
|          75| 3122|
|          42| 2904|
|          41| 2527|
|         236| 1700|
+------------+-----+



In [0]:
#5. Peak Hours for Someone to start ride

df.groupBy(hour("lpep_pickup_datetime").alias("Pickup_Hours")) \
    .count() \
    .orderBy(col("count").desc()) \
    .limit(5) \
    .show()

+------------+-----+
|Pickup_Hours|count|
+------------+-----+
|          10| 6096|
|          11| 6092|
|           9| 5798|
|          12| 5766|
|          15| 5744|
+------------+-----+



In [0]:
#6. What is the distribution of trips based on the payment type?

df.groupBy("payment_type") \
    .count() \
    .orderBy("payment_type") \
    .show()

+------------+-----+
|payment_type|count|
+------------+-----+
|           1|62508|
|           2|20831|
|           3|  307|
|           4|   44|
|           5|    1|
+------------+-----+



In [0]:
#7. What is the most common RatecodeID in the dataset?

df.groupBy("RatecodeID") \
    .count() \
    .orderBy(col("count").desc()) \
    .limit(5) \
    .show()

+----------+-----+
|RatecodeID|count|
+----------+-----+
|         1|81512|
|         5| 1954|
|         2|  158|
|         4|   41|
|         3|   26|
+----------+-----+



In [0]:
#8. What is the Average fare for different vendors?

df = df.withColumn("total_amount", col("total_amount").cast(DoubleType()))
df.groupBy("VendorID") \
    .mean("total_amount") \
    .withColumnRenamed("avg(total_amount)", "Average_Fare") \
    .show()

+--------+------------------+
|VendorID|      Average_Fare|
+--------+------------------+
|       1|17.013627772674567|
|       2|24.925072694291224|
+--------+------------------+



In [0]:
#9. How many trips have a tolls amount greater tahn zero?

filtered_df = df.filter(col("tolls_amount") > 0)

result = filtered_df.agg(
    mean("tolls_amount").alias("Average_tolls_amount"),
    count("tolls_amount").alias("Tolls_amount_greater_than_zero")
)

result.show()

+--------------------+------------------------------+
|Average_tolls_amount|Tolls_amount_greater_than_zero|
+--------------------+------------------------------+
|   6.413921953613792|                          8149|
+--------------------+------------------------------+



In [0]:
#10. What is the distribution of trip types?

df = df.withColumn(
    "TRIPS", 
    when(df["trip_type"] == 1, "City Trip")
    .when(df["trip_type"] == 2, "InterCity Trip")
    .otherwise("Other")
)

df.groupBy("TRIPS").count().orderBy(col("count").desc()).show()


+--------------+-----+
|         TRIPS|count|
+--------------+-----+
|     City Trip|81931|
|InterCity Trip| 1760|
+--------------+-----+



In [0]:
#11. What is the most common day of the week for trips?

df = df.withColumn("DATE", col("lpep_pickup_datetime").cast("date")) \
       .withColumn("Dayw", dayofweek(col("lpep_pickup_datetime")))

df = df.withColumn("Dw", when(df["Dayw"] == 1, "Mon")
                        .when(df["Dayw"] == 2, "Tue")
                        .when(df["Dayw"] == 3, "Wed")
                        .when(df["Dayw"] == 4, "Thu")
                        .when(df["Dayw"] == 5, "Fri")
                        .when(df["Dayw"] == 6, "Sat")
                        .when(df["Dayw"] == 7, "Sun"))

df.groupBy(col("Dw").alias("Day of Week")).count().orderBy(col("count").desc()).show()

df = df.drop("DATE", "Dw", "Dayw")

+-----------+-----+
|Day of Week|count|
+-----------+-----+
|        Sat|14964|
|        Fri|14929|
|        Sun|12862|
|        Thu|11752|
|        Wed|10875|
|        Tue|10403|
|        Mon| 7906|
+-----------+-----+



In [0]:
#12. What is the average tip_amount?

df.select(mean("tip_amount").alias("Average_Tip_Amount")).show()

+------------------+
|Average_Tip_Amount|
+------------------+
| 1.058618130981837|
+------------------+



In [0]:
#13. Is there any correlation between trip distance and total amount paid?

correlation = df.select(corr("trip_distance", "total_amount")).first()[0]

print(f"Correlation between trip distance and total amount paid: {correlation}")


Correlation between trip distance and total amount paid: 0.0250718211669305
