# Purpose


This notebook demonstrates the data pipeline from raw tables to analytical datasets. At the end of this activity, train & test data sets are created from raw data.

# Imports

In [5]:
# Standard Library Imports
import sys
import time
import os.path as op
import os
import re
import random
import time
import warnings

# Third Party Imports
import yaml
import hvplot
import panel as pn
import pandas as pd
import numpy as np
import holoviews as hv

from pyspark_dist_explore import (
    Histogram,
    hist,
    distplot,
    pandas_histogram
)

from IPython.display import (
    display,
    display_html
)

# Spark Imports
from pyspark.sql import (
    types as DT,
    functions as F,
    Window
)
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import (
    ParamGridBuilder,
    CrossValidator,
    CrossValidatorModel
)
from pyspark.ml.feature import (
    VectorAssembler,
    StandardScaler,
    StringIndexer,
    OneHotEncoder,
    Imputer
)
from pyspark.mllib.evaluation import RegressionMetrics

# Project Imports
from ta_lib.pyspark import (
    dp,
    features,
    eda,
)
# Project Imports
from ta_lib.pyspark.core import (
    utils,
    context
)
# options
random_seed = 0
pn.extension('bokeh')
warnings.filterwarnings('ignore')
pd.set_option('display.max_columns', None)



# Initialization

`config.yml` is used to store all the parameters required for the template

In [6]:
config_path = op.join(os.getcwd(),'conf', 'config.yml')
with open(config_path, 'r') as fp:
    config = yaml.safe_load(fp)
config

{'all': {'core': 'default',
  'log_catalog': 'production',
  'data_catalog': 'remote',
  'job_catalog': 'local'},
 'spark': {'spark.executer.cores': 4, 'spark.cores.max': 4}}

In [7]:
data_config_path = op.join(os.getcwd(),'conf/data_catalog', 'local.yml')
with open(data_config_path, 'r') as fp:
    data_config = yaml.safe_load(fp)
data_config

{'reference_date': datetime.date(2020, 8, 31),
 'num_days_prediction': 7,
 'raw': {'filesystem': 'file',
  'base_path': './../../data/raw/',
  'carrier_data_path': 'carrier_data.csv',
  'fuel_prices_data_path': 'fuel_prices.csv',
  'market_carrier_rates_data_path': 'market_carrier_rates_data.csv',
  'route_mapping_data_path': 'route_mapping.csv'},
 'clean': {'filesystem': 'file',
  'base_path': './../../data/cleaned/',
  'carrier_data_path': 'carrier_data',
  'fuel_prices_data_path': 'fuel_prices',
  'market_carrier_rates_data_path': 'market_carrier_rates',
  'final_routes_data_path': 'final_data',
  'trasnformed_routes_data_path': 'transformed_data'},
 'processed': {'filesystem': 'file',
  'base_path': './../../data/processed/',
  'train': 'train_carrier',
  'test': 'test_carrier',
  'preds': 'predictions_carrier'},
 'spark': {'spark.executer.cores': 4, 'spark.cores.max': 4}}

## Create spark session

`talib.pyspark.context` module is leveraged to build the sparksession so as to consider the spark session related params in the config file while building the session.

In [8]:
%%time
session = context.CustomSparkSession(config)
session.CreateSparkSession()
spark = session.spark
sc = session.sc.setLogLevel("ERROR")

CPU times: user 20.9 ms, sys: 20.9 ms, total: 41.8 ms
Wall time: 74.2 ms


# Data Read

