In [1]:
%load_ext autoreload
%autoreload 2

# Purpose


This notebook demonstrates the data pipeline from raw tables to analytical datasets. At the end of this activity, train & test data sets are created from raw data.

# Imports

In [2]:
# Standard Library Imports
import sys
import time
import os.path as op
import os
import re
import random
import time
import warnings

# Third Party Imports
import yaml
import hvplot
import panel as pn
import pandas as pd
import numpy as np
import holoviews as hv
from pyspark_dist_explore import (
    Histogram,
    hist,
    distplot,
    pandas_histogram
)
from IPython.display import (
    display,
    display_html
)

# Spark Imports
from pyspark.sql import (
    types as DT,
    functions as F,
    Window
)
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import (
    ParamGridBuilder,
    CrossValidator,
    CrossValidatorModel
)
from pyspark.ml.feature import (
    VectorAssembler,
    StandardScaler,
    StringIndexer,
    OneHotEncoderEstimator,
    Imputer
)
from pyspark.mllib.evaluation import RegressionMetrics

# Project Imports
from ta_lib.pyspark import (
    dp,
    features,
    model_gen,
    model_eval,
    utils,
    eda,
    context
)

# options
# Seed handed to the train/test split later in the notebook.
random_seed = 0
# Load the bokeh extension for panel-based displays.
pn.extension('bokeh')
# NOTE(review): this suppresses every warning for the rest of the session,
# including deprecation notices from Spark/pandas — consider narrowing the filter.
warnings.filterwarnings('ignore')
# Do not truncate columns when pandas frames are displayed.
pd.set_option('display.max_columns', None)



# Initialization

`config.yml` is used to store all the parameters required for the template

In [3]:
# Load the top-level project configuration from <cwd>/conf/config.yml.
config_path = op.join(os.getcwd(), 'conf', 'config.yml')
with open(config_path, 'r') as fp:
    # safe_load only builds plain Python objects. A bare yaml.load(fp) can
    # construct arbitrary objects from the file and raises TypeError on
    # PyYAML >= 6, where the Loader argument is mandatory.
    config = yaml.safe_load(fp)
# Echo the parsed configuration as the cell output.
config

{'all': {'core': 'default',
  'log_catalog': 'production',
  'data_catalog': 'local',
  'job_catalog': 'local'},
 'spark': {'spark.executer.cores': 4, 'spark.cores.max': 4}}

In [4]:
# Load the data catalog (paths, reference date, prediction horizon) from
# <cwd>/conf/data_catalog/remote.yml.
# NOTE(review): the file name is hard-coded to remote.yml while the project
# config printed above lists data_catalog as 'local' — confirm which is intended.
data_config_path = op.join(os.getcwd(), 'conf', 'data_catalog', 'remote.yml')
with open(data_config_path, 'r') as fp:
    # safe_load still parses plain YAML dates into datetime.date, but refuses
    # arbitrary object tags; a bare yaml.load(fp) fails on PyYAML >= 6.
    data_config = yaml.safe_load(fp)
# Echo the parsed catalog as the cell output.
data_config

{'reference_date': datetime.date(2019, 4, 26),
 'num_days_prediction': 90,
 'raw': {'filesystem': 'dbfs',
  'base_path': '/FileStore/tables/vacation_partitioned/',
  'call_data_path': 'dial_summary.parquet',
  'last_activity_data_path': 'customer_activity.parquet',
  'booking_data_path': 'class_labels.parquet',
  'consumer_data_path': 'customer.parquet',
  'web_data_path': 'itr_data_*.parquet'},
 'clean': {'filesystem': 'dbfs',
  'base_path': '/FileStore/tables/vacation_clean/',
  'call_data_path': 'dial_summary.parquet',
  'last_activity_data_path': 'customer_activity.parquet',
  'booking_data_path': 'class_labels.parquet',
  'consumer_data_path': 'customer.parquet',
  'web_data_path': 'itr_data_*.parquet'},
 'processed': {'filesystem': 'dbfs',
  'base_path': '/FileStore/tables/spark_warehouse/',
  'train': 'train.parquet',
  'test': 'test.parquet',
  'preds': 'predictions.parquet'},
 'spark': {'spark.executer.cores': 4, 'spark.cores.max': 4}}

