In [3]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import isnan, when, count, col, lit, udf
from pyspark.sql.types import *
from pyspark.sql.window import Window
import pandas as pd 
import numpy as np 
import re
from geopy.geocoders import Nominatim
from geopy.extra.rate_limiter import RateLimiter
from geopy import distance
import pickle

spark = SparkSession.builder.appName("tripDataCSVLoad").getOrCreate()

# Prepare Schema of data and read CSV

In [4]:
def format_schema_expression(path,samp_nrows=100):
    int_types = [np.int64,int]
    float_types = [np.float64,float]

    df = pd.read_csv(path,nrows=samp_nrows)
    cols_og = [i for i in df.columns.values]
    cols_edited = [re.sub('[^a-zA-Z0-9_.]', '',i) for i in cols_og]

    schema_list = []
    col_list = []
    for col in cols_og:
        if df[col].dtype in int_types:
            schema_str = str(col)+' int'
        elif df[col].dtype in float_types:
            schema_str = str(col)+' double'
        else:
            schema_str = str(col)+' string'
        col_str = str(col) +' as '+re.sub('[^a-zA-Z0-9_.]', '',col)
        schema_list.append(schema_str)
        col_list.append(col_str)
    schema_statement = ','.join(schema_list)
    col_convert_dict = dict(list(zip(cols_og,cols_edited)))

    return col_convert_dict,schema_statement

tripdata_cols_dict,tripdata_schema = format_schema_expression("src/trip_data_4.csv")
tripfare_cols_dict,tripfare_schema = format_schema_expression("src/trip_fare_4.csv")

In [5]:
df_tripdata=spark.read.csv("src/trip_data_4.csv",
                            header=True,
                            inferSchema=True,
                            quote='"',
                            schema=tripdata_schema)
for key,val in tripdata_cols_dict.items():
    df_tripdata = df_tripdata.withColumnRenamed(key,val)
# print(df_tripdata.printSchema())

df_tripfare=spark.read.csv("src/trip_fare_4.csv",
                            header=True,
                            inferSchema=True,
                            quote='"',
                            schema=tripfare_schema)
for key,val in tripfare_cols_dict.items():
    df_tripfare = df_tripfare.withColumnRenamed(key,val)
# print(df_tripfare.printSchema())

In [None]:
# print(df_tripdata.schema.names)
# print(df_tripfare.schema.names)

['medallion', 'hack_license', 'vendor_id', 'rate_code', 'store_and_fwd_flag', 'pickup_datetime', 'dropoff_datetime', 'passenger_count', 'trip_time_in_secs', 'trip_distance', 'pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude']
['medallion', 'hack_license', 'vendor_id', 'pickup_datetime', 'payment_type', 'fare_amount', 'surcharge', 'mta_tax', 'tip_amount', 'tolls_amount', 'total_amount']


## Data Sanity Check and Cleasing
#### A) tripdata
##### 1) null values
##### 2) pickup / dropoff datetime scope
##### 3) passenger count
##### 4) trip time
##### 5) trip distance
##### 6) pickup / dropoff latitude/longitude scope
##### 7) pickup / dropoff latitude/longitude is identical
#### B) tripfare
##### 1) null values
##### 2) fare amount
##### 3) toll amount
##### 4) surcharge
##### 5) tip amount
##### 6) mta_tax
##### 7) total amount

In [6]:
### Tripdata

## 1) null values
df_tripdata.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_tripdata.columns]).show()

#---------------------------------------------------------------------------------------------------------
#1. store_and_fwd_flag show highest null - 7518657. No need to remove as it has no meaningful implication
#2. missing dropoff_longiture/ latitude - 146. To be removed
#---------------------------------------------------------------------------------------------------------
print('finished 1')
# 2) pickup / dropoff datetime scope

quantiles_pickup_datetime = df_tripdata.withColumn('pickup_datetime',F.to_timestamp('pickup_datetime'))\
    .select(F.percentile_approx("pickup_datetime", [0.0,0.01,0.25, 0.5, 0.75,0.99,1.0], 1000000).alias("quantiles")).collect()