In [9]:
%%time
carrier_df = utils.read_data(
    spark=spark,
    paths=[data_config['raw']['base_path'] + data_config['raw']['carrier_data_path']],
    fs=data_config['raw']['filesystem'],
    fmt="csv",
    header="true",
    inferschema="true"
)
fuelprices_df = utils.read_data(
    spark=spark,
    paths=[data_config['raw']['base_path'] + data_config['raw']['fuel_prices_data_path']],
    fs=data_config['raw']['filesystem'],
    fmt="csv",
    header="true",
    inferschema="true"
)
market_carrier_rates_df = utils.read_data(
    spark=spark,
    paths=[data_config['raw']['base_path'] + data_config['raw']['market_carrier_rates_data_path']],
    fs=data_config['raw']['filesystem'],
    fmt="csv",
    header="true",
    inferschema="true"
)
route_mapping_df = utils.read_data(
    spark=spark,
    paths=[data_config['raw']['base_path'] + data_config['raw']['route_mapping_data_path']],
    fs=data_config['raw']['filesystem'],
    fmt="csv",
    header="true",
    inferschema="true"
)

[Stage 7:>                                                          (0 + 1) / 1]

CPU times: user 366 ms, sys: 94.1 ms, total: 460 ms
Wall time: 1min 26s


                                                                                

# Data Processing

The focus here is to create a cleaned dataset that is appropriate for solving the DS problem at hand from the raw data.

**Do's**

* Clean dataframe column names
* Ensure dtypes are set properly
* Join with other tables etc to create features
* Transform, if appropriate, datetime like columns to generate additional features (weekday etc)
* Discard cols that are not useful for training the model (IDs, constant cols, duplicate cols etc) additional features generated from existing columns

**Dont's**

* Handle missing values or outliers here. Mark them and leave them for processing downstream.

### Reference Dates

`reference_date` - date of reference for consideration for historical data.<br>

In [10]:
reference_date = pd.to_datetime(data_config['reference_date'])
print(f"Reference date: {reference_date}")

Reference date: 2020-08-31 00:00:00


#### Carrier data 

In [11]:
# # Filter carrier data dataframe

carrier_df = carrier_df.withColumn(
        "pickup_date",
        F.to_date(
            F.unix_timestamp(F.col("pickup_date"), "dd/MM/yyyy").cast("timestamp")
        ),
    ).filter(F.col("pickup_date") <= reference_date)

# Save the dataset
utils.save_data(carrier_df, path=data_config["clean"]["base_path"]
        + data_config["clean"]["carrier_data_path"])
carrier_df.limit(3).toPandas()

                                                                                

Unnamed: 0,trip_id,distance,vehicle_type,pickup_date,origin_city,origin_state,origin_zip,origin_country,destination_city,destination_state,destination_zip,destination_country,carrier_price
0,23099,626.54,1,2019-01-17,BURLINGTAN,WI,53105,USA,MEMPYIS,TN,38141,USA,1527.6
1,23105,2253.5,1,2019-01-18,MCCERREN,NV,89434,USA,YEBRAN,AY,43025,USA,3855.1
2,23107,473.62,1,2019-01-21,BURLINGTAN,WI,53105,USA,YEBRAN,AY,43025,USA,1297.25


For every start and end zip code in the carrier_data, let's add the corresponding `market_id` as `origin_market_id` and `destination_market_id`, respectively. Drop unnecessaryy columns.

In [12]:
carrier_df = carrier_df.join(route_mapping_df.select(['zipcode', 'market_id'])
                                       .withColumnRenamed("market_id", "origin_market_id"),
                                       carrier_df.origin_zip == route_mapping_df.zipcode,
                                      how='left').drop('zipcode')
carrier_df = carrier_df.join(route_mapping_df.select(['zipcode', 'market_id'])
                                       .withColumnRenamed("market_id", "destination_market_id"),
                                       carrier_df.destination_zip == route_mapping_df.zipcode,
                                      how='left').drop('zipcode')

carrier_df = carrier_df.where((F.col('origin_market_id').isNotNull())
                                     & (F.col('destination_market_id').isNotNull()))

droplist = ['origin_city', 'origin_state', 'origin_country', 'destination_city', 
            'destination_state', 'destination_country']
carrier_df = carrier_df.drop(*droplist)
carrier_df.limit(3).toPandas()

                                                                                