## Create spark session

The `ta_lib.pyspark.context` module is used to build the Spark session so that the Spark-related parameters in the config file are applied when the session is created.

In [5]:
%%time
# Build the project's Spark session wrapper from the loaded config, start the
# session, and expose its `spark` and `sc` attributes as notebook-level names.
session = context.CustomSparkSession(config)
session.CreateSparkSession()
spark, sc = session.spark, session.sc

Wall time: 12.8 s


# Data Read

In [20]:
%%time
def _read_raw_table(path_key):
    """Read one raw table through ``dp.read_data``.

    ``path_key`` names the entry under ``data_config['raw']`` whose value
    (a file name or glob) is appended to the raw ``base_path``. Every table
    is read with the same filesystem, format and reader options.
    """
    raw_cfg = data_config['raw']
    return dp.read_data(
        spark=spark,
        paths=[raw_cfg['base_path'] + raw_cfg[path_key]],
        fs=raw_cfg['filesystem'],
        fmt="parquet",
        header="true",
        inferschema="true"
    )


# One dataframe per raw source, read in the same order as before.
df_call_data = _read_raw_table('call_data_path')
df_last_activity_data = _read_raw_table('last_activity_data_path')
df_booking_data = _read_raw_table('booking_data_path')
df_consumer_data = _read_raw_table('consumer_data_path')
df_web_data = _read_raw_table('web_data_path')

Wall time: 6.98 s


# Data Processing

The focus here is to create a cleaned dataset that is appropriate for solving the DS problem at hand from the raw data.

**Do's**

* Clean dataframe column names
* Ensure dtypes are set properly
* Join with other tables etc to create features
* Transform, if appropriate, datetime like columns to generate additional features (weekday etc)
* Discard cols that are not useful for training the model (IDs, constant cols, duplicate cols etc)
* Generate additional features from existing columns

**Don'ts**

* Handle missing values or outliers here. Mark them and leave them for processing downstream.

### Reference Dates

`reference_date` - date of reference for consideration for historical data.<br>
`pred_period_start` and `pred_period_end` - date range for building the target variable.

In [7]:
# Anchor date for all historical features, taken from the data catalog.
reference_date = pd.to_datetime(data_config['reference_date'])

# The target window opens the day after the reference date and closes
# `num_days_prediction` days after it.
one_day = pd.Timedelta(days=1)
prediction_horizon = pd.Timedelta(days=data_config['num_days_prediction'])
pred_period_start = reference_date + one_day
pred_period_end = reference_date + prediction_horizon

print(f"Reference date: {reference_date} \t Prediction start date: {pred_period_start} \t Prediction end date: {pred_period_end} ")

Reference date: 2019-04-26 00:00:00 	 Prediction start date: 2019-04-27 00:00:00 	 Prediction end date: 2019-07-25 00:00:00 


### Data Filtering

Choose a `reference_date` (present in the config); the goal is to predict whether a customer makes a booking in the `num_days_prediction` days (90 here, roughly 3 months) that follow it.<br>
Filter all feature datasets to records on or before the `reference_date`.<br>
Filter the target dataset to bookings created between `pred_period_start` and `pred_period_end`.

In [8]:
def _parse_raw_date(column_name):
    """Return a date expression parsed from ``column_name``.

    The column is read with the ``ddMMMyyyy:HH:mm:ss`` pattern, cast to a
    timestamp and then truncated to a date.
    """
    return F.to_date(
        F.unix_timestamp(F.col(column_name), "ddMMMyyyy:HH:mm:ss").cast("timestamp")
    )