# pickup_datetime
# [datetime.datetime(2013, 4, 1, 0, 0), datetime.datetime(2013, 4, 1, 12, 15, 42), 
# datetime.datetime(2013, 4, 8, 16, 38), datetime.datetime(2013, 4, 16, 1, 56), 
# datetime.datetime(2013, 4, 23, 14, 10, 32), datetime.datetime(2013, 4, 30, 18, 51), 
# datetime.datetime(2013, 4, 30, 23, 59, 58)]

quantiles_dropoff_datetime = df_tripdata.withColumn('dropoff_datetime',F.to_timestamp('dropoff_datetime'))\
    .select(F.percentile_approx("dropoff_datetime", [0.0,0.01,0.25, 0.5, 0.75,0.99,1.0], 1000000).alias("quantiles")).collect()
print('finished 2')
# dropoff_datetime
# [datetime.datetime(2013, 4, 1, 0, 0), datetime.datetime(2013, 4, 1, 12, 27, 2), 
# datetime.datetime(2013, 4, 8, 16, 51), datetime.datetime(2013, 4, 16, 2, 6), 
# datetime.datetime(2013, 4, 23, 14, 24), datetime.datetime(2013, 4, 30, 19, 3), 
# datetime.datetime(2013, 5, 1, 2, 19, 25)]

#---------------------------------------------------------------------------------------------------------
#1. No suspicious pickup datetime
#2. No suspicious dropoff datetime
#---------------------------------------------------------------------------------------------------------

# 3) passenger count

quantiles_passenger = df_tripdata\
    .select(F.percentile_approx("passenger_count", [0.0,0.01,0.25, 0.5, 0.75,0.99,1.0], 1000000).alias("quantiles")).collect()
print('finished 3')
# passenger count
# [Row(quantiles=[0, 1, 1, 1, 2, 6, 9])]
#---------------------------------------------------------------------------------------------------------
#1. Remove passenger count == 0
#---------------------------------------------------------------------------------------------------------

##### 4) trip time

quantiles_trip_time = df_tripdata\
    .select(F.percentile_approx("trip_time_in_secs", [0.0,0.01,0.25, 0.5, 0.75,0.99,1.0], 1000000).alias("quantiles")).collect()
print('finished 4')
# trip time
# [Row(quantiles=[0, 60, 360, 600, 960, 2760, 10800])]
#---------------------------------------------------------------------------------------------------------
#1. Remove trip_time_in_secs == 0
#---------------------------------------------------------------------------------------------------------

##### 5) trip distance

quantiles_trip_distance = df_tripdata\
    .select(F.percentile_approx("trip_distance", [0.0,0.01,0.25, 0.5, 0.75,0.99,1.0], 1000000).alias("quantiles")).collect()
print('finished 5')
# trip distance
# [Row(quantiles=[0.0, 0.1, 1.04, 1.78, 3.2, 18.1, 100.0])]
#---------------------------------------------------------------------------------------------------------
#1. Remove trip_distance == 0
#---------------------------------------------------------------------------------------------------------

##### 6) pickup / dropoff latitude/longitude scope

total_rows = df_tripdata.count()

# df_tripdata.select('*',F.round(col('pickup_latitude')).alias('rounded_pickup_latitude'))\
#     .groupBy('rounded_pickup_latitude').count()\
#         .select("*",(col("count")/total_rows).alias('percentage'))\
#             .filter("`percentage` >= 0.01").sort(F.desc('percentage')).show()

# pickup_latitude - remove 0 latitude as it is invalid
# +-----------------------+--------+--------------------+
# |rounded_pickup_latitude|   count|          percentage|
# +-----------------------+--------+--------------------+
# |                   41.0|14848448|  0.9833104510403253|
# |                    0.0|  250672|0.016600280203236086|
# +-----------------------+--------+--------------------+

valid_pickup_latitude = df_tripdata.select('*',F.round(col('pickup_latitude')).alias('rounded_pickup_latitude'))\
    .groupBy('rounded_pickup_latitude').count()\
        .select("*",(col("count")/total_rows).alias('percentage'))\
            .filter("`percentage` >= 0.01 and rounded_pickup_latitude <> 0")\
            .sort(F.desc('percentage')).select('rounded_pickup_latitude').collect()

valid_pickup_latitude_list = [int(row.rounded_pickup_latitude) for row in valid_pickup_latitude]

