In [None]:
import pandas as pd

df_trip = pd.read_csv('/content/taxi_trip_data.csv')
df_trip.head()

Unnamed: 0,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,rate_code,store_and_fwd_flag,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,imp_surcharge,pickup_location_id,dropoff_location_id
0,1,5/11/2018 17:40,5/11/2018 17:55,1,1.6,1,N,1,11.5,1.0,0.5,0.0,0.0,0.3,48,68
1,2,3/22/2018 23:01,3/22/2018 23:25,1,9.52,1,N,1,28.5,0.5,0.5,5.96,0.0,0.3,138,230
2,2,7/24/2018 9:58,7/24/2018 10:22,1,2.17,1,N,1,15.5,0.0,0.5,1.5,0.0,0.3,234,48
3,2,12/21/2018 18:28,12/21/2018 18:35,1,0.86,1,N,2,6.0,1.0,0.5,0.0,0.0,0.3,79,125
4,1,8/15/2018 13:58,8/15/2018 14:05,1,0.3,1,N,2,5.5,0.0,0.5,0.0,0.0,0.3,233,233


In [None]:
df_trip['payment_type'].value_counts()

Unnamed: 0_level_0,count
payment_type,Unnamed: 1_level_1
1,727397
2,314232
3,5397
4,1549


In [None]:
df_trip.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1048575 entries, 0 to 1048574
Data columns (total 16 columns):
 #   Column               Non-Null Count    Dtype  
---  ------               --------------    -----  
 0   vendor_id            1048575 non-null  int64  
 1   pickup_datetime      1048575 non-null  object 
 2   dropoff_datetime     1048575 non-null  object 
 3   passenger_count      1048575 non-null  int64  
 4   trip_distance        1048575 non-null  float64
 5   rate_code            1048575 non-null  int64  
 6   store_and_fwd_flag   1048575 non-null  object 
 7   payment_type         1048575 non-null  int64  
 8   fare_amount          1048575 non-null  float64
 9   extra                1048575 non-null  float64
 10  mta_tax              1048575 non-null  float64
 11  tip_amount           1048575 non-null  float64
 12  tolls_amount         1048575 non-null  float64
 13  imp_surcharge        1048575 non-null  float64
 14  pickup_location_id   1048575 non-null  int64  
 15

In [None]:
df_zone = pd.read_csv('/content/taxi_zone_geo.csv')
df_zone.head()

Unnamed: 0,zone_id,zone_name,borough,zone_geom
0,1,Newark Airport,EWR,"POLYGON((-74.1856319999999 40.6916479999999, -..."
1,3,Allerton/Pelham Gardens,Bronx,"POLYGON((-73.848596761 40.8716707849999, -73.8..."
2,18,Bedford Park,Bronx,"POLYGON((-73.8844286139999 40.8668003789999, -..."
3,20,Belmont,Bronx,"POLYGON((-73.8839239579998 40.8644177609999, -..."
4,31,Bronx Park,Bronx,"POLYGON((-73.8710017319999 40.8572767429999, -..."


In [None]:
df_zone.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 263 entries, 0 to 262
Data columns (total 4 columns):
 #   Column     Non-Null Count  Dtype 
---  ------     --------------  ----- 
 0   zone_id    263 non-null    int64 
 1   zone_name  263 non-null    object
 2   borough    263 non-null    object
 3   zone_geom  263 non-null    object
dtypes: int64(1), object(3)
memory usage: 8.3+ KB


# **Preprocessing taxi_trip_data.csv**

---



First, we need to convert the attributes to their suitable data type

In [None]:
#converts to datetime
df_trip['pickup_datetime'] = pd.to_datetime(df_trip['pickup_datetime'], errors='coerce')
df_trip['dropoff_datetime'] = pd.to_datetime(df_trip['dropoff_datetime'], errors='coerce')

In [None]:
#converts to numerical values
df_trip['passenger_count'] = pd.to_numeric(df_trip['passenger_count'], errors='coerce')
df_trip['trip_distance'] = pd.to_numeric(df_trip['trip_distance'], errors='coerce')

In [None]:
df_trip.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1048575 entries, 0 to 1048574
Data columns (total 16 columns):
 #   Column               Non-Null Count    Dtype         
