In [1]:
from pyspark.sql import SparkSession

from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType 
from pyspark.sql.types import FloatType
from pyspark.sql.types import TimestampType 
from pyspark.sql.types import DoubleType

import pyspark.sql.functions as F
import multiprocessing

In [2]:
# # Start the Spark Session
# spark = SparkSession.builder.appName('new_york_taxi_fare_prediction').getOrCreate()

cpu_num = multiprocessing.cpu_count()
pool_num = cpu_num - 1

spark = SparkSession.builder\
        .appName('new_york_taxi_fare_prediction')\
        .master('local[' + str(pool_num) + ']')\
        .getOrCreate()


### Load Raw Data

In [24]:
df_yellow = spark.read.csv("../data/raw/yellow_tripdata_20*.csv", header=True)
df_green = spark.read.csv("../data/raw/green_tripdata_20*.csv", header=True)


### Data Cleaning and Merging
1. Add one new column to differentiate the two Taxi Campany - "taxi_campany": Green/Yellow
2. the numbe of Columns are diffrent in the dataset of two Taxi Campanies. So drop the uncommon two columns
3. Rename the column names "lpep_pickup_datetime" and "tpep_pickup_datetime" to "pickup_datetime" to achieve the uniformity
4. Rename the column names "lpep_dropoff_datetime" and "tpep_dropoff_datetime" to "dropoff_datetime" to achieve the uniformity
5. Concantane the dataframes
6. Extract the years/day/hour from the pick-up time
7. Filter out the data only for 2017 and 2018

In [25]:
import pyspark.sql.functions as F

# Add new column to diffientiate the Taxi campany
df_yellow = df_yellow.withColumn('taxi_campany', F.lit("Green"))
df_green = df_green.withColumn('taxi_campany', F.lit("Yellow"))

In [26]:
# Drop the uncommon column
df_green = df_green.drop("ehail_fee", "trip_type")

# Rename the columns tpep_pickup_datetime to pickup_datetime / tpep_dropoff_datetime to dropoff_datetime
df_yellow = df_yellow.withColumnRenamed("tpep_pickup_datetime", "pickup_datetime")
df_yellow = df_yellow.withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")
# Make the column names lowercase
for col in df_yellow.columns:
    df_yellow = df_yellow.withColumnRenamed(col, col.lower())
    
# Rename the columns lpep_pickup_datetime to pickup_datetime / lpep_dropoff_datetime to dropoff_datetime
df_green = df_green.withColumnRenamed("lpep_pickup_datetime", "pickup_datetime")
df_green = df_green.withColumnRenamed("lpep_dropoff_datetime", "dropoff_datetime")
for col in df_green.columns:
    df_green = df_green.withColumnRenamed(col, col.lower())

df = df_green.unionByName(df_yellow)

### Save the Merged Dataset in Parquet Format

In [18]:
df.write.format('parquet').save("../data/processed/df_original_final.parquet", mode='append')
# spark.stop()
##### Started saving at 10:36 PM on 12 April, finished at 10:54
### 5:13 - 5: 29 

### Load the Saved Raw Data in Parquet Format

In [3]:
#### Start loading the parquet file.
df_original = spark.read.parquet("../data/processed/df_original_final.parquet")
### start reading at 11:37AM - finished within 2 sec

In [4]:
df_original.show(2)

+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+------------+
|vendorid|    pickup_datetime|   dropoff_datetime|store_and_fwd_flag|ratecodeid|pulocationid|dolocationid|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|taxi_campany|
+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+------------+
|       2|2018-11-22 22:37:46|2018-11-22 22:43:06|                 N|         1|          43|         237|              2|         1.26|        6.5|  0.5|    0.5|         0|           0|                  0.3|         7.8|           2|       Green|
|       

In [10]:
######### Batch 1 ########
df_cleaned_2017 = df_original.filter((F.year(F.col('pickup_datetime')) == 2017))
df_cleaned_2017 = df_cleaned_2017.filter(
                    (((F.col('trip_distance'))>0) & ((F.col('trip_distance'))<100)) &
                    ((F.col('mta_tax'))>=0) &
                    (((F.col('fare_amount'))>0) & ((F.col('fare_amount'))<200)) &
                    (((F.col('total_amount'))>0) & ( (F.col('total_amount'))<200)) &
                    (((F.col('passenger_count'))>0) & ((F.col('passenger_count'))<10)))
df_cleaned_2017.write.format('parquet').save('../data/processed/df_original_cleaned_years_dataranges_final.parquet', mode='append')


######### Batch 2 ########
df_cleaned_2018_green = df_original.filter((F.year(F.col('pickup_datetime')) == 2018) & (F.col('taxi_campany') == 'Green'))
df_cleaned_2018_green = df_cleaned_2018_green.filter(
                    (((F.col('trip_distance'))>0) & ((F.col('trip_distance'))<100)) &
                    ((F.col('mta_tax'))>=0) &
                    (((F.col('fare_amount'))>0) & ((F.col('fare_amount'))<200)) &
                    (((F.col('total_amount'))>0) & ( (F.col('total_amount'))<200)) &
                    (((F.col('passenger_count'))>0) & ((F.col('passenger_count'))<10)))