# NOTE(review): every frame below is reassigned in place, and rows whose date
# is null after parsing do not satisfy the `<=` comparisons and are dropped —
# confirm this is intended (e.g. consumers without a first cruise date).

# Calls: parse the call date, keep history up to the reference date.
df_call_data = (
    df_call_data
    .withColumn('call_date', _parse_raw_date('call_date'))
    .filter(F.col('call_date') <= reference_date)
)

# Last activity: de-duplicate on the raw values, parse both date columns,
# keep history up to the reference date.
last_activity_date_cols = ['load_date', 'actvty_date']
df_last_activity_data = df_last_activity_data.dropDuplicates(
    ['customer_id', 'actvty_date', 'actvty_type', 'actvty_prod']
)
for date_col in last_activity_date_cols:
    df_last_activity_data = df_last_activity_data.withColumn(date_col, _parse_raw_date(date_col))
df_last_activity_data = df_last_activity_data.filter(F.col('actvty_date') <= reference_date)

# Web visits: de-duplicate on the raw values, parse the visit date,
# keep history up to the reference date.
df_web_data = (
    df_web_data
    .dropDuplicates(['customer_id', 'visit_date', 'device_type_name', 'visit_type'])
    .withColumn('visit_date', _parse_raw_date('visit_date'))
    .filter(F.col('visit_date') <= reference_date)
)

# Consumers: drop the listed columns, parse the first-cruise date and keep
# consumers whose first cruise is on or before the reference date.
drop_lst = ['click_pct', 'open_pct', 'max_event_date', 'booked_flag']
df_consumer_data = (
    df_consumer_data
    .drop(*drop_lst)
    .withColumn('cel_first_cruise_date', _parse_raw_date('cel_first_cruise_date'))
    .filter(F.col('cel_first_cruise_date') <= reference_date)
)

# Bookings: keep those created inside the prediction window (both ends
# inclusive) and reduce them to one positive label per customer.
df_booking_data = (
    df_booking_data
    .withColumn('booking_create_date', _parse_raw_date('booking_create_date'))
    .filter(F.col('booking_create_date').between(pred_period_start, pred_period_end))
    .select('customer_id')
    .dropDuplicates()
    .withColumn('target_var', F.lit(1))
)

### Target Feature Generation

Find the number of consumers common to all feature tables.<br>
A binary target column is built based on the occurrence of a booking for a consumer in the prediction period.

In [9]:
%%time
# Customers usable for modelling are those present in every feature table.
# The call, web and activity tables hold many rows per customer, so each id
# list is de-duplicated *before* joining. Joining the raw id columns first and
# de-duplicating afterwards produces a many-to-many row explosion for the same
# final set of ids.
consumer_ids = df_consumer_data.select('customer_id').dropDuplicates()
call_ids = df_call_data.select('customer_id').dropDuplicates()
web_ids = df_web_data.select('customer_id').dropDuplicates()
activity_ids = df_last_activity_data.select('customer_id').dropDuplicates()

# Inner joins of unique id lists give a unique result (null ids never match),
# so no further de-duplication is needed.
df_common_consumer = (
    consumer_ids
    .join(call_ids, on='customer_id', how='inner')
    .join(web_ids, on='customer_id', how='inner')
    .join(activity_ids, on='customer_id', how='inner')
)
print(f"Total number of common consumers in all feature tables\t: {df_common_consumer.count()}")

# Map with target labels: customers with a booking in the prediction window
# carry target_var = 1 from df_booking_data; everyone else is filled with 0.
df_common_consumer_booking = (
    df_common_consumer
    .join(df_booking_data, on='customer_id', how='left')
    .dropDuplicates()
    .fillna(0, subset=['target_var'])
)

# Class sizes from one grouped count instead of two separate filtered counts,
# so the join above is evaluated once for this summary rather than twice.
target_counts = {
    row['target_var']: row['count']
    for row in df_common_consumer_booking.groupBy('target_var').count().collect()
}
booked_size = target_counts.get(1, 0)
non_booked_size = target_counts.get(0, 0)