---  ------               --------------    -----         
 0   vendor_id            1048575 non-null  int64         
 1   pickup_datetime      1048575 non-null  datetime64[ns]
 2   dropoff_datetime     1048575 non-null  datetime64[ns]
 3   passenger_count      1048575 non-null  int64         
 4   trip_distance        1048575 non-null  float64       
 5   rate_code            1048575 non-null  int64         
 6   store_and_fwd_flag   1048575 non-null  object        
 7   payment_type         1048575 non-null  int64         
 8   fare_amount          1048575 non-null  float64       
 9   extra                1048575 non-null  float64       
 10  mta_tax              1048575 non-null  float64       
 11  tip_amount           1048575 non-null  float64       
 12  tolls_amount         1048575 non-null  float64       
 1

Now let's check on the NA values and other values like empty strings or zeroes. We should handle them

In [None]:
df_trip.isna().sum() #check na values

Unnamed: 0,0
vendor_id,0
pickup_datetime,0
dropoff_datetime,0
passenger_count,0
trip_distance,0
rate_code,0
store_and_fwd_flag,0
payment_type,0
fare_amount,0
extra,0


In [None]:
(df_trip == '').sum() #check empty strings

Unnamed: 0,0
vendor_id,0
pickup_datetime,0
dropoff_datetime,0
passenger_count,0
trip_distance,0
rate_code,0
store_and_fwd_flag,0
payment_type,0
fare_amount,0
extra,0


In [None]:
#check for zeroes in columns that should't contain a zero
df_trip[['trip_distance', 'fare_amount', 'passenger_count']].isin([0]).sum()

Unnamed: 0,0
trip_distance,7143
fare_amount,280
passenger_count,9408


Looks like there is no na values, which is good.

However, there are rows with zero values in trip_distance, fare_amount, and passenger_count columns, which should be removed, as these represent illogical or invalid records for real-world taxi trips.





In [None]:
df_trip = df_trip[(df_trip['trip_distance'] > 0) & (df_trip['fare_amount'] > 0) & (df_trip['passenger_count'] > 0)]

print('df_trip shape after dropping zero and na values:', df_trip.shape)


df_trip shape after dropping zero and na values: (1031509, 16)


In [None]:
df_trip[['trip_distance', 'fare_amount', 'passenger_count']].isin([0]).sum()

Unnamed: 0,0
trip_distance,0
fare_amount,0
passenger_count,0


remove duplicates

In [None]:
df_trip.duplicated().sum()

np.int64(820)

In [None]:
df_trip.drop_duplicates(inplace=True)
print('df_trip shape after dropping duplicates:', df_trip.shape)

df_trip shape after dropping duplicates: (1030689, 16)


Let's drop vendor_id since it won't be meaningful in our analysis

In [None]:
# Check if 'vendor_id' column exists before dropping
if 'vendor_id' in df_trip.columns:
    df_trip.drop(columns=['vendor_id'], inplace=True)
    print('df_trip shape after dropping duplicates:', df_trip.shape)
else:
    print("Column 'vendor_id' not found in DataFrame.")

df_trip shape after dropping duplicates: (1030689, 15)


In [None]:
df_trip.head()

Unnamed: 0,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,rate_code,store_and_fwd_flag,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,imp_surcharge,pickup_location_id,dropoff_location_id
0,2018-05-11 17:40:00,2018-05-11 17:55:00,1,1.6,1,N,1,11.5,1.0,0.5,0.0,0.0,0.3,48,68
1,2018-03-22 23:01:00,2018-03-22 23:25:00,1,9.52,1,N,1,28.5,0.5,0.5,5.96,0.0,0.3,138,230
2,2018-07-24 09:58:00,2018-07-24 10:22:00,1,2.17,1,N,1,15.5,0.0,0.5,1.5,0.0,0.3,234,48
3,2018-12-21 18:28:00,2018-12-21 18:35:00,1,0.86,1,N,2,6.0,1.0,0.5,0.0,0.0,0.3,79,125
4,2018-08-15 13:58:00,2018-08-15 14:05:00,1,0.3,1,N,2,5.5,0.0,0.5,0.0,0.0,0.3,233,233


Let's check on the values in rate_code, store_and_fwd_flag and payment_type

In [None]:
df_trip['rate_code'].value_counts()

Unnamed: 0_level_0,count
rate_code,Unnamed: 1_level_1
1,1002856
2,23078
3,2093
5,2058
4,598
99,4
6,2