Unnamed: 0,trip_id,distance,vehicle_type,pickup_date,origin_zip,destination_zip,carrier_price,origin_market_id,destination_market_id
0,23099,626.54,1,2019-01-17,53105,38141,1527.6,WI_MIL,TN_MEM
1,23105,2253.5,1,2019-01-18,89434,43025,3855.1,NV_REN,OH_COL
2,23107,473.62,1,2019-01-21,53105,43025,1297.25,WI_MIL,OH_COL


Create a column for first Monday of the week in order to join with the weekly fuel prices data 

In [13]:
carrier_df = carrier_df.withColumn(
        "day_of_week", F.dayofweek(F.col("pickup_date")) - 2
    ).withColumn("first_monday_of_week", F.expr("date_sub(pickup_date, day_of_week)"))
carrier_df.limit(3).toPandas()

                                                                                

Unnamed: 0,trip_id,distance,vehicle_type,pickup_date,origin_zip,destination_zip,carrier_price,origin_market_id,destination_market_id,day_of_week,first_monday_of_week
0,23099,626.54,1,2019-01-17,53105,38141,1527.6,WI_MIL,TN_MEM,3,2019-01-14
1,23105,2253.5,1,2019-01-18,89434,43025,3855.1,NV_REN,OH_COL,4,2019-01-14
2,23107,473.62,1,2019-01-21,53105,43025,1297.25,WI_MIL,OH_COL,0,2019-01-21


#### Market carrier rates data
The market carrier rates data contains data collected over various carrier providers in the market, with details of total cost and total distance over all providers on a given route for a given date.

We need to aggregate the information upto a weekly level so that we report the prices on the first Monday of each week for each vehicle type on a route.

Then we calculate the market Rate per Mile (RPM) averaged over all carrier providers as $\frac{\text{Sum of cost over all providers}}{\text{Sum of dist over all providers}}$

In [14]:
market_carrier_rates_df = (
    market_carrier_rates_df.withColumn(
        "week_ending_date",
        F.to_date(
            F.unix_timestamp(F.col("week_ending_date"), "yyyy-MM-dd").cast(
                "timestamp"
            )
        ),
    )
    .filter(F.col("week_ending_date") <= reference_date)
    .withColumn(
        "first_monday_of_week",
        F.to_date(
            F.unix_timestamp(
                F.col("first_monday_of_week"), "yyyy-MM-dd"
            ).cast("timestamp")
        ),
    )
)
market_carrier_rates_df.limit(3).toPandas()

                                                                                

Unnamed: 0,week_ending_date,origin_city,origin_state,origin_zip,destination_city,destination_state,destination_zip,vehicle_type,first_monday_of_week,total_cost_all_providers,total_distance_all_providers
0,2020-02-24,WINCYESTER,VE,22601,ANTERIA,CE,91758,2,2020-02-24,92108.8,71960.0
1,2016-02-28,TEYLARVILLE,IL,62568,KENSES CITY,MA,64101,3,2016-02-22,7141.32,2988.0
2,2016-04-10,BLAAMINGTAN,IL,61702,MINNEEPALIS,MN,55440,2,2016-04-04,7452.0,4140.0


In [15]:
utils.save_data(market_carrier_rates_df, path=data_config["clean"]["base_path"]
        + data_config["clean"]["market_carrier_rates_data_path"])

                                                                                

In [16]:
market_carrier_rates_df = (market_carrier_rates_df.
                                     join(route_mapping_df.select(['zipcode', 'market_id'])
                                       .withColumnRenamed("market_id", "origin_market_id"),
                                       market_carrier_rates_df.origin_zip == route_mapping_df.zipcode,
                                      how='left').drop('zipcode'))

market_carrier_rates_df = (market_carrier_rates_df.
                                      withColumn('first_monday_of_week', F.to_date(F.unix_timestamp(F.col("first_monday_of_week"), "yyyy-MM-dd").
                                      cast("timestamp"))).
                                     join(route_mapping_df.select(['zipcode', 'market_id'])
                                       .withColumnRenamed("market_id", "destination_market_id"),
                                       market_carrier_rates_df.destination_zip == route_mapping_df.zipcode,
                                      how='left').drop('zipcode'))

