<a href="https://colab.research.google.com/github/AnSaradar/NYC_Taxi_Driver_Analysis/blob/main/NYC_Taxi_Trips_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Setup & Import Dask**

In [72]:
# External dependencies
import os
import shutil
import warnings
import numpy as np
import zipfile
import matplotlib.pyplot as plt
from os import path


warnings.filterwarnings("ignore")

In [73]:
!pip install dask

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


# Loading the Data

In [95]:
import dask.dataframe as dd

In [96]:
zip_path = "/content/drive/MyDrive/Datasets/Copy of nyc_taxi_data.zip"
extract_dir = '/content/sample_data'

In [97]:
shutil.unpack_archive(zip_path, extract_dir)


In [98]:
df = dd.read_parquet('/content/sample_data/nyc_taxi_alt/*.parquet')

In [99]:
df.npartitions

609

# **Discovering the Data**

In [None]:
from dask import delayed, compute, visualize, dataframe
from dask.distributed import Client ,LocalCluster

In [None]:
print("Number of rows:", df.shape[0].compute())
print("Number of columns:", len(df.columns))

In [None]:
df.isnull().sum().compute()

In [None]:
df.dtypes

In [None]:
df.airport_fee.value_counts().compute()

In [None]:
df.passenger_count.value_counts().compute()

1.0      126123945
2.0       26414914
3.0        7162781
5.0        5554004
0.0        3482482
6.0        3425107
4.0        3274587
7.0            809
8.0            524
9.0            359
96.0             1
112.0            1
Name: passenger_count, dtype: int64

In [None]:
df.congestion_surcharge.value_counts().compute()

 2.50    145092336
 0.00     14554138
-2.50       452796
 0.75         1195
 2.75          673
 0.50           44
 1.00           25
-0.75            9
 1.50            4
 0.80            2
-1.50            1
 0.30            1
 1.80            1
 2.25            1
 3.00            1
Name: congestion_surcharge, dtype: int64

In [None]:
df.describe().compute()

Unnamed: 0,VendorID,passenger_count,trip_distance,RatecodeID,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
count,179311700.0,175439500.0,179311700.0,175439500.0,179311700.0,179311700.0,179311700.0,179311700.0,179311700.0,179311700.0,179311700.0,179311700.0,179311700.0,179311700.0,170651200.0,63550710.0
mean,1.671094,1.492676,4.405538,1.143369,163.7911,161.7755,1.349501,12.62162,1.064549,0.4952468,3.319861,0.4097364,0.302087,19.69599,2.241222,0.08722807
std,0.4698158,1.119885,421.9229,2.890464,65.99441,70.44308,0.7198644,10501.85,37.36028,37.33934,10500.32,1.847462,0.06781555,198.6252,0.7857634,0.3212864
min,1.0,0.0,-37264.53,1.0,1.0,1.0,1.0,-133391400.0,-60.0,-0.55,-493.22,-99.99,-1.0,-2567.8,-2.5,-1.25
25%,2.0,1.0,1.39,1.0,137.0,132.0,1.0,8.5,0.0,0.5,1.0,0.0,0.3,12.8,2.5,0.0
50%,2.0,1.0,2.53,1.0,163.0,163.0,1.0,12.5,1.0,0.5,2.34,0.0,0.3,17.16,2.5,0.0
75%,2.0,2.0,6.42,1.0,236.0,236.0,4.0,26.1,2.75,0.5,4.82,0.0,0.3,34.44,2.5,0.0
max,2.0,112.0,389678.5,99.0,265.0,265.0,5.0,998310.0,500000.8,500000.5,133391400.0,3288.0,1.0,1084772.0,4.5,1.25


In [None]:
df.payment_type.value_counts().compute()

1    122751631
2     40466949
3       758470
4       545424
5           17
Name: payment_type, dtype: int64

# Cleaning the Missing & Outlier Values 

## Passanger Count

In [100]:
cluster = LocalCluster()
client = Client(cluster)

df = df.map_partitions(lambda partition: partition.loc[partition['passenger_count'] <= 4])


client.close()
cluster.close()

INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:39845
INFO:distributed.scheduler:  dashboard at:           127.0.0.1:33019
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:39467'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:38447'
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:44537', name: 0, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:44537
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:40698
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:37699', name: 1, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:37699
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:40706
INFO:distributed.scheduler:Receive client connection: Client-25a4edc0-0876-11ee-86c6-0242ac1c000c
INF

