In [1]:
dbutils.library.installPyPI('snowflake-connector-python', version = "2.0.2")
dbutils.library.installPyPI("azure-storage-blob", version = "2.1.0")
dbutils.library.installPyPI("lightgbm")
dbutils.library.installPyPI("scikit-learn", version="0.18.2")
dbutils.library.installPyPI('numpy', version="1.18.1")
dbutils.library.installPyPI("dill", version="0.2.9")
dbutils.library.installPyPI("mlflow")
# dbutils.library.installPyPI("matplotlib", version = "1.5.3")
dbutils.library.installPyPI("category_encoders", version="2.2.2")
dbutils.library.installPyPI("pandas", version="0.21.0")


dbutils.library.restartPython()

In [2]:
import pandas as pd
import os
from os import path
from scipy.stats import pearsonr, spearmanr
from scipy import stats
import numpy as np
from lightgbm import LGBMClassifier, LGBMRegressor
from sklearn.metrics import log_loss, accuracy_score, average_precision_score, confusion_matrix, f1_score, roc_auc_score, mean_absolute_error
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
import snowflake.connector
import dill
from sklearn.pipeline import Pipeline
from category_encoders import TargetEncoder
import mlflow

In [3]:
if mlflow.get_tracking_uri() != 'databricks':
  print('update tracking uri')
  mlflow.set_tracking_uri('databricks')

In [4]:
def _get_key_metrics(y, y_pred, y_pred_proba):
  accuracy = accuracy_score(y, y_pred)
  roc_auc = roc_auc_score(y, y_pred_proba)
  pr_auc = average_precision_score(y, y_pred)
  logloss = log_loss(y, y_pred_proba)
  mae = mean_absolute_error(y, y_pred)
  conf_max = confusion_matrix(y, y_pred)/len(y)

  return accuracy, roc_auc, pr_auc, logloss, mae
  
  
def eval_model(y_train, y_train_pred, y_train_pred_proba, y_test, y_test_pred, y_test_pred_proba): 
  y_train = np.reshape(y_train.astype('float').values, (len(y_train), ))
  y_test = np.reshape(y_test.astype('float').values, (len(y_test), ))
  
  print('in y_train, pct of ones', y_train.mean(), 'pct of zeros', 1-y_train.mean())  
  print('in y_test, pct of ones', y_test.mean(), 'pct of zeros', 1-y_test.mean())
  
  residual_std_train = (y_train_pred_proba - y_train).std()
  y_train_std = y_train.std()
  residual_std_test = (y_test_pred_proba - y_test).std()
  y_test_std = y_test.std()
  
  accuracy_train, roc_auc_train, pr_auc_train, log_loss_train, mae_train = _get_key_metrics(y_train, y_train_pred, y_train_pred_proba)
  accuracy_test, roc_auc_test, pr_auc_test, log_loss_test, mae_test = _get_key_metrics(y_test, y_test_pred, y_test_pred_proba)
  
  std_shrinking_test = (residual_std_test - y_test_std)/y_test_std
  std_shrinking_train = (residual_std_test - y_test_std)/y_test_std
  
#     logloss_test_pred = log_loss(y_test_reshaped, y_pred_proba)
#   logloss_baseline = log_loss(y_test_reshaped, np.ones(len(y_pred)))
#   pred_random = np.random.randint(2, size=len(y_test_reshaped))
#   logloss_baseline = log_loss(y_test_reshaped, pred_random)
#   print('logloss_baseline', logloss_baseline)
#   print('logloss_test_pred', logloss_test_pred)
  
#   acc = accuracy_score(y_test_reshaped, y_pred)
#   f1 = f1_score(y_test_reshaped, y_pred)  
#   roc_auc = roc_auc_score(y_test_reshaped, y_pred_proba)
#   print('acc', acc)
#   print('average_precision_score predicted', average_precision_score(y_test_reshaped, y_pred_proba), 'average_precision_score all zeros', average_precision_score(y_test_reshaped, np.zeros(len(y_pred))))
#   print('f1 score', f1)
#   print('roc_auc_score', roc_auc)
  
    
  return std_shrinking_test, std_shrinking_train, \
         residual_std_test, y_test_std, residual_std_train, y_train_std, \
         accuracy_test, roc_auc_test, pr_auc_test, log_loss_test, mae_test, \
         accuracy_train, roc_auc_train, pr_auc_train, log_loss_train, mae_train

#### global variables

In [6]:
number_of_chunks = 10

## Data loading

#### Real-time query snowflake using complicated sql