df_cleaned_2018_green.write.format('parquet').save('../data/processed/df_original_cleaned_years_dataranges_final.parquet', mode='append')


######### Batch 3 ########
df_cleaned_2018_yellow = df_original.filter((F.year(F.col('pickup_datetime')) == 2018) & (F.col('taxi_campany') == 'Yellow'))
df_cleaned_2018_yellow = df_cleaned_2018_yellow.filter(
                    (((F.col('trip_distance'))>0) & ((F.col('trip_distance'))<100)) &
                    ((F.col('mta_tax'))>=0) &
                    (((F.col('fare_amount'))>0) & ((F.col('fare_amount'))<200)) &
                    (((F.col('total_amount'))>0) & ( (F.col('total_amount'))<200)) &
                    (((F.col('passenger_count'))>0) & ((F.col('passenger_count'))<10)))
df_cleaned_2018_yellow.write.format('parquet').save('../data/processed/df_original_cleaned_years_dataranges_final.parquet', mode='append')


In [12]:
df_cleaned_2018_yellow.show(2)

+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+------------+
|vendorid|    pickup_datetime|   dropoff_datetime|store_and_fwd_flag|ratecodeid|pulocationid|dolocationid|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|taxi_campany|
+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+------------+
|       2|2018-06-01 00:33:55|2018-06-01 00:36:13|                 N|         1|          66|          33|              5|          .51|          4|  0.5|    0.5|       0.7|           0|                  0.3|           6|           1|      Yellow|
|       

In [13]:
## The last batch of filter 
df_cleaned_2018_yellow = df_cleaned_2018_yellow.filter(
                    (((F.col('trip_distance'))>0) & ((F.col('trip_distance'))<100)) &
                    ((F.col('mta_tax'))>=0) &
                    (((F.col('fare_amount'))>0) & ((F.col('fare_amount'))<200)) &
                    (((F.col('total_amount'))>0) & ( (F.col('total_amount'))<200)) &
                    (((F.col('passenger_count'))>0) & ((F.col('passenger_count'))<10)))

In [16]:
df_cleaned_2018_yellow.show(2)

+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+------------+
|vendorid|    pickup_datetime|   dropoff_datetime|store_and_fwd_flag|ratecodeid|pulocationid|dolocationid|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|taxi_campany|
+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+------------+
|       2|2018-06-01 00:40:36|2018-06-01 00:49:46|                 N|         1|          25|          49|              5|         1.97|          9|  0.5|    0.5|      2.06|           0|                  0.3|       12.36|           1|      Yellow|
|       

### Final Cleaned Raw Dataset saved in Parquet format

In [19]:
# Save in three batch due to Mem GC error
df_cleaned_2018_yellow.write.format('parquet').save('../data/processed/df_original_cleaned_years_dataranges_final.parquet', mode='append')

### Load the Saved Data - which is filter out the invalid years and data ranges

The aim in this step is to convert the data types and add new features -
1) weekday
2) pick-up hour
3) trip duration in seconed
4) trip duration ranges


In [20]:
### Load again here - "df_original_cleaned_years_and_ranges" next step
df_cleaned_2year = spark.read.parquet("../data/processed/df_original_cleaned_years_dataranges_final.parquet")

In [22]:
df_cleaned_2year.show(2)

In [24]:
# Extract the hour and weekday from pickup time column
df_cleaned_transformed_1 = df_cleaned_2year.withColumn('pickup_hour', F.hour(F.col('pickup_datetime'))).\
            withColumn('week_day_num', F.date_format(F.col('pickup_datetime'), 'u')).\
            withColumn('week_day_abb', F.date_format(F.col('pickup_datetime'), 'E'))

In [28]:
df_cleaned_transformed_1.show(2)

+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+------------+-----------+------------+------------+
|vendorid|    pickup_datetime|   dropoff_datetime|store_and_fwd_flag|ratecodeid|pulocationid|dolocationid|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|taxi_campany|pickup_hour|week_day_num|week_day_abb|
+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+------------+-----------+------------+------------+
|       2|2018-09-12 20:35:43|2018-09-12 21:20:48|                 N|         1|         237|          13|              1|         6.10|      

In [29]:
## Convert the Data Types to Numerical 
df_cleaned_transformed_2 = df_cleaned_transformed_1.withColumn('passenger_count', F.col('passenger_count').astype(IntegerType())).\
              withColumn('week_day_num', F.col('week_day_num').astype(IntegerType())).\
              withColumn('trip_distance', F.col('trip_distance').astype(FloatType())).\
              withColumn('total_amount', F.col('total_amount').astype(FloatType())).\
              withColumn('tolls_amount', F.col('tolls_amount').astype(FloatType())).\
              withColumn('fare_amount', F.col('fare_amount').astype(FloatType())).\
              withColumn('mta_tax', F.col('mta_tax').astype(FloatType()))