## Trip Distance

In [101]:
cluster = LocalCluster()
client = Client(cluster)

df = df.map_partitions(lambda partition: partition.loc[partition['trip_distance'] > 0])

client.close()
cluster.close()

INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:45655
INFO:distributed.scheduler:  dashboard at:           127.0.0.1:42503
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:43529'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:42441'
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:41089', name: 0, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:41089
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:38420
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:38137', name: 1, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:38137
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:38436
INFO:distributed.scheduler:Receive client connection: Client-26a9997f-0876-11ee-86c6-0242ac1c000c
INF

## Airport Fee

In [102]:


df = df.loc[~df['airport_fee'].isin([0.50, -1.25])]
df['airport_fee'] = df['airport_fee'].fillna(0.0)




## RatecodeID & store_and_fwd_flag

In [103]:


df = df.dropna(subset=['RatecodeID', 'store_and_fwd_flag'])




##Congestion Surcharge

In [104]:


df = df.loc[~df['congestion_surcharge'].isin([-2.50, 0.50,1.00 ,-0.75 , 1.50  ,0.80  , -1.50  ,0.30  , 1.80 ,2.25  ,           3.00 ])]
df['congestion_surcharge'] = df['congestion_surcharge'].fillna(2.50)



## Fare Amount

In [105]:
cluster = LocalCluster()
client = Client(cluster)

df = df.map_partitions(lambda partition: partition.loc[partition['fare_amount'] > 0])


client.close()
cluster.close()

INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:36867
INFO:distributed.scheduler:  dashboard at:           127.0.0.1:35261
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:45219'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:44181'
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:39293', name: 1, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:39293
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:39410
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:37509', name: 0, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:37509
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:39414
INFO:distributed.scheduler:Receive client connection: Client-27b22ed6-0876-11ee-86c6-0242ac1c000c
INF

## Extra

In [106]:
cluster = LocalCluster()
client = Client(cluster)

df = df.map_partitions(lambda partition: partition.loc[partition['extra'] >= 0])


client.close()
cluster.close()

INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:38709
INFO:distributed.scheduler:  dashboard at:           127.0.0.1:43003
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:45009'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:45775'
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:33739', name: 1, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:33739
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:46250
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:46707', name: 0, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:46707
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:46264
INFO:distributed.scheduler:Receive client connection: Client-28a13549-0876-11ee-86c6-0242ac1c000c
INF

##MTA Tax

In [107]:
cluster = LocalCluster()
client = Client(cluster)


df = df.map_partitions(lambda partition: partition.loc[partition['mta_tax'] >= 0])


client.close()
cluster.close()

INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:41773
INFO:distributed.scheduler:  dashboard at:           127.0.0.1:43645
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:44227'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:38247'
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:37969', name: 1, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:37969
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:35416
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:35699', name: 0, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:35699
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:35424
INFO:distributed.scheduler:Receive client connection: Client-2a2c2fc6-0876-11ee-86c6-0242ac1c000c
INF

##Improvement Surcharge

In [108]:
cluster = LocalCluster()
client = Client(cluster)

df = df.map_partitions(lambda partition: partition.loc[partition['improvement_surcharge'] >= 0])

client.close()
cluster.close()

INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:44995
INFO:distributed.scheduler:  dashboard at:           127.0.0.1:34317
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:46695'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:38067'
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:41371', name: 0, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:41371
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:33068
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:37371', name: 1, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:37371
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:33078
INFO:distributed.scheduler:Receive client connection: Client-2b5c851c-0876-11ee-86c6-0242ac1c000c
INF

## Tip Amount

In [109]:
cluster = LocalCluster()
client = Client(cluster)

df = df.map_partitions(lambda partition: partition.loc[partition['tip_amount'] >= 0])


client.close()
cluster.close()

INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:39733
INFO:distributed.scheduler:  dashboard at:           127.0.0.1:39595
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:34067'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:32989'
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:36461', name: 0, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:36461
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:46508
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:46545', name: 1, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:46545
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:46498
INFO:distributed.scheduler:Receive client connection: Client-2c44b95c-0876-11ee-86c6-0242ac1c000c
INF