# df_tripdata.select('*',F.round(col('dropoff_latitude')).alias('rounded_dropoff_latitude'))\
#     .groupBy('rounded_dropoff_latitude').count()\
#         .select("*",(col("count")/total_rows).alias('percentage'))\
#             .filter("`percentage` >= 0.01").sort(F.desc('percentage')).show()

# dropoff_latitude - remove 0 latitude as it is invalid
# +------------------------+--------+--------------------+
# |rounded_dropoff_latitude|   count|          percentage|
# +------------------------+--------+--------------------+
# |                    41.0|14839861|  0.9827417931682647|
# |                     0.0|  259005|0.017152117404573158|
# +------------------------+--------+--------------------+

valid_dropoff_latitude = df_tripdata.select('*',F.round(col('dropoff_latitude')).alias('rounded_dropoff_latitude'))\
    .groupBy('rounded_dropoff_latitude').count()\
        .select("*",(col("count")/total_rows).alias('percentage'))\
            .filter("`percentage` >= 0.01 and rounded_dropoff_latitude <> 0")\
            .sort(F.desc('percentage')).select('rounded_dropoff_latitude').collect()

valid_dropoff_latitude_list = [int(row.rounded_dropoff_latitude) for row in valid_dropoff_latitude]

# df_tripdata.select('*',F.round(col('pickup_longitude')).alias('rounded_pickup_longitude'))\
#     .groupBy('rounded_pickup_longitude').count()\
#         .select("*",(col("count")/total_rows).alias('percentage'))\
#             .filter("`percentage` >= 0.01").sort(F.desc('percentage')).show()

# pickup_longitude - remove 0 latitude as it is invalid
# +------------------------+--------+--------------------+
# |rounded_pickup_longitude|   count|          percentage|
# +------------------------+--------+--------------------+
# |                   -74.0|14845693|  0.9831280063637763|
# |                     0.0|  253233|0.016769877595846697|
# +------------------------+--------+--------------------+

valid_pickup_longitude = df_tripdata.select('*',F.round(col('pickup_longitude')).alias('rounded_pickup_longitude'))\
    .groupBy('rounded_pickup_longitude').count()\
        .select("*",(col("count")/total_rows).alias('percentage'))\
            .filter("`percentage` >= 0.01 and rounded_pickup_longitude <> 0")\
            .sort(F.desc('percentage')).select('rounded_pickup_longitude').collect()

valid_pickup_longitude_list = [int(row.rounded_pickup_longitude) for row in valid_pickup_longitude]

# df_tripdata.select('*',F.round(col('dropoff_longitude')).alias('rounded_dropoff_longitude'))\
#     .groupBy('rounded_dropoff_longitude').count()\
#         .select("*",(col("count")/total_rows).alias('percentage'))\
#             .filter("`percentage` >= 0.01").sort(F.desc('percentage')).show()

# dropoff_longitude - remove 0 latitude as it is invalid
# +-------------------------+--------+-------------------+
# |rounded_dropoff_longitude|   count|         percentage|
# +-------------------------+--------+-------------------+
# |                    -74.0|14836719| 0.9825337201469517|
# |                      0.0|  261508|0.01731787385662484|
# +-------------------------+--------+-------------------+

valid_dropoff_longitude = df_tripdata.select('*',F.round(col('dropoff_longitude')).alias('rounded_dropoff_longitude'))\
    .groupBy('rounded_dropoff_longitude').count()\
        .select("*",(col("count")/total_rows).alias('percentage'))\
            .filter("`percentage` >= 0.01 and rounded_dropoff_longitude <> 0")\
            .sort(F.desc('percentage')).select('rounded_dropoff_longitude').collect()

valid_dropoff_longitude_list = [int(row.rounded_dropoff_longitude) for row in valid_dropoff_longitude]
print('finished 6')
#---------------------------------------------------------------------------------------------------------
#1. Keep only rounded pickup_latitude in valid_pickup_latitude
#2. Keep only rounded pickup_latitude in valid_pickup_latitude
#3. Keep only rounded pickup_latitude in valid_dropoff_longitude
#4. Keep only rounded pickup_latitude in valid_dropoff_longitude
#---------------------------------------------------------------------------------------------------------


##### 7) pickup / dropoff latitude/longitude is identical