In [9]:
sql_query_train = """
with daypart_mapping as (
  SELECT 
    case when day_part = 'late_night' then 'latenight' 
      else DAY_PART
    END as daypart,
    MIN(LOCAL_HOUR) as min_hour,
    MAX(LOCAL_HOUR) as max_hour
  FROM PRODDB.STATIC.LOOKUP_DAY_PART_MAPPING 
  GROUP BY 1
),
flf_targets as (
SELECT 
  FLF.STARTING_POINT_ID, 
  FLF.STARTING_POINT_NAME, 
  FLF.TIME_OF_DAY as daypart, 
  dp.min_hour,
  dp.max_hour,
  FLF.TARGET_IDEAL_FLF, 
  FLF.MIN_TARGET_FLF_RANGE, 
  FLF.MAX_TARGET_FLF_RANGE, 
  FLF.TARGET_CREATED_AT, 
  LEAD(FLF.TARGET_CREATED_AT, 1) OVER (PARTITION BY FLF.STARTING_POINT_ID, FLF.TIME_OF_DAY ORDER BY FLF.TARGET_CREATED_AT) as next, 
  IFNULL(next, '2022-01-01') AS NEXT_TARGET_CREATED_DATE
FROM STATIC.LOOKUP_TARGET_FLF_BY_REGION flf
LEFT JOIN daypart_mapping dp 
  on flf.TIME_OF_DAY = dp.DAYPART
),
flf_raw as (
SELECT
  dd.created_at,
  dd.DELIVERY_ID,
  dd.active_date,
  dd.STORE_STARTING_POINT_ID,
  dd.SUBMARKET_ID,
  dd.flf,
  fces.num_delivered as num_delivered,
  fces.num_opened as num_opened,
  sm.LAUNCH_DATE as submarket_launch_date,
  convert_timezone('UTC', dd.TIMEZONE, dd.CREATED_AT) as created_at_local,
  TO_DATE(created_at_local) as created_at_local_date,
  hour(created_at_local) * 2 + floor(minute(created_at_local)/30.0) as window_id,
  datediff('second', dd.CREATED_AT, dd.ACTUAL_DELIVERY_TIME)/60.0 as asap,
  dd.DISTINCT_ACTIVE_DURATION/60.0 as dat,
  datediff('second', dd.DASHER_CONFIRMED_TIME, dd.DASHER_AT_STORE_TIME)/60.0 as d2r,
  case when datediff('second', dd.QUOTED_DELIVERY_TIME, dd.ACTUAL_DELIVERY_TIME)/60 > 20 then 1 else 0 END as lateness_20_min,
  flf.daypart,
  case when dd.flf - flf.MAX_TARGET_FLF_RANGE > 0 then 1 else 0 END as is_flf_above_max,
  case when dd.flf - flf.TARGET_IDEAL_FLF > 0 then 1 else 0 END as is_flf_above_ideal
FROM PRODDB.PUBLIC.DIMENSION_DELIVERIES dd 
LEFT JOIN flf_targets flf
  on dd.STORE_STARTING_POINT_ID = flf.STARTING_POINT_ID
  AND hour(convert_timezone('UTC', dd.TIMEZONE, dd.created_at)) between flf.min_hour and flf.max_hour
  AND convert_timezone('UTC', dd.TIMEZONE, dd.created_at) between flf.TARGET_CREATED_AT and flf.NEXT_TARGET_CREATED_DATE
LEFT JOIN PRODDB.PUBLIC.MAINDB_SUBMARKET sm 
  ON dd.SUBMARKET_ID = sm.ID
LEFT JOIN public.fact_cx_email_summary fces
  ON dd.SUBMARKET_ID = fces.SUBMARKET_ID
  AND convert_timezone('UTC',dd.timezone,
 dateadd('minute',cast(floor(date_part('minute',dd.created_at) / 30) * 30 as int), date_trunc('hour',dd.created_at))
 ) = fces.half_hour_local
WHERE dd.created_at between '2020-03-16' and '2020-04-27'
  AND dd.IS_FILTERED_CORE = true 
  AND dd.IS_ASAP = true 
  AND dd.IS_CONSUMER_PICKUP = false 
  AND fulfillment_type != 'merchant_fleet'
),

flf_raw_grouped as(
SELECT
    t.created_at_local_date,
    t.DAYPART,
    t.STORE_STARTING_POINT_ID,
    AVG(t.ASAP) as avg_asap,
    AVG(t.DAT) as avg_dat,
    AVG(t.D2R) as avg_d2r,
    AVG(t.IS_FLF_ABOVE_IDEAL) as avg_is_flf_above_ideal,
    AVG(t.flf) as avg_flf,
    AVG(t.num_opened) as avg_num_opened,
    AVG(t.num_delivered) as avg_num_delivered
FROM flf_raw t
GROUP BY t.created_at_local_date, t.DAYPART, t.STORE_STARTING_POINT_ID
),
flf_hist as(
SELECT 
    t1.created_at_local_date,
    dayofweek(t1.CREATED_AT_LOCAL) as DAY_OF_WEEK,
    hour(t1.CREATED_AT_LOCAL) as HOUR_OF_DAY,
    t1.STORE_STARTING_POINT_ID,
    t1.SUBMARKET_ID,  
    t1.DAYPART,
    t1.WINDOW_ID,
    t1.is_flf_above_ideal
FROM flf_raw t1
LEFT JOIN flf_raw_grouped t2
    ON t1.created_at_local_date = DATEADD(Day, -15, t2.created_at_local_date)
    AND t1.DAYPART = t2.DAYPART
    AND t1.STORE_STARTING_POINT_ID = t2.STORE_STARTING_POINT_ID
)

SELECT *
FROM flf_hist
SAMPLE(20)
"""