print(f"Booked size\t: {booked_size}")
print(f"Non booked size\t: {non_booked_size}")

Total number of common consumers in all feature tables	: 106530
Booked size	: 13033
Non booked size	: 93497
Wall time: 1min 7s


### Independent Features Generation

last_call_date_diff - number of days since the consumer made the last call <br>
last_act_date_diff - number of days since the consumer was last active <br>
last_web_date_diff - number of days since the consumer's last web activity <br>
total_sec_spent - total time spent in seconds. <br>
total_page_view_count - total page view count. <br>
age  <br>
gender_code <br>
state_code <br>
rci_qualify_cruise_qty

In [10]:
%%time
# Call recency: days between the reference date and each call, reduced to the
# smallest gap (the most recent call) per customer. Groups with a null
# customer_id are removed afterwards.
reference_day_col = F.to_date(F.lit(reference_date.strftime('%Y-%m-%d')))
df_call_data = (
    df_call_data
    .withColumn("last_call_date_diff", F.datediff(reference_day_col, F.col("call_date")))
    .groupby('customer_id')
    .agg(F.min('last_call_date_diff').alias('last_call_date_diff'))
    .where(F.col('customer_id').isNotNull())
)

Wall time: 34.8 ms


In [11]:
%%time
# Activity recency: days between the reference date and each activity record,
# reduced to the smallest gap (the most recent activity) per customer.
reference_day_col = F.to_date(F.lit(reference_date.strftime('%Y-%m-%d')))
df_last_activity_data = (
    df_last_activity_data
    .withColumn("last_act_date_diff", F.datediff(reference_day_col, F.col("actvty_date")))
    .groupby('customer_id')
    .agg(F.min('last_act_date_diff').alias('last_act_date_diff'))
)

Wall time: 31.2 ms


In [12]:
%%time
# Web features per customer: days since the most recent visit, total seconds
# spent, and total page views.
reference_day_col = F.to_date(F.lit(reference_date.strftime('%Y-%m-%d')))
web_aggregations = [
    F.min('last_web_date_diff').alias('last_web_date_diff'),
    F.sum('sec_time_spent_on_nbr').alias('total_sec_spent'),
    F.sum('page_view_count').alias('total_page_view_count'),
]
df_web_data = (
    df_web_data
    .withColumn("last_web_date_diff", F.datediff(reference_day_col, F.col("visit_date")))
    .groupby('customer_id')
    .agg(*web_aggregations)
)

Wall time: 31.3 ms


# Model Data Creation

In [13]:
%%time
# Keep only the consumer attributes used as features; this frame is the base
# onto which the other feature tables are merged.
consumer_feature_cols = ['customer_id', 'age', 'gender_code', 'state_code', 'rci_qualify_cruise_qty']
df_consumer_data = df_consumer_data.select(*consumer_feature_cols)

# Attach the per-customer call, activity and web aggregates (left joins keep
# every consumer row at this stage).
for feature_table in (df_call_data, df_last_activity_data, df_web_data):
    df_consumer_data = df_consumer_data.join(feature_table, on='customer_id', how='left')

# Restrict to customers present in all feature tables and bring in target_var.
df_consumer_data = df_consumer_data.join(df_common_consumer_booking, on='customer_id', how='inner')

print(f"Total number of rows:\t{df_consumer_data.count()}")
print(f"Total number of columns:\t{len(df_consumer_data.columns)}")

Total number of rows:	106530
Total number of columns:	11
Wall time: 39.9 s


### Columns type identification in the Final dataframe

In [14]:
%%time
# Tab label -> dp helper that lists the columns of that kind.
# The first label was misspelled ('nemerical') and is now 'numerical'.
types = {
    'numerical': dp.list_numerical_columns,
    'cat_cols': dp.list_categorical_columns,
    'date_cols': dp.list_datelike_columns,
    'bool_cols': dp.list_boolean_columns
}
# Run every helper on the model dataframe and show one tab per column kind.
utils.display_as_tabs(
    [(label, list_columns(df_consumer_data)) for label, list_columns in types.items()]
)