In [31]:
# Create UDF for Trip Duration
def time_delta(y,x): 
    from datetime import datetime
    end = datetime.strptime(y, '%Y-%m-%d %H:%M:%S')
    start = datetime.strptime(x, '%Y-%m-%d %H:%M:%S')
    timedelta = int((end-start).total_seconds())
    return timedelta
    

from pyspark.sql.functions import struct
udf_calcluate_duration = F.udf(lambda x: time_delta(x[1],x[0]), IntegerType())

df_cleaned_transformed_3 = df_cleaned_transformed_2.withColumn('trip_duration_sec', udf_calcluate_duration(struct('pickup_datetime', 'dropoff_datetime')))


In [32]:
df_cleaned_transformed_3.show(2)

+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+------------+-----------+------------+------------+-----------------+
|vendorid|    pickup_datetime|   dropoff_datetime|store_and_fwd_flag|ratecodeid|pulocationid|dolocationid|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|taxi_campany|pickup_hour|week_day_num|week_day_abb|trip_duration_sec|
+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+------------+-----------+------------+------------+-----------------+
|       2|2018-09-12 20:35:43|2018-09-12 21:20:48|                 N|         1|        

In [36]:
# Create UDF for Speed Km-per-hour
def cal_speed(dist, duration): 
    duration = duration/3600
    if(duration <= 0):
       return 0
    # Convert Mile to Km
    dist = dist/0.621371
    
    speed = round(dist/duration, 2)
    return speed
    
from pyspark.sql.functions import struct
udf_calcluate_speed = F.udf(lambda x: cal_speed(x[0],x[1]), FloatType())

df_cleaned_transformed_4 = df_cleaned_transformed_3.withColumn('speed_km_hr', udf_calcluate_speed(struct('trip_distance', 'trip_duration_sec')))


In [37]:
df_cleaned_transformed_4.show(2)

+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+------------+-----------+------------+------------+-----------------+-----------+
|vendorid|    pickup_datetime|   dropoff_datetime|store_and_fwd_flag|ratecodeid|pulocationid|dolocationid|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|taxi_campany|pickup_hour|week_day_num|week_day_abb|trip_duration_sec|speed_km_hr|
+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+------------+-----------+------------+------------+-----------------+-----------+
|       2|2018-09-12 20:35:43|2018-09-12 21:20:48|  

In [38]:
# Create UDF for binning Trip during into <5, 5-10, 10-20, 20-30, >30 in minutes
## The func takes the pre-calculated trip duration in minutes
def bin_trip_duration_min(x): 
    timedelta = x/60   
    if(timedelta < 5):
        range_string = "<5 mins"
    elif( (timedelta >= 5) & (timedelta <10)):
        range_string = "5-10 mins"
    elif( (timedelta >= 10) & (timedelta <20)):
        range_string = "10-20 mins"
    elif( (timedelta >= 20) & (timedelta <30)):
        range_string = "20-30 mins"
    else: range_string = ">30 mins"       
    return range_string

udf_bin_trip_duration_min = F.udf(lambda x: bin_trip_duration_min(x), StringType())
df_cleaned_transformed_5 = df_cleaned_transformed_4.withColumn('trip_duration_range_mins', udf_bin_trip_duration_min(F.col("trip_duration_sec")))


In [39]:
df_cleaned_transformed_5.show(2)

+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+------------+-----------+------------+------------+-----------------+-----------+------------------------+
|vendorid|    pickup_datetime|   dropoff_datetime|store_and_fwd_flag|ratecodeid|pulocationid|dolocationid|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|taxi_campany|pickup_hour|week_day_num|week_day_abb|trip_duration_sec|speed_km_hr|trip_duration_range_mins|
+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+------------+-----------+------------+------------+-----------------+-----------+---

In [41]:
df_cleaned_transformed_6 = df_cleaned_transformed_5.filter(
                    (((F.col('speed_km_hr'))>0) & ((F.col('speed_km_hr'))< 100))  &     
                    (((F.col('trip_duration_sec'))>0) & ((F.col('trip_duration_sec')) < 2000)) )

In [42]:
df_cleaned_transformed_6.show(2)

+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+------------+-----------+------------+------------+-----------------+-----------+------------------------+
|vendorid|    pickup_datetime|   dropoff_datetime|store_and_fwd_flag|ratecodeid|pulocationid|dolocationid|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|taxi_campany|pickup_hour|week_day_num|week_day_abb|trip_duration_sec|speed_km_hr|trip_duration_range_mins|
+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+------------+-----------+------------+------------+-----------------+-----------+---

In [43]:
### Save the cleaned version of all year with col + new column
df_cleaned_transformed_6.write.format('parquet').save('../data/processed/df_transformed_2yr.parquet', mode='append')
#6:12 am - failed at 6:35  
# 2:25 - 10: 19PM , completed with Success