# sql_query_test_verify = """
# CREATE TEMP TABLE CHIZHANG.DAYPART_MAPPING AS (
#   SELECT 
#     CASE WHEN day_part = 'late_night' THEN 'latenight' 
#       ELSE DAY_PART
#     END AS daypart,
#     MIN(LOCAL_HOUR) AS min_hour,
#     MAX(LOCAL_HOUR) AS max_hour
#   FROM PRODDB.STATIC.LOOKUP_DAY_PART_MAPPING 
#   GROUP BY 1
# );

# CREATE temp TABLE CHIZHANG.FLF_TARGETS AS (
# SELECT 
#   FLF.STARTING_POINT_ID, 
#   FLF.STARTING_POINT_NAME, 
#   FLF.TIME_OF_DAY AS daypart, 
#   dp.min_hour,
#   dp.max_hour,
#   FLF.TARGET_IDEAL_FLF, 
#   FLF.MIN_TARGET_FLF_RANGE, 
#   FLF.MAX_TARGET_FLF_RANGE, 
#   FLF.TARGET_CREATED_AT, 
#   LEAD(FLF.TARGET_CREATED_AT, 1) OVER (PARTITION BY FLF.STARTING_POINT_ID, FLF.TIME_OF_DAY ORDER BY FLF.TARGET_CREATED_AT) AS NEXT, 
#   IFNULL(NEXT, '2022-01-01') AS NEXT_TARGET_CREATED_DATE
# FROM STATIC.LOOKUP_TARGET_FLF_BY_REGION flf
# LEFT JOIN CHIZHANG.DAYPART_MAPPING dp 
#   ON flf.TIME_OF_DAY = dp.DAYPART
# );
    
# CREATE temp TABLE CHIZHANG.FLF_RAW AS (
# SELECT
#   dd.created_at,
#   dd.DELIVERY_ID,
#   dd.active_date,
#   dd.STORE_STARTING_POINT_ID,
#   dd.SUBMARKET_ID,
#   dd.flf,
#   fces.num_delivered AS num_delivered,
#   fces.num_opened AS num_opened,
#   sm.LAUNCH_DATE AS submarket_launch_date,
#   convert_timezone('UTC', dd.TIMEZONE, dd.CREATED_AT) AS created_at_local,
#   TO_DATE(created_at_local) AS created_at_local_date,
#   hour(created_at_local) * 2 + floor(minute(created_at_local)/30.0) AS window_id,
#   datediff('second', dd.CREATED_AT, dd.ACTUAL_DELIVERY_TIME)/60.0 AS asap,
#   dd.DISTINCT_ACTIVE_DURATION/60.0 AS dat,
#   datediff('second', dd.DASHER_CONFIRMED_TIME, dd.DASHER_AT_STORE_TIME)/60.0 as d2r,
#   CASE WHEN datediff('second', dd.QUOTED_DELIVERY_TIME, dd.ACTUAL_DELIVERY_TIME)/60 > 20 THEN 1 ELSE 0 END AS lateness_20_min,
#   flf.daypart,
#   CASE WHEN dd.flf - flf.MAX_TARGET_FLF_RANGE > 0 THEN 1 ELSE 0 END AS is_flf_above_max,
#   CASE WHEN dd.flf - flf.TARGET_IDEAL_FLF > 0 THEN 1 ELSE 0 END AS is_flf_above_ideal
# FROM PRODDB.PUBLIC.DIMENSION_DELIVERIES dd 
# LEFT JOIN CHIZHANG.FLF_TARGETS flf
#   ON dd.STORE_STARTING_POINT_ID = flf.STARTING_POINT_ID
#   AND hour(convert_timezone('UTC', dd.TIMEZONE, dd.created_at)) BETWEEN flf.min_hour AND flf.max_hour
#   AND convert_timezone('UTC', dd.TIMEZONE, dd.created_at) BETWEEN flf.TARGET_CREATED_AT AND flf.NEXT_TARGET_CREATED_DATE
# LEFT JOIN PRODDB.PUBLIC.MAINDB_SUBMARKET sm 
#   ON dd.SUBMARKET_ID = sm.ID
# LEFT JOIN public.fact_cx_email_summary fces
#   ON dd.SUBMARKET_ID = fces.SUBMARKET_ID
#   AND convert_timezone('UTC',dd.timezone,
#  dateadd('minute',CAST(floor(date_part('minute',dd.created_at) / 30) * 30 AS INT), date_trunc('hour',dd.created_at))
#  ) = fces.half_hour_local
# //  WHERE CAST(DD.CREATED_AT as DATE) >= dateadd('DAY', -3, TO_TIMESTAMP_NTZ(LOCALTIMESTAMP)) 
#   WHERE dd.created_at BETWEEN '2020-03-16' AND '2020-04-27'
#   AND CAST(DD.CREATED_AT as DATE) < dateadd('DAY', 0, TO_TIMESTAMP_NTZ(LOCALTIMESTAMP))  
#   AND dd.IS_FILTERED_CORE = true 
#   AND dd.IS_ASAP = true 
#   AND dd.IS_CONSUMER_PICKUP = false 
#   AND fulfillment_type != 'merchant_fleet'
# );