there is an invalid value in rate_code, which is 99. rate_code should have values from 1 to 6 only


In [None]:
df_trip = df_trip[df_trip['rate_code'] != 99] # keep the rows that have rate_code that is not 99
df_trip['rate_code'].value_counts()

Unnamed: 0_level_0,count
rate_code,Unnamed: 1_level_1
1,1002856
2,23078
3,2093
5,2058
4,598
6,2


In [None]:
df_trip['store_and_fwd_flag'].value_counts()

Unnamed: 0_level_0,count
store_and_fwd_flag,Unnamed: 1_level_1
N,1026050
Y,4635


store_and_fwd_flag looks good

In [None]:
df_trip['payment_type'].value_counts()

Unnamed: 0_level_0,count
payment_type,Unnamed: 1_level_1
1,717400
2,308178
3,3972
4,1135


looks good as well.

In [None]:
df_trip.info()

<class 'pandas.core.frame.DataFrame'>
Index: 1030685 entries, 0 to 1048574
Data columns (total 15 columns):
 #   Column               Non-Null Count    Dtype         
---  ------               --------------    -----         
 0   pickup_datetime      1030685 non-null  datetime64[ns]
 1   dropoff_datetime     1030685 non-null  datetime64[ns]
 2   passenger_count      1030685 non-null  int64         
 3   trip_distance        1030685 non-null  float64       
 4   rate_code            1030685 non-null  int64         
 5   store_and_fwd_flag   1030685 non-null  object        
 6   payment_type         1030685 non-null  int64         
 7   fare_amount          1030685 non-null  float64       
 8   extra                1030685 non-null  float64       
 9   mta_tax              1030685 non-null  float64       
 10  tip_amount           1030685 non-null  float64       
 11  tolls_amount         1030685 non-null  float64       
 12  imp_surcharge        1030685 non-null  float64       
 13  pi

create trip_duration_min column that has the duration of the trip in minutes

In [None]:
df_trip['trip_duration_min'] = (df_trip['dropoff_datetime'] - df_trip['pickup_datetime']).dt.total_seconds() / 60

create  total_trip_cost column

In [None]:
df_trip['total_trip_cost'] = df_trip[['fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount']].sum(axis=1) #axis=1 makes it so it sums the rows, not the columns (axis=0 is the columns, and is the default as well)

In [None]:
df_trip.head()

Unnamed: 0,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,rate_code,store_and_fwd_flag,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,imp_surcharge,pickup_location_id,dropoff_location_id,trip_duration_min,total_trip_cost
0,2018-05-11 17:40:00,2018-05-11 17:55:00,1,1.6,1,N,1,11.5,1.0,0.5,0.0,0.0,0.3,48,68,15.0,13.0
1,2018-03-22 23:01:00,2018-03-22 23:25:00,1,9.52,1,N,1,28.5,0.5,0.5,5.96,0.0,0.3,138,230,24.0,35.46
2,2018-07-24 09:58:00,2018-07-24 10:22:00,1,2.17,1,N,1,15.5,0.0,0.5,1.5,0.0,0.3,234,48,24.0,17.5
3,2018-12-21 18:28:00,2018-12-21 18:35:00,1,0.86,1,N,2,6.0,1.0,0.5,0.0,0.0,0.3,79,125,7.0,7.5
4,2018-08-15 13:58:00,2018-08-15 14:05:00,1,0.3,1,N,2,5.5,0.0,0.5,0.0,0.0,0.3,233,233,7.0,6.0


# **Preprocessing taxi_zone_geo.csv and merging**

---



In [None]:
df_zone.head()

Unnamed: 0,zone_id,zone_name,borough,zone_geom
0,1,Newark Airport,EWR,"POLYGON((-74.1856319999999 40.6916479999999, -..."
1,3,Allerton/Pelham Gardens,Bronx,"POLYGON((-73.848596761 40.8716707849999, -73.8..."
2,18,Bedford Park,Bronx,"POLYGON((-73.8844286139999 40.8668003789999, -..."
3,20,Belmont,Bronx,"POLYGON((-73.8839239579998 40.8644177609999, -..."
4,31,Bronx Park,Bronx,"POLYGON((-73.8710017319999 40.8572767429999, -..."


drop zone_geom since it won't be used in the analysis

In [None]:
df_zone = df_zone.drop(columns=['zone_geom'])

In [None]:
df_zone.isna().sum()