## Some pickup_lat/long and exactly the same as dropoff_lat/long resulting in 0 geo_distance and null fare_per_mile
## remove such records - 116577 rows

df_tripdata.filter((col('pickup_latitude')==col('dropoff_latitude'))&(col('pickup_longitude')==col('dropoff_longitude'))),count()

+---------+------------+---------+---------+------------------+---------------+----------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+
|medallion|hack_license|vendor_id|rate_code|store_and_fwd_flag|pickup_datetime|dropoff_datetime|passenger_count|trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|
+---------+------------+---------+---------+------------------+---------------+----------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+
|        0|           0|        0|        0|           7518657|              0|               0|              0|                0|            0|               0|              0|              146|             146|
+---------+------------+---------+---------+------------------+---------------+----------------+---------------+-----------------+-------------+----

NameError: name 'df_trip' is not defined

In [7]:
#---------------------------------------------------------------------------------------------------------
# Data Cleansing - df_tripdata
#1. missing dropoff_longiture/ latitude - 146. To be removed
#2. Remove passenger count == 0
#3. Remove trip_time_in_secs == 0
#4. Remove trip_distance == 0
#5. Keep only rounded pickup_latitude in valid pickup/dropoff latitude/longitude
#6. Remove identical pickup and dropoff
#---------------------------------------------------------------------------------------------------------

print(f"rows before clesing: {df_tripdata.count()}")
df_tripdata = df_tripdata.filter(col('dropoff_latitude').isNotNull())\
    .filter(col('dropoff_latitude').isNotNull())
print(f"rows after removing missing dropoff coordinates: {df_tripdata.count()}")
df_tripdata = df_tripdata.filter(col('passenger_count')>0)
print(f"rows after removing 0 passenger trips: {df_tripdata.count()}")
df_tripdata = df_tripdata.filter(col('trip_time_in_secs')>0)
print(f"rows after removing 0 time trips: {df_tripdata.count()}")
df_tripdata = df_tripdata.filter(col('trip_distance')>0)
print(f"rows after removing 0 distance trips: {df_tripdata.count()}")
df_tripdata = df_tripdata.select('*',
F.round(col('pickup_latitude')).alias('rounded_pickup_latitude'),
F.round(col('pickup_longitude')).alias('rounded_pickup_longitude'),
F.round(col('dropoff_latitude')).alias('rounded_dropoff_latitude'),
F.round(col('dropoff_longitude')).alias('rounded_dropoff_longitude'))\
    .filter(col('rounded_pickup_latitude').isin(valid_pickup_latitude_list))\
        .filter(col('rounded_pickup_longitude').isin(valid_pickup_longitude_list))\
            .filter(col('rounded_dropoff_latitude').isin(valid_dropoff_latitude_list))\
                .filter(col('rounded_dropoff_longitude').isin(valid_dropoff_longitude_list))\
                    .drop(*['rounded_pickup_latitude','rounded_pickup_longitude',
                            'rounded_dropoff_latitude','rounded_dropoff_longitude'])
print(f"rows after removing suspicious pickup/dropoff latitude/longitude: {df_tripdata.count()}")
df_tripdata = df_tripdata.filter((col('pickup_latitude')!=col('dropoff_latitude'))|(col('pickup_longitude')!=col('dropoff_longitude')))
print(f"rows after removing identical pickup/dropoff: {df_tripdata.count()}")

# rows before clesing: 15100468
# rows after removing missing dropoff coordinates: 15100322
# rows after removing 0 passenger trips: 15100239
# rows after removing 0 time trips: 15062346
# rows after removing 0 distance trips: 14976785
# rows after removing suspicious pickup/dropoff latitude/longitue: 14734920

rows before clesing: 15100468
rows after removing missing dropoff coordinates: 15100322
rows after removing 0 passenger trips: 15100239
rows after removing 0 time trips: 15062346
rows after removing 0 distance trips: 14976785
rows after removing suspicious pickup/dropoff latitude/longitude: 14734920
rows after removing identical pickup/dropoff: 14617675


In [7]:
##### 1) null values
##### 2) fare amount
##### 3) toll amount
##### 4) surcharge
##### 5) tip amount
##### 6) mta_tax
##### 7) total amount

In [8]:
### Tripfare