## Tolls Amount

In [110]:
cluster = LocalCluster()
client = Client(cluster)

df = df.map_partitions(lambda partition: partition.loc[partition['tolls_amount'] >= 0])


client.close()
cluster.close()

INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:35037
INFO:distributed.scheduler:  dashboard at:           127.0.0.1:44981
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:43773'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:40731'
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:40903', name: 0, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:40903
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:44366
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:44547', name: 1, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:44547
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:44376
INFO:distributed.scheduler:Receive client connection: Client-2d2c1e16-0876-11ee-86c6-0242ac1c000c
INF

##Total Amount

In [111]:
cluster = LocalCluster()
client = Client(cluster)

df = df.map_partitions(lambda partition: partition.loc[partition['total_amount'] > 0])


client.close()
cluster.close()

INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:40115
INFO:distributed.scheduler:  dashboard at:           127.0.0.1:45641
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:40959'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:41483'
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:43205', name: 0, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:43205
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:33944
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:44893', name: 1, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:44893
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:33946
INFO:distributed.scheduler:Receive client connection: Client-2e1a055d-0876-11ee-86c6-0242ac1c000c
INF

## Computing all the Cleaning Operations

In [112]:
cluster = LocalCluster()
client = Client(cluster)

df.compute()

client.close()
cluster.close()

INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:38583
INFO:distributed.scheduler:  dashboard at:           127.0.0.1:35759
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:39847'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:44631'
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:44965', name: 0, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:44965
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:46762
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:34491', name: 1, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:34491
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:46774
INFO:distributed.scheduler:Receive client connection: Client-2ef2311a-0876-11ee-86c6-0242ac1c000c
INF

Exception: ignored

## Cleaning Results

In [None]:
cluster = LocalCluster()
client = Client(cluster)

print("Number of rows , After Cleaning:", df.shape[0].compute())
client.close()
cluster.close()



INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:35225
INFO:distributed.scheduler:  dashboard at:           127.0.0.1:41523
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:42511'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:46457'
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:46351', name: 1, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:46351
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:58178
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:36559', name: 0, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:36559
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:58192
INFO:distributed.scheduler:Receive client connection: Client-20c12217-0855-11ee-86c6-0242ac1c000c
INF

Number of rows , After Cleaning: 163966708


INFO:distributed.scheduler:Remove client Client-20c12217-0855-11ee-86c6-0242ac1c000c
INFO:distributed.core:Received 'close-stream' from tcp://127.0.0.1:58198; closing.
INFO:distributed.scheduler:Remove client Client-20c12217-0855-11ee-86c6-0242ac1c000c
INFO:distributed.scheduler:Close client connection: Client-20c12217-0855-11ee-86c6-0242ac1c000c
INFO:distributed.nanny:Closing Nanny at 'tcp://127.0.0.1:42511'. Reason: nanny-close
INFO:distributed.nanny:Nanny asking worker to close. Reason: nanny-close
INFO:distributed.nanny:Closing Nanny at 'tcp://127.0.0.1:46457'. Reason: nanny-close
INFO:distributed.nanny:Nanny asking worker to close. Reason: nanny-close
INFO:distributed.core:Received 'close-stream' from tcp://127.0.0.1:58178; closing.
INFO:distributed.scheduler:Remove worker <WorkerState 'tcp://127.0.0.1:46351', name: 1, status: closing, memory: 0, processing: 0>
INFO:distributed.core:Removing comms to tcp://127.0.0.1:46351
INFO:distributed.core:Received 'close-stream' from tcp://12

In [None]:
cluster = LocalCluster()
client = Client(cluster)

df.to_parquet('/content/drive/MyDrive/Datasets/NYC_Taxi/cleaned_data.parquet', engine='pyarrow')

client.close()
cluster.close()

INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:37325
INFO:distributed.scheduler:  dashboard at:           127.0.0.1:40595
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:35315'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:41737'
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:40027', name: 1, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:40027
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:36266
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:38837', name: 0, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:38837
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:36258
INFO:distributed.scheduler:Receive client connection: Client-f4aef913-0855-11ee-86c6-0242ac1c000c
INF

# Building A Time Series