Unnamed: 0,0
zone_id,0
zone_name,0
borough,0


**Start merging**

change zone_id to match the other dataset

In [None]:
df_zone_pickup = df_zone.rename(columns={
    'zone_id': 'pickup_location_id',
    'zone_name': 'pickup_zone',
    'borough': 'pickup_borough'
})

In [None]:
df_trip = df_trip.merge(df_zone_pickup, on='pickup_location_id', how='left') #merge based on the column pickup_location_id

change zone_id so that it matches with the other dataset, but this time for dopoff instead of pickup

In [None]:
df_zone_dropoff = df_zone.rename(columns={
    'zone_id': 'dropoff_location_id',
    'zone_name': 'dropoff_zone',
    'borough': 'dropoff_borough'
})

In [None]:
df_trip = df_trip.merge(df_zone_dropoff, on='dropoff_location_id', how='left') #merge based on the column dropoff_location_id

In [None]:
df = df_trip
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1030977 entries, 0 to 1030976
Data columns (total 21 columns):
 #   Column               Non-Null Count    Dtype         
---  ------               --------------    -----         
 0   pickup_datetime      1030977 non-null  datetime64[ns]
 1   dropoff_datetime     1030977 non-null  datetime64[ns]
 2   passenger_count      1030977 non-null  int64         
 3   trip_distance        1030977 non-null  float64       
 4   rate_code            1030977 non-null  int64         
 5   store_and_fwd_flag   1030977 non-null  object        
 6   payment_type         1030977 non-null  int64         
 7   fare_amount          1030977 non-null  float64       
 8   extra                1030977 non-null  float64       
 9   mta_tax              1030977 non-null  float64       
 10  tip_amount           1030977 non-null  float64       
 11  tolls_amount         1030977 non-null  float64       
 12  imp_surcharge        1030977 non-null  float64       
 1

In [None]:
df.isna().sum()

Unnamed: 0,0
pickup_datetime,0
dropoff_datetime,0
passenger_count,0
trip_distance,0
rate_code,0
store_and_fwd_flag,0
payment_type,0
fare_amount,0
extra,0
mta_tax,0


there seems to be some NA values that occured after merging, but their amount is very small in relation to the size of the dataset, so there should be no harm in dropping them

In [None]:
df.dropna(inplace=True)

In [None]:
df.shape

(1013266, 21)

In [None]:
df.head()

Unnamed: 0,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,rate_code,store_and_fwd_flag,payment_type,fare_amount,extra,mta_tax,...,tolls_amount,imp_surcharge,pickup_location_id,dropoff_location_id,trip_duration_min,total_trip_cost,pickup_zone,pickup_borough,dropoff_zone,dropoff_borough
0,2018-05-11 17:40:00,2018-05-11 17:55:00,1,1.6,1,N,1,11.5,1.0,0.5,...,0.0,0.3,48,68,15.0,13.0,Clinton East,Manhattan,East Chelsea,Manhattan
1,2018-03-22 23:01:00,2018-03-22 23:25:00,1,9.52,1,N,1,28.5,0.5,0.5,...,0.0,0.3,138,230,24.0,35.46,LaGuardia Airport,Queens,Times Sq/Theatre District,Manhattan
2,2018-07-24 09:58:00,2018-07-24 10:22:00,1,2.17,1,N,1,15.5,0.0,0.5,...,0.0,0.3,234,48,24.0,17.5,Union Sq,Manhattan,Clinton East,Manhattan
3,2018-12-21 18:28:00,2018-12-21 18:35:00,1,0.86,1,N,2,6.0,1.0,0.5,...,0.0,0.3,79,125,7.0,7.5,East Village,Manhattan,Hudson Sq,Manhattan
4,2018-08-15 13:58:00,2018-08-15 14:05:00,1,0.3,1,N,2,5.5,0.0,0.5,...,0.0,0.3,233,233,7.0,6.0,UN/Turtle Bay South,Manhattan,UN/Turtle Bay South,Manhattan


In [None]:
df.to_csv('processed_taxi_data.csv', index=False)

# **Analytical Queries**

---



In [None]:
!pip install pyspark
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [None]:
import findspark
findspark.init()

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
# Creating a spark context class
sc = SparkContext()
# create a spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()


In [None]:
sdf = spark.createDataFrame(df)