## 1) null values
df_tripfare.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_tripfare.columns]).show()

#---------------------------------------------------------------------------------------------------------
#1. No missing values at all
#---------------------------------------------------------------------------------------------------------

# 2) trip fare, tolls amount, surcharge, tip amount, mta_tax, total amount

quantiles_fare_amount = df_tripfare\
    .select(F.percentile_approx("fare_amount", [0.0,0.01,0.25, 0.5, 0.75,0.99,0.995,1.0], 1000000).alias("quantiles")).collect()

quantiles_tolls_amount = df_tripfare\
    .select(F.percentile_approx("tolls_amount", [0.0,0.01,0.25, 0.5, 0.75,0.99,0.995,1.0], 1000000).alias("quantiles")).collect()

quantiles_surcharge = df_tripfare\
    .select(F.percentile_approx("surcharge", [0.0,0.01,0.25, 0.5, 0.75,0.99,0.995,1.0], 1000000).alias("quantiles")).collect()

quantiles_tip_amount = df_tripfare\
    .select(F.percentile_approx("tip_amount", [0.0,0.01,0.25, 0.5, 0.75,0.99,0.995,1.0], 1000000).alias("quantiles")).collect()

quantiles_mta_tx = df_tripfare\
    .select(F.percentile_approx("mta_tax", [0.0,0.01,0.25, 0.5, 0.75,0.99,0.995,1.0], 1000000).alias("quantiles")).collect()

quantiles_total_amount = df_tripfare\
    .select(F.percentile_approx("total_amount", [0.0,0.01,0.25, 0.5, 0.75,0.99,0.995,1.0], 1000000).alias("quantiles")).collect()

# quantiles_fare_amount:[Row(quantiles=[2.5, 3.5, 6.5, 9.5, 14.0, 52.0, 52.0, 500.0])]
### minimum fare from websites confirm $2.5 minimum fare
# quantiles_tolls_amount:[Row(quantiles=[0.0, 0.0, 0.0, 0.0, 0.0, 5.33, 5.33, 20.0])]
# quantiles_surcharge:[Row(quantiles=[0.0, 0.0, 0.0, 0.0, 0.5, 1.0, 1.0, 15.0])]
# quantiles_tip_amount:[Row(quantiles=[0.0, 0.0, 0.0, 1.0, 2.0, 10.4, 11.56, 200.0])]
# quantiles_total_amount:[Row(quantiles=[2.5, 4.0, 8.0, 11.0, 16.5, 65.62, 69.39, 628.1])]

# cutoff at 99.5 percentile
cutoff_fare_amount = quantiles_fare_amount[0].quantiles[-2]
cutoff_tolls_amount = quantiles_tolls_amount[0].quantiles[-2]
cutoff_surcharge = quantiles_surcharge[0].quantiles[-2]
cutoff_tip_amount = quantiles_tip_amount[0].quantiles[-2]
cutoff_total_amount = quantiles_total_amount[0].quantiles[-2]

print(f"cutoff_fare_amount:{cutoff_fare_amount}")
print(f"cutoff_tolls_amount:{cutoff_tolls_amount}")
print(f"cutoff_surcharge:{cutoff_surcharge}")
print(f"cutoff_tip_amount:{cutoff_tip_amount}")
print(f"cutoff_total_amount:{cutoff_total_amount}")

#---------------------------------------------------------------------------------------------------------
#1. Cutoff 5 amount at 99.5 percentile
#---------------------------------------------------------------------------------------------------------

+---------+------------+---------+---------------+------------+-----------+---------+-------+----------+------------+------------+
|medallion|hack_license|vendor_id|pickup_datetime|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|total_amount|
+---------+------------+---------+---------------+------------+-----------+---------+-------+----------+------------+------------+
|        0|           0|        0|              0|           0|          0|        0|      0|         0|           0|           0|
+---------+------------+---------+---------------+------------+-----------+---------+-------+----------+------------+------------+

cutoff_fare_amount:52.0
cutoff_tolls_amount:5.33
cutoff_surcharge:1.0
cutoff_tip_amount:11.56
cutoff_total_amount:69.39


In [9]:
#---------------------------------------------------------------------------------------------------------
# Data Cleansing - df_tripfare
#1. mCutoff 5 amount at 99.5 percentile
#---------------------------------------------------------------------------------------------------------