In [None]:
df['pickup_timestamp'] = dd.to_datetime(df['tpep_pickup_datetime'])
df['dropoff_timestamp'] = dd.to_datetime(df['tpep_dropoff_datetime'])
cluster = LocalCluster()
client = Client()
df = df.map_partitions(lambda partition: partition.set_index(['pickup_timestamp']))
cluster.close()
client.close()

## Saving the Time Series on the disk

In [None]:
df.to_parquet('/content/drive/MyDrive/Datasets/NYC_Taxi/cleaned_ts', engine='pyarrow')


## Loading The data from the disk 

In [None]:
df = dd.read_parquet('/content/drive/MyDrive/Datasets/NYC_Taxi/cleaned_data.parquet*.parquet', engine='pyarrow')

#Feature Engineering

In [None]:
import pandas as pd
locations_df = pd.read_csv('/content/drive/MyDrive/Datasets/Copy of taxi_zone_lookup.csv')

In [None]:
def map_zones(partition,column):
    # Access the Pandas DataFrame within each partition
    local_df = partition[0]
    # Perform the join operation to get the 'zone' values
    merged_df = local_df.merge(locations_df, left_on=column,right_on = 'LocationID', how='left')
    return merged_df[['Zone','Borough','service_zone']]

## Source Zone && Source Borough && Source Service Zone

In [None]:
cluster = LocalCluster()
client = Client(cluster)

df[['Source_Zone','Source_Borough','Source_Service_Zone']] = df.map_partitions(lambda partition: map_zones(partition,'PULocationID'), meta=({'Source_Zone':'object','Source_Borough':'object','Source_Service_Zone':'object'}))



cluster.close()
client.close()


## Destionation Zone && Destionation Borough && Destionation Service Zone

In [None]:
cluster = LocalCluster()
client = Client(cluster)

df[['Destination_Zone','Destination_Borough','Destination_Service_Zone']] = df.map_partitions(lambda partition: map_zones(partition,'DOLocationID'), meta=({'Destination_Zone':'object','Destination_Borough':'object','Destination_Service_Zone':'object'}))


cluster.close()
client.close()

## Location Pair

In [None]:
df['Location_Pair'] = df['Source Zone'].astype(str) + ', ' + df['Destination Zone'].astype(str)

df = df.sort_values('Location_Pair')

## Payment Name Type


In [None]:
payment_mapping = {
    1: 'Credit card',
    2: 'Cash',
    3: 'No charge',
    4: 'Dispute',
    5: 'Unknown',
    6: 'Voided trip'
}

def map_payment_name(partition):
  
    payment_method = partition['payment_type']
    payment_names = payment_method.map(payment_mapping)
    
    return payment_names

In [None]:
cluster = LocalCluster()
client = Client(cluster)

df['payment_name'] = df.map_partitions(map_payment_name, meta=('payment_type_name', 'object'))

cluster.close()
client.close()

##Vendor

In [None]:
vendor_mapping = {
    1: 'Creative Mobile Technologies, LLC',
    2: 'VeriFone Inc',
  
}

def map_vendor_name(partition):
  
    vendor_id = partition['VendorID']
    vendor_names = vendor_id.map(payment_mapping)
    
    return vendor_names

In [None]:
cluster = LocalCluster()
client = Client(cluster)

df['VendorID'] = df.map_partitions(map_vendor_name, meta=('Vendor_Name', 'object'))

cluster.close()
client.close()

## Trip Class

In [None]:
cluster = LocalCluster()
client = Client(cluster)

top_20_location_pairs = df['location_pair'].value_counts().nlargest(20)
top_20_location_pairs = top_20_location_pairs.compute()

cluster.close()
client.close()

## Trip Duration

In [None]:
def compute_trip_duration(pickup_datetime, dropoff_datetime):
    return (dropoff_datetime - pickup_datetime).dt.total_seconds()/60

In [None]:
cluster = LocalCluster()

client = Client(cluster)


df['trip_duration'] = df.map_partitions(
    lambda partition: compute_trip_duration(partition['tpep_pickup_datetime'], partition['tpep_dropoff_datetime']),
    meta=('trip_duration', 'float64')
)


client.close()
cluster.close()

## Trip Distance

In [None]:
conversion_factor = 1.60934

cluster = LocalCluster()
client = Client(cluster)

df['trip_distance_km'] = df['trip_distance'] * conversion_factor

client.close()
cluster.close()