#### Load the saved Cleaned data with alll columns to do next step - filtering

In [3]:
df_cleaned_super = spark.read.parquet("../data/processed/df_transformed_2yr_final.parquet")

In [4]:
### This cleaned dataset is ready to use for SQL Business Queries from here
df_cleaned_super.show(2)

+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+------------+-----------+------------+------------+-----------------+-----------+------------------------+
|vendorid|    pickup_datetime|   dropoff_datetime|store_and_fwd_flag|ratecodeid|pulocationid|dolocationid|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|taxi_campany|pickup_hour|week_day_num|week_day_abb|trip_duration_sec|speed_km_hr|trip_duration_range_mins|
+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+------------+-----------+------------+------------+-----------------+-----------+---

### Build Pipeline

In [5]:
# Extract Year and month for extraction in later step
df_cleaned_super = df_cleaned_super.withColumn('year', F.year(F.col('pickup_datetime'))).\
                                    withColumn('month', F.month(F.col('pickup_datetime')))

In [6]:
df_cleaned_super.show(2)

+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+------------+-----------+------------+------------+-----------------+-----------+------------------------+----+-----+
|vendorid|    pickup_datetime|   dropoff_datetime|store_and_fwd_flag|ratecodeid|pulocationid|dolocationid|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|taxi_campany|pickup_hour|week_day_num|week_day_abb|trip_duration_sec|speed_km_hr|trip_duration_range_mins|year|month|
+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+------------+-----------+------------+------------+-----------

In [7]:
# Select the features interested: for Modelling
cols_list = ['year', 'month','passenger_count','store_and_fwd_flag', 'pulocationid', 'trip_distance', 'trip_duration_sec', 'total_amount', 'pickup_hour',
             'ratecodeid', 'tolls_amount', 'taxi_campany','week_day_abb','week_day_num', 'trip_duration_range_mins', 'speed_km_hr']

# Subset the columns
df_super_cleaned_model = df_cleaned_super.select(cols_list)

In [8]:
df_super_cleaned_model.show(5)

+----+-----+---------------+------------------+------------+-------------+-----------------+------------+-----------+----------+------------+------------+------------+------------+------------------------+-----------+
|year|month|passenger_count|store_and_fwd_flag|pulocationid|trip_distance|trip_duration_sec|total_amount|pickup_hour|ratecodeid|tolls_amount|taxi_campany|week_day_abb|week_day_num|trip_duration_range_mins|speed_km_hr|
+----+-----+---------------+------------------+------------+-------------+-----------------+------------+-----------+----------+------------+------------+------------+------------+------------------------+-----------+
|2017|    9|              1|                 N|         234|         1.15|              429|        8.76|         15|         1|         0.0|       Green|         Sat|           6|               5-10 mins|      15.53|
|2017|    9|              1|                 N|         233|          4.0|             1706|        21.8|         15|         1|

### Pipeline Building Performed in Google Colab

In [9]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoderEstimator
from pyspark.ml import Pipeline

stages = []

# Categorical Variable Encoding
cat_cols = ['ratecodeid', 'taxi_campany', 'trip_duration_range_mins', 'store_and_fwd_flag', 'pulocationid', 'week_day_num']
for cat_col in cat_cols:
    col_indexer = StringIndexer(inputCol=cat_col, outputCol=f"{cat_col}_ind")
    col_encoder = OneHotEncoderEstimator(inputCols=[f"{cat_col}_ind"], outputCols=[f"{cat_col}_ohe"])
    stages += [col_indexer, col_encoder]
    
cat_cols_ohe = [f"{cat_col}_ohe" for cat_col in cat_cols]

In [10]:
# Numerical Variables in the model
num_cols = ['passenger_count', 'trip_distance', 'pickup_hour', 'total_amount', 'year', 'month']

In [11]:
## Vectorise the encoded categorical columns and numeric columns to inform the pipeline
assembler = VectorAssembler(inputCols=cat_cols_ohe + num_cols, outputCol="features")
assembler.setHandleInvalid("keep")

stages += [assembler]

## Construct the pipeline
pipeline = Pipeline(stages=stages)

In [12]:
### Supply the Data to pipeline
pipeline_model = pipeline.fit(df_super_cleaned_model)
### 6:25 - 6:28, 6:37 - 6:40

In [13]:
## Trigger transformation pipeline wtih  3-months of data 
pipelined_super_cleaned_2yr_df = pipeline_model.transform(df_super_cleaned_model)
#7:17 - 7:18 ## 7:32 - 7:32

In [14]:
pipelined_super_cleaned_2yr_df.show(2)