print(f"rows before clesing: {df_tripfare.count()}")
df_tripfare = df_tripfare\
    .filter(col('fare_amount')<=cutoff_fare_amount)\
        .filter(col('tolls_amount')<=cutoff_tolls_amount)\
            .filter(col('surcharge')<=cutoff_surcharge)\
                .filter(col('tip_amount')<=cutoff_tip_amount)\
                    .filter(col('total_amount')<=cutoff_total_amount)
print(f"rows after removing extreme amounts above 99.5 percentile: {df_tripfare.count()}")

rows before clesing: 15100468
rows after removing extreme amounts above 99.5 percentile: 14971573


## Enhance latitude longitude data in tripdata for descriptive analytics
### get suburb names of pickup and dropoff
### categorize trip from pickup-dropoff locations

In [10]:
## Collect coorditnate from unique 2 decimal-rounded coordinates (1.1km error) from both pickup and dropoff
## Call reverse geocode API to get suburb (or county) and city (or state)
## Save unique coordinates - area description dictionary to pickle file

geolocator = Nominatim(user_agent="bikeshare")
reverse = RateLimiter(geolocator.reverse, min_delay_seconds=1, max_retries=0)

pickup_latlong_list = df_tripdata.withColumn("rounded_lat",F.round("pickup_latitude", 2))\
    .withColumn("rounded_long",F.round("pickup_longitude", 2))\
    .groupBy('rounded_lat','rounded_long').count().sort(F.desc("count")).toPandas()\
        .apply(lambda row: ', '.join([str(row['rounded_lat']),str(row['rounded_long'])]),axis=1).tolist()

dropoff_latlong_list = df_tripdata.withColumn("rounded_lat",F.round("dropoff_latitude", 2))\
    .withColumn("rounded_long",F.round("dropoff_longitude", 2))\
    .groupBy('rounded_lat','rounded_long').count().sort(F.desc("count")).toPandas()\
        .apply(lambda row: ', '.join([str(row['rounded_lat']),str(row['rounded_long'])]),axis=1).tolist()

latlong_list = list(set.union(set(pickup_latlong_list),set(dropoff_latlong_list)))
print(len(latlong_list))

def find_address(latlong):
  try:
    return reverse(latlong).raw['address']
  except:
    return 'unknown'

latlong_dict = {}
for item in latlong_list:
  latlong_dict[item] = find_address(item)

file_to_write = open("latlong_address.pickle", "wb")
pickle.dump(latlong_dict, file_to_write)

In [12]:
### Create new columns - pickup area and dropoff area

file_to_read = open("latlong_address.pickle", "rb")
latlong_dict = pickle.load(file_to_read)

def map_area(latlong_str):
    ''' get rounded latitude and longitude and return area name'''
    if latlong_str in latlong_dict.keys():
        address = latlong_dict[latlong_str]
        if address == 'unknown':
            return 'unknown'
        elif ('suburb' in address.keys()) and ('city' in address.keys()):
            return ', '.join([address['suburb'],address['city']])
        elif ('suburb' in address.keys()) and ('state' in address.keys()):
            return ', '.join([address['suburb'],address['state']])
        elif ('city_district' in address.keys()) and ('city' in address.keys()):
            return ', '.join([address['city_district'],address['city']])
        elif 'county' in address.keys():
            return ', '.join([address['county'],address['state']])
        else:
            return 'unknown'
    else:
        return 'unknown'

udf_map_area = udf(map_area,StringType())

df_tripdata = df_tripdata.withColumn('pickup_area',
udf_map_area(F.concat(F.round(col('pickup_latitude'),2), lit(', '), F.round(col('pickup_longitude'),2))))\
    .withColumn('dropoff_area',
udf_map_area(F.concat(F.round(col('dropoff_latitude'),2), lit(', '), F.round(col('dropoff_longitude'),2))))

In [34]:
### create new column - geo_distance

def calculate_geo_distance(lat1,long1,lat2,long2):
    ''' get geo distance between 2 coordinates'''

    coord1 = (lat1,long1)
    coord2 = (lat2,long2)

    try:
        return round(distance.distance(coord1, coord2).miles,4)
    except:
        return 0.0