# CREATE TEMP TABLE CHIZHANG.FLF_RAW_GROUPED AS(
# SELECT
#     t.created_at_local_date,
#     t.DAYPART,
#     t.STORE_STARTING_POINT_ID,
#     AVG(t.ASAP) AS avg_asap,
#     AVG(t.DAT) AS avg_dat,
#     AVG(t.D2R) AS avg_d2r,
#     AVG(t.IS_FLF_ABOVE_IDEAL) AS avg_is_flf_above_ideal,
#     AVG(t.flf) AS avg_flf,
#     AVG(t.num_opened) AS avg_num_opened,
#     AVG(t.num_delivered) AS avg_num_delivered
# FROM CHIZHANG.FLF_RAW t
# GROUP BY t.created_at_local_date, t.DAYPART, t.STORE_STARTING_POINT_ID
# );

# SELECT 
#     -- t1.CREATED_AT,
#     -- t1.DELIVERY_ID, 
#     -- t1.ACTIVE_DATE, 
#     dayofweek(t1.CREATED_AT_LOCAL) as DAY_OF_WEEK,
#     hour(t1.CREATED_AT_LOCAL) as HOUR_OF_DAY,
#     t1.STORE_STARTING_POINT_ID,
#     t1.SUBMARKET_ID,
#     -- t1.FLF,
#     -- t1.NUM_DELIVERED, 
#     -- t1.NUM_OPENED,
#     -- t1.SUBMARKET_LAUNCH_DATE, 
#     -- t1.CREATED_AT_LOCAL, 
#     -- t1.CREATED_AT_LOCAL_DATE,
#     -- t1.ASAP, 
#     -- t1.DAT,
#     -- t1.D2R, 
#     -- t1.LATENESS_20_MIN,
#     t1.DAYPART,
#     t1.WINDOW_ID
#     -- t1.IS_FLF_ABOVE_MAX, 
#     -- t1.IS_FLF_ABOVE_IDEAL,
#     -- t2.avg_asap,
#     -- t2.avg_dat,
#     -- t2.avg_d2r,
#     -- t2.avg_is_flf_above_ideal,
#     -- t2.avg_flf,
#     -- t2.avg_num_delivered,
#     -- t2.avg_num_opened
# FROM CHIZHANG.FLF_RAW t1
# LEFT JOIN CHIZHANG.FLF_RAW_GROUPED t2
#     ON t1.created_at_local_date = DATEADD(Day, -15, t2.created_at_local_date)
#     AND t1.DAYPART = t2.DAYPART
#     AND t1.STORE_STARTING_POINT_ID = t2.STORE_STARTING_POINT_ID
# SAMPLE(5)
# """


In [10]:
# full query. complicated query from snowflake
# toPandas is time-consuming
scope_name = 'chizhang-scope'
pw_key_name = 'snowflake-password'
un_key_name = 'snowflake-user'
user = dbutils.secrets.get(scope=scope_name, key=un_key_name)
password = dbutils.secrets.get(scope=scope_name, key=pw_key_name)

# snowflake connection options
options = dict(sfurl="doordash.snowflakecomputing.com/",
               sfaccount="DOORDASH",
               sfuser=user,
               sfpassword=password,
               sfdatabase="PRODDB",
               sfschema="public",
               sfwarehouse="ADHOC")

print(user)
print(password)

# sql_load_table = """ select * from variance_reduction_flf_train_test """
all_data = spark.read.format("snowflake").options(**options).option("query", sql_query_train).load()
raw_data = all_data.toPandas()

#### Query pre-generated table in snowflake using simple sql

In [12]:
sql_load_pre_gen_table = """select * from flf_weather_supply_demand sample (20)"""

In [13]:
# simple query.
# load pre-generated table from snowflake

user = dbutils.secrets.get(scope="chizhang-scope", key="snowflake-user")
password = dbutils.secrets.get(scope="chizhang-scope", key="snowflake-password")