In [None]:
from pyspark.sql.functions import col, hour, udf
from pyspark.sql.types import StringType
from tabulate import tabulate
import pandas as pd # Import pandas as it is used in later code

# Extract hour from pickup_datetime using PySpark function
sdf = sdf.withColumn('hour', hour(col('pickup_datetime')))

# Define the time of day mapping UDF
def map_time_of_day(hour):
    if hour is None: # Handle None values that might occur if hour extraction fails
        return None
    elif 5 <= hour < 12:
        return 'Morning'
    elif 12 <= hour < 17:
        return 'Afternoon'
    elif 17 <= hour < 22:
        return 'Evening'
    else:
        return 'Night'

# Create a PySpark UDF from the Python function
map_time_of_day_udf = udf(map_time_of_day, StringType())

# Apply the UDF to create the 'time_of_day' column in the Spark DataFrame
sdf = sdf.withColumn('time_of_day', map_time_of_day_udf(col('hour')))

# Convert the necessary Spark DataFrame columns back to a pandas DataFrame
df_for_pandas = sdf.select('time_of_day', 'payment_type').toPandas()


# Group by time_of_day and payment_type, then count trips
# Use the pandas DataFrame for grouping
grouped = df_for_pandas.groupby(['time_of_day', 'payment_type']).size().reset_index(name='count')

# Find most common payment type per time of day
most_common = (
    grouped.sort_values('count', ascending=False)
    .groupby('time_of_day')
    .first()
    .reset_index()
)

print(tabulate(most_common, headers='keys', tablefmt='pretty', showindex=False))

+-------------+--------------+--------+
| time_of_day | payment_type | count  |
+-------------+--------------+--------+
|  Afternoon  |      1       | 176046 |
|   Evening   |      1       | 214506 |
|   Morning   |      1       | 183295 |
|    Night    |      1       | 131505 |
+-------------+--------------+--------+


In [None]:
from pyspark.sql.functions import col, sum, count
from tabulate import tabulate

# Group by pickup_borough and summarize existing total_trip_cost
# Use Spark's sum and count functions within the agg method
borough_summary = sdf.groupby('pickup_borough').agg(
    sum(col('total_trip_cost')).alias('total_trip_cost'),
    count(col('pickup_location_id')).alias('trip_count')
)

# Sort the result
borough_summary = borough_summary.orderBy(col('total_trip_cost').desc())

# To display the result using tabulate, convert the Spark DataFrame to pandas first
borough_summary_pd = borough_summary.toPandas()

print(tabulate(borough_summary_pd, headers='keys', tablefmt='pretty', showindex=False))

+----------------+--------------------+------------+
| pickup_borough |  total_trip_cost   | trip_count |
+----------------+--------------------+------------+
|   Manhattan    | 12981746.849999685 |   935775   |
|     Queens     | 2790467.920000127  |   63501    |
|    Brooklyn    | 231210.01000000053 |   12836    |
|     Bronx      | 27487.240000000016 |    1120    |
|      EWR       |      1561.26       |     17     |
| Staten Island  | 801.9399999999999  |     17     |
+----------------+--------------------+------------+


In [None]:
from pyspark.sql.functions import col, sum, count, avg # Import the avg function
from tabulate import tabulate

# Group by passenger count and calculate average tip
# Use PySpark's agg syntax with avg and count functions
tip_by_passenger = sdf.groupby('passenger_count').agg(
    avg(col('tip_amount')).alias('average_tip'), # Use avg function and alias for new column name
    count(col('tip_amount')).alias('trip_count') # Use count function and alias
)

# Sort the result using Spark's orderBy
tip_by_passenger = tip_by_passenger.orderBy(col('passenger_count')) # Use orderBy with col

# To display the result using tabulate, convert the Spark DataFrame to pandas first
tip_by_passenger_pd = tip_by_passenger.toPandas()

print(tabulate(tip_by_passenger_pd, headers='keys', tablefmt='pretty', showindex=False))

+-----------------+--------------------+------------+
| passenger_count |    average_tip     | trip_count |
+-----------------+--------------------+------------+
|       1.0       | 1.8557080878494379 |  726287.0  |
|       2.0       | 1.8459514889241182 |  149739.0  |
|       3.0       | 1.8226370567873544 |  42756.0   |
|       4.0       | 1.700903566447492  |  20076.0   |
|       5.0       | 1.8658671911948164 |  46518.0   |
|       6.0       |  1.85985083724766  |  27889.0   |
|       9.0       |        0.0         |    1.0     |
+-----------------+--------------------+------------+