+----+-----+---------------+------------------+------------+-------------+-----------------+------------+-----------+----------+------------+------------+------------+------------+------------------------+-----------+--------------+--------------+----------------+----------------+----------------------------+----------------------------+----------------------+----------------------+----------------+----------------+----------------+----------------+--------------------+
|year|month|passenger_count|store_and_fwd_flag|pulocationid|trip_distance|trip_duration_sec|total_amount|pickup_hour|ratecodeid|tolls_amount|taxi_campany|week_day_abb|week_day_num|trip_duration_range_mins|speed_km_hr|ratecodeid_ind|ratecodeid_ohe|taxi_campany_ind|taxi_campany_ohe|trip_duration_range_mins_ind|trip_duration_range_mins_ohe|store_and_fwd_flag_ind|store_and_fwd_flag_ohe|pulocationid_ind|pulocationid_ohe|week_day_num_ind|week_day_num_ohe|            features|
+----+-----+---------------+------------------+---

In [15]:
### Get the Data ready in the format of the vectorised features plus  original feature
pipelined_cleaned_2yr_final_df = pipelined_super_cleaned_2yr_df.select(['features'] + cols_list)
#6:31 - 6:31

In [16]:
### See the pipeline data output
pipelined_cleaned_2yr_final_df.show(2)