In [17]:
market_carrier_rates_df = (market_carrier_rates_df.
                                      groupby(['first_monday_of_week', 'origin_market_id', 'destination_market_id', 'vehicle_type']).
                                      agg(F.sum('total_cost_all_providers').alias('sum_of_cost_all_providers'),
                                          F.sum('total_distance_all_providers').alias('sum_of_dist_all_providers')))

market_carrier_rates_df = (market_carrier_rates_df.
                                      withColumn('market_rate_per_mile', 
                                                 F.col('sum_of_cost_all_providers')
                                                 /F.col('sum_of_dist_all_providers')))
dropcols = ['sum_of_cost_all_providers', 'sum_of_dist_all_providers']
market_carrier_rates_df = market_carrier_rates_df.drop(*dropcols)

#### Fuel prices data

In [18]:
fuelprices_df = fuelprices_df.withColumn('first_monday_of_week', 
                                                     F.to_date(F.unix_timestamp(
                                                         F.col("date"),"dd-MM-yyyy")
                                                               .cast("timestamp")))
fuelprices_df = fuelprices_df.filter(F.col('first_monday_of_week')<=reference_date)
utils.save_data(fuelprices_df, path=data_config["clean"]["base_path"]
        + data_config["clean"]["fuel_prices_data_path"])
fuelprices_df.limit(5).toPandas()

                                                                                

Unnamed: 0,date,national_price,first_monday_of_week
0,31-08-2020,2.441,2020-08-31
1,24-08-2020,2.426,2020-08-24
2,17-08-2020,2.427,2020-08-17
3,10-08-2020,2.428,2020-08-10
4,03-08-2020,2.424,2020-08-03


### Target & Feature Generation

Find the routes that have historical rates present in market rates data over the dates available in fuel price data 

In [19]:
final_df = (
        carrier_df.select(
            [
                "trip_id",
                "pickup_date",
                "first_monday_of_week",
                "origin_zip",
                "destination_zip",
                "origin_market_id",
                "destination_market_id",
                "vehicle_type",
                "distance",
                "carrier_price",
            ]
        )
        .join(
            market_carrier_rates_df,
            on=[
                "origin_market_id",
                "destination_market_id",
                "vehicle_type",
                "first_monday_of_week",
            ],
            how="left",
        )
        .where(F.col("market_rate_per_mile").isNotNull())
        .join(fuelprices_df, on=["first_monday_of_week"], how="left")
        .where(F.col("national_price").isNotNull())
    )

# Save the dataset
utils.save_data(final_df, path=data_config["clean"]["base_path"]
        + data_config["clean"]["final_routes_data_path"])

                                                                                

### Independant Features Generation

In [20]:
# Average historical prices for route 
w = Window.partitionBy(['origin_zip', 'destination_zip', 'vehicle_type']).orderBy('pickup_date').rowsBetween(-1,30)
final_df=final_df.withColumn('pastmonth_avg', F.avg('carrier_price').over(w))
w = Window.partitionBy(['origin_zip', 'destination_zip', 'vehicle_type']).orderBy('pickup_date').rowsBetween(-1,7)
final_df=final_df.withColumn('pastweek_avg', F.avg('carrier_price').over(w))

# Model Data Creation

In [21]:
%%time
# Filtering the columns from the primary dataframe where all other feature tables would be merged
final_df = final_df.select('trip_id', 'pickup_date','vehicle_type','origin_zip', 'destination_zip',
                                           'origin_market_id','destination_market_id',
                                           'distance', 'market_rate_per_mile', 'national_price', 'pastmonth_avg',
                                         'pastweek_avg', 'carrier_price')
final_df = final_df.withColumn('vehicle_type', F.col('vehicle_type').cast('string'))

print(f"Total number of rows:\t{final_df.count()}")
print(f"Total number of columns:\t{len(final_df.columns)}")