In [None]:
from pyspark.sql.functions import col, sum, count
from tabulate import tabulate
import pandas as pd

# Trip count per pickup_zone
# Use PySpark groupby and count, then convert to pandas
zone_counts_sdf = sdf.groupby('pickup_zone').agg(count("*").alias('total_trips'))

# Convert the PySpark DataFrame to pandas for easier sorting and filtering
zone_counts = zone_counts_sdf.toPandas()

# Get top 5 zones
top5_zones = zone_counts.sort_values('total_trips', ascending=False).head(5)

# For these zones, find the most common time_of_day
# Filter original sdf for top 5 zones using PySpark where and isin
top5_sdf = sdf.where(col('pickup_zone').isin(top5_zones['pickup_zone'].tolist()))

# Count trips per (zone, time_of_day)
zone_time_counts_sdf = (
    top5_sdf.groupby(['pickup_zone', 'time_of_day'])
    .agg(count("*").alias('trip_count'))
)

# Convert the result to pandas for easier finding of max per group and merging
zone_time_counts = zone_time_counts_sdf.toPandas()


# Get time_of_day with most trips for each zone using pandas operations
idx = (
    zone_time_counts.groupby('pickup_zone')['trip_count']
    .idxmax()
)

# Select the rows corresponding to the maximum trip count for each zone
peak_times_for_zones = zone_time_counts.loc[idx]


# Merge with top5_zones to add best time_of_day using pandas merge
top5_final = top5_zones.merge(
    peak_times_for_zones[['pickup_zone', 'time_of_day', 'trip_count']], # Select relevant columns to merge
    on='pickup_zone',
    how='left' # Use left merge to keep all top 5 zones even if no time_of_day was found (though unlikely here)
).rename(columns={'trip_count': 'peak_time_trips'})

print(tabulate(top5_final, headers='keys', tablefmt='pretty', showindex=False))

+---------------------------+-------------+-------------+-----------------+
|        pickup_zone        | total_trips | time_of_day | peak_time_trips |
+---------------------------+-------------+-------------+-----------------+
|   Upper East Side South   |    42439    |  Afternoon  |      14016      |
|      Midtown Center       |    39664    |   Evening   |      12987      |
|   Upper East Side North   |    38666    |  Afternoon  |      13247      |
|       Midtown East        |    36325    |   Evening   |      12938      |
| Times Sq/Theatre District |    34904    |   Evening   |      11224      |
+---------------------------+-------------+-------------+-----------------+


In [None]:
from pyspark.sql.functions import col
from tabulate import tabulate
import pandas as pd # Ensure pandas is imported if needed for subsequent operations

# Sort sdf by trip duration and get top 5 using PySpark operations
top5_longest_sdf = sdf.orderBy(col('trip_duration_min').desc()).limit(5)

# Convert the result to pandas DataFrame to use pandas methods like rename and reorder
top5_longest = top5_longest_sdf.toPandas()

# Rename columns for clarity (using pandas method on the converted DataFrame)
top5_longest = top5_longest.rename(columns={
    'trip_duration_min': 'Trip Duration (min)',
    'fare_amount': 'Fare ($)',
    'pickup_zone': 'Pickup Zone',
    'dropoff_zone': 'Dropoff Zone',
    'payment_type': 'Payment Type'
})

# Reorder and display clean table (using pandas method)
top5_longest = top5_longest[['Trip Duration (min)', 'Fare ($)', 'Pickup Zone', 'Dropoff Zone', 'Payment Type']]

print(tabulate(top5_longest, headers='keys', tablefmt='pretty', showindex=False))

+---------------------+----------+-----------------+-----------------------+--------------+
| Trip Duration (min) | Fare ($) |   Pickup Zone   |     Dropoff Zone      | Payment Type |
+---------------------+----------+-----------------+-----------------------+--------------+
|       1440.0        |   12.0   |    Gramercy     |    Lower East Side    |      1       |
|       1440.0        |   8.0    | Lenox Hill East |    Lenox Hill East    |      2       |
|       1440.0        |   52.0   |   JFK Airport   | Upper East Side North |      2       |
|       1440.0        |   8.0    |  East Village   |     West Village      |      1       |
|       1440.0        |   7.5    |      SoHo       |     West Village      |      1       |
+---------------------+----------+-----------------+-----------------------+--------------+