os.environ['SNOWFLAKE_USER'] = dbutils.secrets.get(scope="chizhang-scope", key="snowflake-user")
os.environ['SNOWFLAKE_PW'] = dbutils.secrets.get(scope="chizhang-scope", key="snowflake-password")

# snowflake connection options
params = dict(
  user=os.environ['SNOWFLAKE_USER'],
  password=os.environ['SNOWFLAKE_PW'],
  account='DOORDASH',
  database='PRODDB',
  warehouse='ADHOC',
  schema='public',
)

sql_query_simple = """select * from flf_weather_supply_demand sample (20)"""
with snowflake.connector.connect(**params) as ctx:
  raw_data = pd.read_sql(sql_query_simple, ctx)

#### save csv to dbfs

In [15]:
raw_data_mem_size = raw_data.info(memory_usage='deep')
print('raw_data_mem_size', raw_data_mem_size)

#15min to save
[df_i.to_csv('/dbfs/chizhang/variance_reduction/data/data_weather_supply_demand_{id}.csv'.format(id=id)) for id, df_i in  enumerate(np.array_split(raw_data, number_of_chunks))]

#### load from dbfs

In [17]:
#2min to load 10 files
dir = r'/dbfs/chizhang/variance_reduction/data' # use your path

li = []

for id in range(number_of_chunks):
  print('chunk id', id)
  filename = dir + '/data_weather_'+str(id)+'.csv'
  df = pd.read_csv(filename, index_col=None)
  li.append(df)

raw_data = pd.concat(li, axis=0, ignore_index=True)
print(raw_data.shape)

In [18]:
df = raw_data.copy()
df = df.reindex()
print('data size', df.shape)
df.columns = map(str.lower, df.columns)
df.head(5)

In [19]:
# drop cols with >50% na
print(df.shape)
df.columns
df_nan = df.isna().sum()/len(df)

cols_to_remove = list(df_nan[df_nan > 0.5].index)
print('we drop cols with >50% missing values:', cols_to_remove)
df = df.drop(cols_to_remove, axis=1)
print(df.shape)

In [20]:
df.columns

In [21]:
df_dropped = df.dropna(inplace=False)
print(df_dropped.shape)

In [22]:
df.columns

## Feature creating

In [24]:
# combine real-time features with historical aggregated features
# features_cat = ['day_of_week', 'hour_of_day', 'STORE_STARTING_POINT_ID', 'SUBMARKET_ID', 'LATENESS_20_MIN', 'DAYPART', '30min'] #30 min, sp, submarket, ->market level feats
# historical aggregated features := avg(feat_values) over SP_ID/(DATE_LOCAL-15days)/DAYPART units (i.e., 14 days ago) 
# features_num = ['AVG_ASAP', 'AVG_DAT', 'AVG_D2R', 'AVG_FLF', 'AVG_IS_FLF_ABOVE_IDEAL']

# features_cat = ['day_of_week', 'hour_of_day', 'store_starting_point_id', 'submarket_id', 'daypart', 'window_id'] #30 min, sp, submarket, ->market level feats
# # features_num = ['AVG_ASAP', 'AVG_DAT', 'AVG_D2R', 'AVG_FLF', 'AVG_IS_FLF_ABOVE_IDEAL', 'AVG_NUM_DELIVERED', 'AVG_NUM_OPENED']
# features_num = []

features_orig = ['day_of_week', 'hour_of_day', 'store_starting_point_id', 'submarket_id', 'daypart', 'window_id']
features_weather = ['hh_temperature', 'hh_apparent_temperature', 'hh_pressure', 'hh_humidity', 'hh_dewpoint', 'hh_visibility', \
                'hh_wind_speed', 'hh_cloud_cover', 'hh_dewpoint', 'hh_precip_intensity', 'hh_precip_probability']
features_supply_demand = ['pred_demand', 'actual_demand', 'under_predicted_demand']

features_type = {"day_of_week" : "category",
        "hour_of_day" : "category",
        'daypart'     : "category",
        'hh_hourly_weather_summary': "category",
        'hh_precip_type': "category",
        'hh_icon'     : "category",
        
        'store_starting_point_id' : "target_encode", 
        'submarket_id' : "target_encode",   
        'window_id'    : "target_encode",
        
        'hh_temperature' : "numerical", 
        'hh_apparent_temperature' : "numerical", 
        'hh_pressure'    : "numerical",
        'hh_humidity'    : "numerical",
        'hh_dewpoint'    : "numerical", 
        'hh_visibility'  : "numerical",
        'hh_wind_speed'  : "numerical",
        'hh_cloud_cover' : "numerical", 
        'hh_dewpoint'    : "numerical",
        'hh_precip_intensity'   : "numerical",
        'hh_precip_probability' : "numerical"
       }