+--------------------+----+-----+---------------+------------------+------------+-------------+-----------------+------------+-----------+----------+------------+------------+------------+------------+------------------------+-----------+
|            features|year|month|passenger_count|store_and_fwd_flag|pulocationid|trip_distance|trip_duration_sec|total_amount|pickup_hour|ratecodeid|tolls_amount|taxi_campany|week_day_abb|week_day_num|trip_duration_range_mins|speed_km_hr|
+--------------------+----+-----+---------------+------------------+------------+-------------+-----------------+------------+-----------+----------+------------+------------+------------+------------+------------------------+-----------+
|(288,[0,6,8,11,18...|2017|    9|              1|                 N|         234|         1.15|              429|        8.76|         15|         1|         0.0|       Green|         Sat|           6|               5-10 mins|      15.53|
|(288,[0,6,9,11,47...|2017|    9|           

### Split Data Performed in Google Colab

In [4]:
## Split the data of last three months for testing

test_data = pipelined_cleaned_2yr_final_df.filter((F.col('year') == 2018) & (((F.col('month') == 10) |(F.col('month') == 11) |(F.col('month') == 12)) ))

In [None]:
## Split the data of first 21 months for testing
train_data = pipelined_cleaned_2yr_final_df.filter(((F.col('year') == 2017) & ((F.col('month') >= 1) & (F.col('month') <= 12)))|
                                                 ((F.col('year') == 2018) & ((F.col('month') >= 1) & (F.col('month') <= 9))))

In [5]:
train_data.show(2) 

+--------------------+---------------+-------------+------------+-----------+------------+----------+------------+------------+------------------------+------------------+------------+
|            features|passenger_count|trip_distance|total_amount|pickup_hour|week_day_num|ratecodeid|pulocationid|taxi_campany|trip_duration_range_mins|store_and_fwd_flag|tolls_amount|
+--------------------+---------------+-------------+------------+-----------+------------+----------+------------+------------+------------------------+------------------+------------+
|(284,[0,6,7,11,12...|              1|         0.46|        10.3|         17|           5|         1|         237|       Green|              10-20 mins|                 N|         0.0|
|(284,[0,6,7,11,12...|              1|          0.5|         8.3|         11|           5|         1|         237|       Green|              10-20 mins|                 N|         0.0|
+--------------------+---------------+-------------+------------+----------

### Model Training Done in Google Colab

In [6]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol='features', labelCol='total_amount', maxIter = 10, regParam=0.3, elasticNetParam=0.8)

In [15]:
## Model Training Done on  Google Colab
lr_model = lr.fit(train_data) 

In [16]:
summ_new = lr_model.summary
summ_new.rootMeanSquaredError

In [None]:
lr_predictions = lr_model.transform(train_data)
lr_predictions.select("prediction","total_amount","features").show(20)

In [18]:
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="total_amount", metricName="rmse")
rmse = lr_evaluator.evaluate(lr_predictions)
print("Root Mean Squared Error (RMSE) on Train data = %g" % rmse)

In [None]:
lr_predictions_test = lr_model.transform(test_data)
lr_predictions_test.select("prediction","total_amount","features").show(20)

### Spark SQL - Python - Business Questions

In [4]:
df_cleaned_super = spark.read.parquet("../data/processed/df_transformed_2yr_final.parquet")

In [5]:
df_cleaned_super.createOrReplaceTempView("nyc_view")

In [76]:
# Question 4.a.i - What is the total number of trip for each month
spark.sql('''
SELECT YEAR(pickup_datetime) year,
       MONTH(pickup_datetime) month,
       COUNT (1) number_of_trips
FROM nyc_view
GROUP BY YEAR(pickup_datetime), MONTH(pickup_datetime)
ORDER BY 1,2
''').show()

+----+-----+---------------+
|year|month|number_of_trips|
+----+-----+---------------+
|2017|    1|       10780387|
|2017|    2|       10192088|
|2017|    3|       11453266|
|2017|    4|       11127959|
|2017|    5|       11161587|
|2017|    6|       10633460|
|2017|    7|        9503269|
|2017|    8|        9289554|
|2017|    9|        9827956|
|2017|   10|       10694462|
|2017|   11|       10158994|
|2017|   12|       10414465|
|2018|    1|        9553631|
|2018|    2|        9262273|
|2018|    3|       10266415|
|2018|    4|       10105415|
|2018|    5|       10021377|
|2018|    6|        9453060|
|2018|    7|        8534027|
|2018|    8|        8515365|
+----+-----+---------------+
only showing top 20 rows



In [77]:
# Question 4.a.ii - Which weekday had the most trip
spark.sql('''
WITH trip_rank AS(
SELECT *,
        RANK() OVER(PARTITION BY year,month ORDER BY number_of_trips DESC) rank
FROM (
        SELECT 
            YEAR(pickup_datetime) year, 
            MONTH(pickup_datetime) month,
            week_day_abb weekday,
            Count(*)  number_of_trips

        FROM nyc_view
        GROUP BY YEAR(pickup_datetime), MONTH(pickup_datetime), week_day_abb      
        ORDER BY 1,2,3
)
)
SELECT * FROM trip_rank
WHERE rank = 7
ORDER BY year,month
''').show()

+----+-----+-------+---------------+----+
|year|month|weekday|number_of_trips|rank|
+----+-----+-------+---------------+----+
|2017|    1|    Wed|        1385000|   7|
|2017|    2|    Mon|        1304355|   7|
|2017|    3|    Tue|        1176901|   7|
|2017|    4|    Mon|        1300638|   7|
|2017|    5|    Sun|        1317906|   7|
|2017|    6|    Sun|        1234313|   7|
|2017|    7|    Tue|        1188752|   7|
|2017|    8|    Sun|        1058619|   7|
|2017|    9|    Mon|        1149302|   7|
|2017|   10|    Wed|        1406840|   7|
|2017|   11|    Sun|        1253896|   7|
|2017|   12|    Mon|        1159131|   7|
|2018|    1|    Sun|        1136214|   7|
|2018|    2|    Sun|        1148060|   7|
|2018|    3|    Wed|        1151506|   7|
|2018|    4|    Tue|        1346255|   7|
|2018|    5|    Sun|        1125833|   7|
|2018|    6|    Sun|        1099915|   7|
|2018|    7|    Sat|        1076810|   7|
|2018|    8|    Sun|         936170|   7|
+----+-----+-------+--------------

In [79]:
# Question 4.a.iii Which hour of the day had the most trip
spark.sql('''
WITH trip_rank AS(
SELECT *,
        RANK() OVER(PARTITION BY year,month ORDER BY total_number_of_trips DESC) rank
FROM (
        SELECT 
            YEAR(pickup_datetime) year, 
            MONTH(pickup_datetime) month,
            Hour(pickup_datetime) hour,
            Count(*) total_number_of_trips

        FROM nyc_view
        GROUP BY YEAR(pickup_datetime), MONTH(pickup_datetime), hour(pickup_datetime)  
        ORDER BY 1,2,3
)
)
SELECT * FROM trip_rank
WHERE rank = 1
ORDER BY year,month
''').show()

+----+-----+----+---------------------+----+
|year|month|hour|total_number_of_trips|rank|
+----+-----+----+---------------------+----+
|2017|    1|  18|               692726|   1|
|2017|    2|  18|               673287|   1|
|2017|    3|  19|               735218|   1|
|2017|    4|  18|               697420|   1|
|2017|    5|  18|               690046|   1|
|2017|    6|  18|               656818|   1|
|2017|    7|  18|               589087|   1|
|2017|    8|  18|               588260|   1|
|2017|    9|  19|               622207|   1|
|2017|   10|  18|               684103|   1|
|2017|   11|  18|               650704|   1|
|2017|   12|  18|               647202|   1|
|2018|    1|  18|               632957|   1|
|2018|    2|  18|               614335|   1|
|2018|    3|  18|               667507|   1|
|2018|    4|  18|               661268|   1|
|2018|    5|  18|               636948|   1|
|2018|    6|  18|               591743|   1|
|2018|    7|  18|               547694|   1|
|2018|    

In [6]:
# Question 4.a.iv What was the average number of passenger?
### Avg passenger = total Passenger / total number of Trip
spark.sql('''
SELECT 
    YEAR(pickup_datetime) year, 
    MONTH(pickup_datetime) month,
    SUM(passenger_count) total_passenger_count,
    COUNT(*) total_trip_count,
    FLOOR((SUM(passenger_count)/COUNT(*))) avg_passenger_count
FROM nyc_view
GROUP BY YEAR(pickup_datetime), MONTH(pickup_datetime)
ORDER By 1,2
''').show()

+----+-----+---------------------+----------------+-------------------+
|year|month|total_passenger_count|total_trip_count|avg_passenger_count|
+----+-----+---------------------+----------------+-------------------+
|2017|    1|             11977257|         7454214|                  1|
|2017|    2|             11373058|         7095996|                  1|
|2017|    3|             12262729|         7692202|                  1|
|2017|    4|             12287367|         7659237|                  1|
|2017|    5|             12054726|         7545181|                  1|
|2017|    6|             11595562|         7239610|                  1|
|2017|    7|             10668300|         6591228|                  1|
|2017|    8|             10402774|         6453155|                  1|
|2017|    9|             10743991|         6676683|                  1|
|2017|   10|             11644272|         7258643|                  1|
|2017|   11|             10875274|         6790598|             

In [10]:
# Question 4.a.v What was the average amount paid per trip?
spark.sql('''
SELECT 
    YEAR(pickup_datetime) year, 
    MONTH(pickup_datetime) month,
    ROUND((SUM(total_amount)/COUNT(*)), 2) avg_amount_per_trip
FROM nyc_view
GROUP BY YEAR(pickup_datetime), MONTH(pickup_datetime)
ORDER BY 1,2
''').show(24)
#9:25 - 9:26 when using the saved cleand data in PQ format

+----+-----+-------------------+
|year|month|avg_amount_per_trip|
+----+-----+-------------------+
|2017|    1|              15.71|
|2017|    2|              15.74|
|2017|    3|              15.87|
|2017|    4|              15.83|
|2017|    5|              15.82|
|2017|    6|              15.73|
|2017|    7|              15.97|
|2017|    8|              15.95|
|2017|    9|              15.95|
|2017|   10|              15.96|
|2017|   11|              15.91|
|2017|   12|              15.86|
|2018|    1|               15.6|
|2018|    2|               15.6|
|2018|    3|              15.59|
|2018|    4|              15.79|
|2018|    5|              15.88|
|2018|    6|              15.82|
|2018|    7|              15.92|
|2018|    8|              15.94|
+----+-----+-------------------+
only showing top 20 rows



In [11]:
# Question 4.a.vi What was the average amount paid per passenger?
### http://spark.apache.org/docs/latest/sql-ref-functions-builtin.html
spark.sql('''
SELECT 
    YEAR(pickup_datetime) year, 
    MONTH(pickup_datetime) month,
    ROUND((SUM(total_amount)/SUM(passenger_count)),2) avg_amount_paid_by_passenger
FROM nyc_view
GROUP BY YEAR(pickup_datetime), MONTH(pickup_datetime)
ORDER BY 1,2
''').show(24)
#9:27 - 9:28

+----+-----+----------------------------+
|year|month|avg_amount_paid_by_passenger|
+----+-----+----------------------------+
|2017|    1|                        9.78|
|2017|    2|                        9.82|
|2017|    3|                        9.95|
|2017|    4|                        9.87|
|2017|    5|                         9.9|
|2017|    6|                        9.82|
|2017|    7|                        9.87|
|2017|    8|                         9.9|
|2017|    9|                        9.91|
|2017|   10|                        9.95|
|2017|   11|                        9.94|
|2017|   12|                        9.78|
|2018|    1|                       10.05|
|2018|    2|                       10.11|
|2018|    3|                       10.07|
|2018|    4|                       10.18|
|2018|    5|                       10.26|
|2018|    6|                       10.21|
|2018|    7|                       10.19|
|2018|    8|                       10.28|
|2018|    9|                      

In [32]:
# Question 4.b.i For Each Taxi Color 
## What was the average, median, mini and max trip_duration in seconds

spark.sql('''
SELECT 
    taxi_campany,
    ROUND(AVG(trip_duration_sec),2) avg_trip_duation_sec,
    ROUND(MIN(trip_duration_sec),2) min_trip_duration_sec,
    ROUND(MAX(trip_duration_sec),2) max_trip_duration_sec,
    ROUND(APPROX_PERCENTILE(trip_duration_sec, 0.5),2) median_trip_duration_sec
FROM nyc_view
GROUP BY taxi_campany
''').show()

+------------+--------------------+---------------------+---------------------+------------------------+
|taxi_campany|avg_trip_duation_sec|min_trip_duration_sec|max_trip_duration_sec|median_trip_duration_sec|
+------------+--------------------+---------------------+---------------------+------------------------+
|       Green|              727.62|                    1|                 1999|                     630|
|      Yellow|              697.76|                    1|                 1999|                     593|
+------------+--------------------+---------------------+---------------------+------------------------+



In [14]:
#### Q-4.b.ii For Each Taxi Color What was the average, median, mini and max trip distance in Km
spark.sql('''
SELECT 
    taxi_campany,
    ROUND((AVG(trip_distance)/0.621371),2) avg_trip_dist_km,
    ROUND((MIN(trip_distance)/0.621371),2) min_trip_dist_km,
    ROUND((MAX(trip_distance)/0.621371),2) max_trip_dist_km,
    ROUND((APPROX_PERCENTILE(trip_distance, 0.5)),2) median_trip_dist_km
FROM nyc_view
GROUP BY taxi_campany
''').show()
# 9:30 - 9:31

+------------+----------------+----------------+----------------+-------------------+
|taxi_campany|avg_trip_dist_km|min_trip_dist_km|max_trip_dist_km|median_trip_dist_km|
+------------+----------------+----------------+----------------+-------------------+
|       Green|            4.65|            1.61|           55.31|                2.0|
|      Yellow|            4.89|            1.61|           52.95|               2.28|
+------------+----------------+----------------+----------------+-------------------+



In [19]:
# #### Q-4.b.iii For Each Taxi Color What was the average, median, mini and max speed in Km per hour
# speed_km_hr
spark.sql('''
SELECT 
    taxi_campany,
    ROUND(AVG(speed_km_hr),2) avg_speed_km_hr,
    ROUND(MIN(speed_km_hr),2) min_speed_km_hr,
    ROUND(MAX(speed_km_hr),2) max_speed_km_hr,
    APPROX_PERCENTILE(speed_km_hr, 0.5) median_speed_km_hr
FROM nyc_view
GROUP BY taxi_campany
''').show()
#9:35 - 9:36

+------------+---------------+---------------+---------------+------------------+
|taxi_campany|avg_speed_km_hr|min_speed_km_hr|max_speed_km_hr|median_speed_km_hr|
+------------+---------------+---------------+---------------+------------------+
|       Green|          18.99|            2.9|          99.99|              16.7|
|      Yellow|           20.6|           2.91|           99.9|             18.51|
+------------+---------------+---------------+---------------+------------------+



In [21]:
#### Q-4.c What was the percentage of trips where the driver received tips?
spark.sql('''
SET tipped_trip_count = (SELECT COUNT(*) tipped_trip_count
FROM nyc_view
WHERE tip_amount > 0)                           
''')

spark.sql('''
SELECT ROUND((${tipped_trip_count}/COUNT(*)) * 100, 2) trip_percent_with_tips
FROM nyc_view
''').show()
# 9:37 -

+----------------------+
|trip_percent_with_tips|
+----------------------+
|                 61.83|
+----------------------+



In [43]:
#### Q-4.d What was the percentage of trips where the driver received tips >= 10$ ?
# https://www.javaer101.com/en/article/41891788.html ref for variable
spark.sql('''
SET high_tip_trip_count = (SELECT COUNT(*) high_tip_trip_count
FROM nyc_view
WHERE tip_amount >= 10)                           
''')

spark.sql('''
SELECT ROUND((${high_tip_trip_count}/COUNT(*)) * 100, 2) trip_percent_with_morethan_10dollars_tips
FROM nyc_view
''').show()


+-----------------------------------------+
|trip_percent_with_morethan_10dollars_tips|
+-----------------------------------------+
|                                     2.26|
+-----------------------------------------+



In [6]:
#### Q.4.e.i For each bin of trip duration - what is the average speed (Km per hour) - speed_km_hr
spark.sql('''
SELECT trip_duration_range_mins, ROUND(SUM(speed_km_hr)/ COUNT(*),2) average_speed_km_per_hr
FROM nyc_view
GROUP BY trip_duration_range_mins
''').show()

+------------------------+-----------------------+
|trip_duration_range_mins|average_speed_km_per_hr|
+------------------------+-----------------------+
|              10-20 mins|                  17.91|
|               5-10 mins|                  19.27|
|              20-30 mins|                  20.72|
|                 <5 mins|                  28.16|
|                >30 mins|                  23.51|
+------------------------+-----------------------+



In [5]:
#### Q.4.e.ii For each bin of trip duration - what is the average distance (dist per dollar)
spark.sql('''
SELECT trip_duration_range_mins,
       ROUND((SUM(trip_distance)/0.621371),2) total_distance,
       ROUND(SUM(total_amount), 2) sum_fare_amount,
       ROUND(((SUM(trip_distance)/0.621371) /  SUM(total_amount)), 2) avg_dist_km_per_dollar
       
FROM nyc_view
GROUP BY trip_duration_range_mins       
''').show()

+------------------------+--------------+---------------+----------------------+
|trip_duration_range_mins|total_distance|sum_fare_amount|avg_dist_km_per_dollar|
+------------------------+--------------+---------------+----------------------+
|              10-20 mins|3.6325974517E8|1.27872238223E9|                  0.28|
|               5-10 mins|1.2740173054E8| 5.1183941346E8|                  0.25|
|              20-30 mins|2.7642736134E8|  8.351790931E8|                  0.33|
|                 <5 mins|  1.01388973E7|  4.116772026E7|                  0.25|
|                >30 mins| 6.819661047E7| 1.9123135426E8|                  0.36|
+------------------------+--------------+---------------+----------------------+



##### Q.4.f Which duration bin will you advise a taxi driver to target to maximise his income?

The taxi drivers are suggested to target the passenger pick up for short distance which would take about 5 minutes or within 5 to 10 minutes to maximise their income. 

In [None]:
spark.stop()