Observation: The top 5 trips all have a duration of ~1440 minutes (24 hours), yet their fare amounts are unusually low (ranging from $6 to $20). These are likely data entry or logging errors — possibly caused by incorrect timestamps or meter misconfigurations.

In [None]:
from pyspark.sql.functions import col, sum, count
from tabulate import tabulate
import pandas as pd

# Filter inter-borough trips using PySpark's filter or where
# PySpark DataFrames are immutable, so filtering creates a new DataFrame
df_inter = sdf.filter(col('pickup_borough') != col('dropoff_borough'))

# Group by borough combination and summarize using PySpark's groupBy and agg
top_routes_sdf = df_inter.groupBy(['pickup_borough', 'dropoff_borough']).agg(
    count("*").alias('trip_count'), # Use PySpark count function
    sum(col('total_trip_cost')).alias('total_revenue') # Use PySpark sum function with col
)

# Sort the result using PySpark's orderBy
top_routes_sdf = top_routes_sdf.orderBy(col('trip_count').desc())

# To display the top 10, limit the result and convert to pandas for tabulate
top_routes_pd = top_routes_sdf.limit(10).toPandas()

print(tabulate(top_routes_pd, headers='keys', tablefmt='pretty', showindex=False))

+----------------+-----------------+------------+--------------------+
| pickup_borough | dropoff_borough | trip_count |   total_revenue    |
+----------------+-----------------+------------+--------------------+
|     Queens     |    Manhattan    |   37712    | 1961315.5000000596 |
|   Manhattan    |     Queens      |   31933    | 1343434.1300000318 |
|   Manhattan    |    Brooklyn     |   26812    | 753583.6200000069  |
|     Queens     |    Brooklyn     |    8872    | 408309.7800000004  |
|   Manhattan    |      Bronx      |    4808    | 151443.75999999978 |
|    Brooklyn    |    Manhattan    |    3820    | 94792.24999999996  |
|   Manhattan    |       EWR       |    1866    | 179580.81000000003 |
|     Queens     |      Bronx      |    1063    | 57525.209999999526 |
|    Brooklyn    |     Queens      |    803     |      26781.39      |
|     Bronx      |    Manhattan    |    439     | 11638.900000000005 |
+----------------+-----------------+------------+--------------------+


#SparkMl

In [None]:
from pyspark.sql.functions import col, when, hour, udf
from pyspark.sql.types import StringType
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler, SQLTransformer
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

# Filter for valid data
sdf = sdf.filter((col("payment_type") == 1) &
                 (col("trip_duration_min") <= 120) &
                 (col("fare_amount") > 0) &
                 (col("tip_amount") >= 0))

# Create high_tip and weights
sdf = sdf.withColumn("high_tip", when(col("tip_amount") > 0.15 * col("fare_amount"), 1).otherwise(0))
sdf = sdf.withColumn("weight", when(col("high_tip") == 1, 3277/18953).otherwise(15676/18953))

# Add features
sdf = sdf.withColumn("fare_per_mile",
                     when(col("trip_distance") > 0, col("fare_amount") / col("trip_distance")).otherwise(0))
# Create the 'hour' column from pickup_datetime
sdf = sdf.withColumn("hour", hour(col("pickup_datetime")))

# Define the time of day mapping UDF
def map_time_of_day(hour):
    if 5 <= hour < 12:
        return 'Morning'
    elif 12 <= hour < 17:
        return 'Afternoon'
    elif 17 <= hour < 22:
        return 'Evening'
    else:
        return 'Night'

map_time_of_day_udf = udf(map_time_of_day, StringType())

# Apply the UDF to create the 'time_of_day' column in the Spark DataFrame
sdf = sdf.withColumn('time_of_day', map_time_of_day_udf(col('hour')))


sdf = sdf.withColumn("trip_speed",
                     when(col("trip_duration_min") > 0, col("trip_distance") / col("trip_duration_min")).otherwise(0))

# Encode categorical variables
indexers = [
    StringIndexer(inputCol="time_of_day", outputCol="time_of_day_index", handleInvalid="keep"),
    StringIndexer(inputCol="pickup_borough", outputCol="pickup_borough_index", handleInvalid="keep"),
    StringIndexer(inputCol="dropoff_borough", outputCol="dropoff_borough_index", handleInvalid="keep")
]