# features_cat = ['day_of_week', 'hour_of_day', 'store_starting_point_id', 'submarket_id', 'daypart', 'window_id', \
#                'hh_hourly_weather_summary',  'hh_precip_type', 'hh_icon']
# features_num = ['hh_temperature', 'hh_apparent_temperature', 'hh_pressure', 'hh_humidity', 'hh_dewpoint', 'hh_visibility', \
#                 'hh_wind_speed', 'hh_cloud_cover', 'hh_dewpoint', 'hh_precip_intensity', 'hh_precip_probability', \
#                 'pred_demand']

features_target_encode = ['store_starting_point_id', 'submarket_id', 'window_id'] #, 'hh_hourly_weather_summary', 'hh_icon']

features = features_cat + features_num
features = list(set(features) - set(cols_to_remove))
metrics = ['is_flf_above_ideal']


for f in features_type:
  if features_type.get(f) == 'category' or features_type.get(f) == 'target_encode':
    df[f] = df[f].astype('category')
  elif features_type.get(f) == 'numerical':
    df[f] = df[f].astype('float')
  else:
    raise('wrong feature type')
    

for f in features_cat:
  df[f] = df[f].astype('category')
for f in features_num:
  df[f] = df[f].astype('float')

## Eval raw correlation

In [26]:
df.columns

In [27]:
te = TargetEncoder(cols=features_target_encode)
df_temp = te.fit_transform(df[features], df[metrics])
print(df_temp)

# for f in df_te.columns:
#   df[f + '_te'] = df_te[f]

In [28]:
df_temp['is_flf_above_ideal'] = df[metrics]

In [29]:
print(df_temp.corr()['is_flf_above_ideal'].abs().sort_values(ascending=False))

In [30]:
# feat_corr =  ['hh_pressure', 'hh_humidity', 'hh_precip_intensity', 'pred_demand', 'store_starting_point_id', 'submarket_id', 'window_id']


# for f in feat_corr:
#   tmp = df.corr()
#   print(tmp.values[0][1])
# for f in feat_corr:
#   print('feature', f)
#   if f in features_target_encode:
#     print('\tTrain PearsonrResult', pearsonr(ppl_best.named_steps["target_encode"].transform(X_train)[f].values, y_train.values.reshape(len(y_train), )))
#   else:
#     print('\tTrain PearsonrResult', pearsonr(X_train[f], y_train.values.reshape(len(y_train), )))
#   print('\tTrain PearsonrResult', pearsonr(X_train_encoded[f].values, y_train.values.reshape(len(y_train), )))#, spearmanr(X_train_encoded[f], y_train))  
#   print('\tTest PearsonrResult', pearsonr(X_test_encoded[f].values, y_test.values.reshape(len(y_test), )))#, spearmanr(X_test_encoded[f], y_test))

## Train/test data preparing

In [32]:
from datetime import datetime, timedelta

df['created_at_local_date'] = pd.to_datetime(df['created_at_local_date'])
date_train_end = df['created_at_local_date'].max() - timedelta(weeks=1)
date_train_start = date_train_end - timedelta(weeks=5)

train_index = df[(df['created_at_local_date'] < str(date_train_end)) & (df['created_at_local_date'] >= str(date_train_start))].index
test_index = df[df['created_at_local_date'] >= str(date_train_end)].index

print('training period:', df.loc[train_index]['created_at_local_date'].min(), df.loc[train_index]['created_at_local_date'].max())
print('testing period:', df.loc[test_index]['created_at_local_date'].min(), df.loc[test_index]['created_at_local_date'].max())


In [33]:
# check train and test index do not intersects
assert len(train_index.intersection(test_index))==0, 'data leakage'
# check if training and testing data has 7 days a week
assert df.loc[test_index]['day_of_week'].nunique() == 7, 'testing data does not have 7 days a week'
assert df.loc[train_index]['day_of_week'].nunique() == 7, 'training data does not have 7 days a week'

### Create pipeline & train

#### random search

In [36]:
# target encoding
teppl = TargetEncoder(cols=features_target_encode)
# clf
clfppl = LGBMClassifier()

# random search
randomParams = {
    'clf__learning_rate': stats.uniform(0.01, 0.1), 
    'clf__num_leaves': stats.randint(16, 512),
    'clf__boosting_type' : ['gbdt', 'dart', 'goss']    
}

ppl = Pipeline([('target_encode', teppl), ('clf', clfppl)])

random_search = RandomizedSearchCV(ppl, randomParams, n_jobs=-1, verbose=2, n_iter=100)
random_search.fit(X_train, y_train)

In [37]:
random_search.best_params_

In [38]:
ppl_best = random_search.best_estimator_

#### best model after randome search

In [40]:
teppl = TargetEncoder(cols=features_target_encode)

clfppl = LGBMClassifier()