udf_geo_dist = udf(calculate_geo_distance,DoubleType())

df_tripdata = df_tripdata.withColumn('geo_distance',
udf_geo_dist(col('pickup_latitude'),col('pickup_longitude'),col('dropoff_latitude'),col('dropoff_longitude')))

## Merge tripdata and tripfare

In [35]:
# print(df_tripdata.schema.names)
# print(df_tripfare.schema.names)

['medallion', 'hack_license', 'vendor_id', 'rate_code', 'store_and_fwd_flag', 'pickup_datetime', 'dropoff_datetime', 'passenger_count', 'trip_time_in_secs', 'trip_distance', 'pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude', 'pickup_area', 'dropoff_area', 'geo_distance']
['medallion', 'hack_license', 'vendor_id', 'pickup_datetime', 'payment_type', 'fare_amount', 'surcharge', 'mta_tax', 'tip_amount', 'tolls_amount', 'total_amount']


In [36]:
joining_keys = [i for i in df_tripdata.schema.names if i in df_tripfare.schema.names]

df_trip = df_tripdata.join(df_tripfare,joining_keys,'inner')

### add trip_id
w = Window().orderBy(col('pickup_datetime'))
df_trip = df_trip.withColumn('trip_id', F.row_number().over(w))

print((df_tripdata.count(), len(df_tripdata.columns)))
print((df_tripfare.count(), len(df_tripfare.columns)))
print((df_trip.count(), len(df_trip.columns)))

(14734920, 17)
(14971573, 11)
(14623320, 25)


## Create taxi_driver_dim table and save as parquet

In [None]:
### get taxi_id and driver_id
df_taxi_id = df_trip.select('medallion').groupBy('medallion').count().toPandas()\
    .reset_index().rename(columns={'index':'taxi_id'}).drop(columns='count',axis=1)

df_driver_id = df_trip.select('hack_license').groupBy('hack_license').count().toPandas()\
    .reset_index().rename(columns={'index':'driver_id'}).drop(columns='count',axis=1)

In [None]:
### check if there's overlap in medallion and day in which multiple hack_license were driving the same car

## add pickup_date to df_trip
datestrip = udf(lambda x: x[:10],StringType())
df_trip = df_trip.withColumn('pickup_date',datestrip(col('pickup_datetime')))

df_mult_hack_chk = df_trip.select('medallion','hack_license','pickup_date')\
    .groupBy('medallion','pickup_date','hack_license').count()\
        .groupBy('medallion','pickup_date').count().toPandas()

## check multiple hack license

# df_mult_hack_chk.groupby(by='count',as_index=False)[['medallion']].agg({'medallion':'count'})

# Sharing car between 2 or 3 hack license looks genuine.
# no. of hack license in a day	no. of madallions occupied by those hack license in a day
# 1	                            81207
# 2	                            248817
# 3	                            53821
# 4	                            547
# 5	                            4
# 7	                            1
# 14	                        1
# 30	                        1


#### Add taxi_id and driver_id

df_taxi_driver_dim = df_trip.select('medallion','hack_license').groupBy('medallion','hack_license').count().toPandas()\
    .reset_index().rename(columns={'index':'taxi_driver_id'}).drop(columns='count',axis=1)
print(df_taxi_driver_dim.shape)
df_taxi_driver_dim = df_taxi_driver_dim.merge(df_taxi_id,how='left',on='medallion')
print(df_taxi_driver_dim.shape)
df_taxi_driver_dim = df_taxi_driver_dim.merge(df_driver_id,how='left',on='hack_license')
print(df_taxi_driver_dim.shape)

### Save taxi_driver_dim as parquet
df_taxi_driver_dim.to_parquet('spark-warehouse/taxi_driver_dim.parquet')

## Add Taxi Driver ID to df_trip

In [None]:
## Add Taxi Driver ID, Taxi ID, Driver ID to df_trip

taxi_driver_dim_spark_df =spark.createDataFrame(df_taxi_driver_dim) 
joining_keys = ['medallion','hack_license']
df_trip = df_trip.join(taxi_driver_dim_spark_df,joining_keys,'inner')


## Write to Parquet

In [45]:
df_trip.write.format('parquet').saveAsTable('trip_clean_all')