# Interaction feature
sqlTrans = SQLTransformer(statement="SELECT *, fare_amount * hour AS fare_hour_interaction FROM __THIS__")

# Select features
feature_cols = ["passenger_count", "trip_distance", "trip_duration_min", "fare_amount",
                "time_of_day_index", "pickup_borough_index", "dropoff_borough_index",
                "fare_per_mile", "tolls_amount", "hour", "trip_speed", "fare_hour_interaction"]

# Assemble and scale features
assembler = VectorAssembler(inputCols=feature_cols, outputCol="raw_features", handleInvalid="skip")
scaler = StandardScaler(inputCol="raw_features", outputCol="features", withStd=True, withMean=True)

# Create pipeline
pipeline = Pipeline(stages=indexers + [sqlTrans, assembler, scaler])

# Apply preprocessing
preprocessed_data = pipeline.fit(sdf).transform(sdf)

# Split data
train_data, test_data = preprocessed_data.randomSplit([0.8, 0.2], seed=42)

# Initialize models
lr = LogisticRegression(labelCol="high_tip", featuresCol="features", weightCol="weight")
dt = DecisionTreeClassifier(labelCol="high_tip", featuresCol="features", maxDepth=7)
rf = RandomForestClassifier(labelCol="high_tip", featuresCol="features", numTrees=50, maxDepth=7, weightCol="weight")
gbt = GBTClassifier(labelCol="high_tip", featuresCol="features", maxDepth=7, maxIter=20)

# Train models
lr_model = lr.fit(train_data)
dt_model = dt.fit(train_data)
rf_model = rf.fit(train_data)
gbt_model = gbt.fit(train_data)

# Make predictions
lr_predictions = lr_model.transform(test_data)
dt_predictions = dt_model.transform(test_data)
rf_predictions = rf_model.transform(test_data)
gbt_predictions = gbt_model.transform(test_data)

# Evaluate models
binary_evaluator = BinaryClassificationEvaluator(labelCol="high_tip", metricName="areaUnderROC")
multi_evaluator = MulticlassClassificationEvaluator(labelCol="high_tip")

print("Model Evaluation Results:")
for name, predictions in [("Logistic Regression", lr_predictions),
                         ("Decision Tree", dt_predictions),
                         ("Random Forest", rf_predictions),
                         ("Gradient Boosted Trees", gbt_predictions)]:
    auc = binary_evaluator.evaluate(predictions)
    multi_evaluator.setMetricName("accuracy")
    accuracy = multi_evaluator.evaluate(predictions)
    multi_evaluator.setMetricName("f1")
    f1_score = multi_evaluator.evaluate(predictions)
    multi_evaluator.setMetricName("weightedPrecision")
    precision = multi_evaluator.evaluate(predictions)
    multi_evaluator.setMetricName("weightedRecall")
    recall = multi_evaluator.evaluate(predictions)
    print(f"{name}:")
    print(f"  AUC: {auc:.4f}")
    print(f"  Accuracy: {accuracy:.4f}")
    print(f"  F1 Score: {f1_score:.4f}")
    print(f"  Precision: {precision:.4f}")
    print(f"  Recall: {recall:.4f}")

# Feature importance for Random Forest
print("\nFeature Importance (Random Forest):")
importances_rf = rf_model.featureImportances
feature_importance_rf = [(feature_cols[i], importances_rf[i]) for i in range(len(feature_cols))]
sorted_importance_rf = sorted(feature_importance_rf, key=lambda x: x[1], reverse=True)
for feature, importance in sorted_importance_rf:
    print(f"  {feature}: {importance:.4f}")

# Feature importance for Gradient Boosted Trees
print("\nFeature Importance (Gradient Boosted Trees):")
importances_gbt = gbt_model.featureImportances
feature_importance_gbt = [(feature_cols[i], importances_gbt[i]) for i in range(len(feature_cols))]
sorted_importance_gbt = sorted(feature_importance_gbt, key=lambda x: x[1], reverse=True)
for feature, importance in sorted_importance_gbt:
    print(f"  {feature}: {importance:.4f}")

In [None]:
sdf.groupBy("high_tip").count().show()

GBT Performed the best out of the 4 models, it has the highest F1 Score and a very high accuracy and precision score as well.