ppl_best = Pipeline([('target_encode', teppl), ('clf', clfppl)])
ppl_best.set_params(clf__learning_rate=0.09985392034627226, clf__boosting_type='gbdt', clf__num_leaves=486)

ppl_best.fit(X_train, y_train)

#### clf vanilla

In [42]:
mlflow.set_experiment('/Shared/Experiments/variance_reduction_flf')

train_mlflow(features)
train_mlflow(features)
def train_mlflow(features):
  with mlflow.start_run(run_name='weather_vanilla'):

      X_train = df.loc[train_index, features]
      y_train = df.loc[train_index, metrics]

      X_test = df.loc[test_index, features]
      y_test = df.loc[test_index, metrics]

      print("x_train shape", X_train.shape, 'y_train shape', y_train.shape)
      print("x_test shape", X_test.shape, 'y_test shape', y_test.shape)

      teppl_vanilla = TargetEncoder(cols=features_target_encode)
      clfppl_vanilla = LGBMClassifier()

      ppl_vanilla = Pipeline([('target_encode_vanilla', teppl_vanilla), ('clf_vanilla', clfppl_vanilla)])

      ppl_vanilla.fit(X_train, y_train)

      y_test_pred = ppl_vanilla.predict(X_test)
      y_test_pred_proba = ppl_vanilla.predict_proba(X_test)[:,1]

      y_train_pred = ppl_vanilla.predict(X_train)
      y_train_pred_proba = ppl_vanilla.predict_proba(X_train)[:,1]

      std_shrinking_test, std_shrinking_train, \
           residual_std_test, y_test_std, residual_std_train, y_train_std, \
           accuracy_test, roc_auc_test, pr_auc_test, log_loss_test, mae_test, \
           accuracy_train, roc_auc_train, pr_auc_train, log_loss_train, mae_train = eval_model(y_train, y_train_pred, y_train_pred_proba, y_test, y_test_pred, y_test_pred_proba)

      mlflow.set_tag('features', features)

      print("Test Log Loss: {0}".format(log_loss_test))
      print("Train Log Loss: {0}".format(log_loss_train))

      print("Test Accuracy Score: {0}".format(accuracy_test))
      print("Train Accuracy Score: {0}".format(accuracy_train))


      print("Test ROC AUC: {0}".format(roc_auc_test))
      print("Train ROC AUC: {0}".format(roc_auc_train))

      print("Test PR AUC: {0}".format(pr_auc_test))
      print("Train PR AUC: {0}".format(pr_auc_train))

      print("Test MAE: {0}".format(mae_test))
      print("Train MAE: {0}".format(mae_train))

      mlflow.log_metric('std shrinking', std_shrinking)

      mlflow.log_metric('residual_std', residual_std_test)
      mlflow.log_metric('y_test_std', y_test_std)
      mlflow.log_metric('test shrinking', std_shrinking_test)

      mlflow.log_metric('residual_std', residual_std_train)
      mlflow.log_metric('y_test_std', y_train_std)
      mlflow.log_metric('test shrinking', std_shrinking_train)


      mlflow.log_metric('accuracy_train', accuracy_train)
      mlflow.log_metric('accuracy_test', accuracy_test)

      mlflow.log_metric('train_pr_auc', pr_auc_train)
      mlflow.log_metric('test_pr_auc', pr_auc_test)

      mlflow.log_metric('train_roc_auc', roc_auc_train)
      mlflow.log_metric('test_roc_auc', roc_auc_test)

### Testing

In [44]:
residual_std_m1, y_std = eval_model(ppl_best, X_test, y_test)

### Important features

In [46]:
dictionary = dict(zip(features, ppl_best['clf'].feature_importances_))

dictionary = {k: v for k, v in sorted(dictionary.items(), key=lambda item: item[1])}

# dictionary = sorted(dictionary.items(), key=operator.itemgetter(1))
print(dictionary)

In [47]:
with open('/dbfs/chizhang/variance_reduction/model/052820_flf_prediction_variance_reduction_ppl.dill', 'wb') as f:
  dill.dump(ppl_best, f)

In [48]:
##### END ####








## validate correlation in unit level

In [50]:
with open('/dbfs/chizhang/variance_reduction/model/051620_flf_prediction_variance_reduction_ppl.dill', 'rb') as f:
  ppl_best = dill.load(f)

In [51]:
for f in ['store_starting_point_id', 'submarket_id', 'window_id']:
  print('feature', f)
  print('\tTrain PearsonrResult', pearsonr(ppl_best.named_steps["target_encode"].transform(X_test)[f].values, y_test.values.reshape(len(y_test), )))

In [53]:
df.groupby(['store_starting_point_id', 'daypart']).agg({'is_flf_above_ideal' : 'mean'})

## Model training