Wall time: 3 ms


# Missing values Handling 

Missing values are handled before the train-test split because the features here are historical and can be imputed using the entire dataset.

In [15]:
%%time
# Summarise missing values in the model dataframe with the project helper and
# render the result as a pandas frame.
missing_summary = dp.identify_missing_values(df_consumer_data)
missing_summary.toPandas()

Wall time: 44.8 s


Unnamed: 0,customer_id,age,gender_code,state_code,rci_qualify_cruise_qty,last_call_date_diff,last_act_date_diff,last_web_date_diff,total_sec_spent,total_page_view_count,target_var
0,0,0,0,14313,0,0,0,0,0,0,0


By default, the imputation is done using the below methods:
1. Mean for numerical cols
2. Mode for categorical cols and boolean cols

We can explicitly define the method for imputation for each of the columns in the `rules` dictionary. 
<br>Supported methods: `["mean", "median", "mode", "constant"]`

For `method="constant"`, the impute value is set with the `impute_val` attribute. By default it is `"0"` for numerical columns and `"NA"` for all other column types.



    imputer = dp.Imputer(cols=["rci_qualify_cruise_qty"],rules={"rci_qualify_cruise_qty":{"method":"constant","impute_val":"0"}})
    imputer.fit(df_consumer_data)
    imputed_data = imputer.transform(df_consumer_data)



In [16]:
%%time
# Impute using the default rules: no columns are listed explicitly (cols=[])
# and no per-column `rules` are supplied.
# NOTE(review): presumably an empty `cols` makes dp.Imputer consider every
# column with missing values — confirm against the dp.Imputer implementation.
imputer = dp.Imputer(cols=[])
# Fit and transform are both run on the full model dataframe (before the split).
imputer.fit(df_consumer_data)
imputed_data = imputer.transform(df_consumer_data)
# Re-run the missing-value summary on the imputed frame to check the result.
dp.identify_missing_values(imputed_data).toPandas()

Imputation cols and values:	{'state_code': 'FL'}
Wall time: 2min 5s


Unnamed: 0,customer_id,age,gender_code,state_code,rci_qualify_cruise_qty,last_call_date_diff,last_act_date_diff,last_web_date_diff,total_sec_spent,total_page_view_count,target_var
0,0,0,0,0,0,0,0,0,0,0,0


# Train Test Split of the Model Data

We split the data into train, test (optionally, also a validation dataset).
In this example, we are stratifying the data by target variable classes and split the data with 70% in train and remaining in test data.

In [17]:
%%time
# Stratified split on the binary target, seeded for repeatability.
# NOTE(review): train_prop is 0.7, but the counts recorded in the saved output
# (90647 / 15883) are closer to 85/15 — confirm how dp.test_train_split applies
# train_prop when stratify=True.
split_options = dict(
    data=imputed_data,
    target_col="target_var",
    train_prop=0.7,
    random_seed=random_seed,
    stratify=True,
    target_type="categorical",
)
train_df, test_df = dp.test_train_split(spark, **split_options)
print(train_df.count(), test_df.count())

90647 15883
Wall time: 1min 30s


### Saving the train and test data

In [18]:
%%time
# Build "<filesystem>:<base_path><file>" targets from the processed section of
# the data catalog.
processed_cfg = data_config['processed']
processed_root = processed_cfg['filesystem'] + ":" + processed_cfg['base_path']
train_data_path = processed_root + processed_cfg['train']
test_data_path = processed_root + processed_cfg['test']

# Write train first, then test, as parquet; existing output is replaced.
for split_df, split_path in ((train_df, train_data_path), (test_df, test_data_path)):
    split_df.write.mode("overwrite").parquet(split_path)

Wall time: 1min 56s