Total number of rows:	143948
Total number of columns:	13
CPU times: user 225 ms, sys: 144 ms, total: 369 ms
Wall time: 1min 17s


                                                                                

### Columns type identification in the Final dataframe

In [22]:
%%time
types = {
    'numerical': dp.list_numerical_columns,
    'cat_cols': dp.list_categorical_columns,
    'date_cols': dp.list_datelike_columns,
    'bool_cols': dp.list_boolean_columns
}
utils.display_as_tabs([(k,v(final_df)) for k,v in types.items()])

CPU times: user 57.8 ms, sys: 14.2 ms, total: 72 ms
Wall time: 64.9 ms


# Missing values Handling 

Handling them pre train-test-split as the features here are historic and can be imputed by considering the entire data

In [23]:
%%time
dp.identify_missing_values(final_df).toPandas()



CPU times: user 405 ms, sys: 74.8 ms, total: 480 ms
Wall time: 47.6 s


                                                                                

Unnamed: 0,trip_id,pickup_date,vehicle_type,origin_zip,destination_zip,origin_market_id,destination_market_id,distance,market_rate_per_mile,national_price,pastmonth_avg,pastweek_avg,carrier_price
0,0,0,0,0,0,0,0,0,0,0,0,0,0


### Missing value imputation using Pyspark ml Imputer for the "numerical" columns.
Below code block performs missing value imputation using Pyspark ml Imputer ( https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.Imputer.html) for the "numerical" columns of a given DataFrame `final_df`. This function handles isnan() and  isNull() values.


1. An `Imputer` object is created with the `inputCols`, `outputCols` and `strategy` parameters. Here, `inputCols` and `outputCols` are set to `national_price` since we want to impute missing values for the this columns only, and the imputation strategy is set to "mean".

2. The `fit()` method is called on the `Imputer` object to create an imputation model based on the given DataFrame `final_df`. 

3. Finally, the `transform()` method is called on the imputer model to impute missing values in the original DataFrame `final_df`, and the imputed DataFrame is stored in the `imputed_data` variable.

In [24]:
%%time
imputer = Imputer(inputCol="national_price", outputCol="national_price", strategy='mean')
imputer_transform = imputer.fit(final_df)
imputed_data = imputer_transform.transform(final_df)
print(f"Total number of rows:\t{imputed_data.count()}")
print(f"Total number of columns:\t{len(imputed_data.columns)}")
dp.identify_missing_values(imputed_data).toPandas()

                                                                                

Total number of rows:	143948
Total number of columns:	13




CPU times: user 634 ms, sys: 191 ms, total: 825 ms
Wall time: 1min 42s


                                                                                

Unnamed: 0,trip_id,pickup_date,vehicle_type,origin_zip,destination_zip,origin_market_id,destination_market_id,distance,market_rate_per_mile,national_price,pastmonth_avg,pastweek_avg,carrier_price
0,0,0,0,0,0,0,0,0,0,0,0,0,0


# Train Test Split of the Model Data

We split the data into train, test (optionally, also a validation dataset).
In this example, we are stratifying the data by target variable classes and split the data with 70% in train and remaining in test data.

In [25]:
%%time
train_df, test_df = dp.test_train_split(
    spark,
    data=final_df,
    target_col="carrier_price",
    train_prop=0.7,
    random_seed=random_seed,
    stratify=True,
    target_type="continuous"
)

CPU times: user 13.6 ms, sys: 1.47 ms, total: 15.1 ms
Wall time: 138 ms


### Saving the train and test data
We save the data into the Databricks filestore as per the path specified in the data config file.

In [26]:
%%time
train_data_path = data_config['processed']['base_path'] + data_config['processed']['train']
test_data_path = data_config['processed']['base_path'] + data_config['processed']['test']

train_df.write.mode("overwrite").parquet(train_data_path)
test_df.write.mode("overwrite").parquet(test_data_path)

                                                                                

CPU times: user 213 ms, sys: 131 ms, total: 344 ms
Wall time: 2min 8s