In [55]:
# # features_high_corr = ['STORE_STARTING_POINT_ID', 'SUBMARKET_ID', 'hour_of_day', 'day_of_week'] #better residual std without day_of_week

# features_high_corr = features
# #model 1: default with balancing
# clf_default_is_unbalanced = LGBMClassifier(is_unbalance =True)#is_unbalance =True
# clf_default_is_unbalanced.fit(X_train_encoded[features_high_corr], y_train)

# #model 2: default
# clf_default = LGBMClassifier()
# clf_default.fit(X_train_encoded[features_high_corr], y_train)

In [56]:
# # model 3: grid search
# gridParams = {
#     'learning_rate': [0.01, 0.05, 0.1],
#     'num_leaves': [16, 31, 64, 128],
#     'boosting_type' : ['gbdt', 'dart', 'goss'],
#     'objective' : ['binary'],
#     'max_depth' : [-1, 5, 10],
# #     'random_state' : [501], 
# #     'colsample_bytree' : [0.5,0.7],
# #     'subsample' : [0.5,0.7],
# #     'min_split_gain' : [0.01],
#     'min_data_in_leaf':[5, 10, 20],
# #     'metric':['auc']
#     }
# #{'boosting_type': 'goss', 'objective': 'binary', 'learning_rate': 0.1, 'num_leaves': 64, 'max_depth': -1}

# clf_tuned = LGBMClassifier()
# grid = GridSearchCV(clf_tuned, gridParams,verbose=2, n_jobs=-1)
# grid.fit(X_train_encoded[features_high_corr], y_train)

# # print(grid.best_score_)
# print(grid.best_params_)


In [57]:
# # model 4: random search
# import scipy.stats as stats
# randomParams = {
#     'learning_rate': stats.uniform(0.01, 0.1), 
#     'num_leaves': stats.randint(16, 256),
#     'boosting_type' : ['gbdt', 'dart', 'goss'],
#     'objective' : ['binary'],
#     'max_depth' : [-1, 5, 10],
# #     'random_state' : [501], 
# #     'colsample_bytree' : [0.5,0.7],
# #     'subsample' : [0.5,0.7],
# #     'min_split_gain' : [0.01],
# #     'min_data_in_leaf':[10],
# #     'metric':['auc']
#     }
# #{'boosting_type': 'goss', 'objective': 'binary', 'learning_rate': 0.1, 'num_leaves': 64, 'max_depth': -1}

# clf_tuned_random = LGBMClassifier()
# model_random_search = RandomizedSearchCV(clf_tuned_random, randomParams, verbose=2, n_jobs=-1, n_iter=100)
# model_random_search.fit(X_train_encoded[features_high_corr], y_train)

# # print(grid.best_score_)
# print(model_random_search.best_params_)


## Testing

In [59]:
residual_std_m1, _ = eval_model(clf_default_is_unbalanced, X_test_encoded[features_high_corr], y_test)

In [60]:
residual_std_m2, _ = eval_model(clf_default, X_test_encoded[features_high_corr], y_test)

In [61]:
residual_std_m3, _ = eval_model(grid, X_test_encoded[features_high_corr], y_test)

In [62]:
residual_std_m4, y_std = eval_model(model_random_search, X_test_encoded[features_high_corr], y_test)

## Result plotting

In [64]:
from matplotlib import pyplot as plt
import matplotlib.ticker as mtick

# plot 4 model performance
# std_clf_is_unbalanced = 0.4239141851690449
# std_clf_default = 0.422724398281608
# std_grid_search = 0.4211761747709921
# std_random_search = 0.42077421076528226
# std_random_search_new = 0.4200526230592615 #new features
# std_testing_data = 0.4555727985659514

std_testing_data = y_std

std_clf_is_unbalanced = residual_std_m1
std_clf_default = residual_std_m2
std_grid_search = residual_std_m3
std_random_search = residual_std_m4

heights = [std_testing_data, std_clf_is_unbalanced, std_clf_default, std_grid_search, std_random_search]
heights =[i for i in heights]
heights_pct = [1 - i/std_testing_data for i in heights]
fig, ax = plt.subplots(figsize=(8, 6))
ax.bar(range(len(heights)), heights)
ax.set_ylabel('STD')

plt.xticks(range(len(heights)), ['y std', 'residual_m1 (w/ balancing)', 'residual_m2(w/o balancing)', 'residual_m3(w/ grid search)', 'residual_m4(w/ random search)'])
plt.xticks(rotation=10)
ax2 = ax.twinx()
ax2.plot(heights_pct, '-o', color='black')
ax2.set_ylabel('% of STD Reduction')

vals = ax2.get_yticks()
ax2.set_yticklabels(['{:,.2%}'.format(x) for x in vals])

for i,j in zip(range(len(heights_pct)), heights_pct):
    ax2.annotate('{:,.2%}'.format(j),xy=(i,j))
plt.grid()
display(fig)