In [0]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [0]:
import pyspark
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import json
import time

from src.preprocessing import DataFactory

In [0]:
# --- configuration: where the input tables live and which recipe to apply ---
CATALOG = "workspace"
SCHEMA = "safe_driver_prediction"
RECIPE_PATH = "./configs/feature_store_recipe.json"

# --- load the input tables ---
# NOTE(review): `spark` is not defined in this notebook; presumably it is the session
# object injected by the notebook runtime — confirm when running elsewhere.
train_df = spark.read.table(f"{CATALOG}.{SCHEMA}.train")
test_df = spark.read.table(f"{CATALOG}.{SCHEMA}.test")

# count() is evaluated here for each table just to report its size
print(f"train set has {train_df.count()} rows and {len(train_df.columns)} columns.")
print(f"test set has {test_df.count()} rows and {len(test_df.columns)} columns.")

# --- load the preprocessing recipe ---
# The encoding is pinned so parsing the JSON file does not depend on the machine's
# locale default.
with open(RECIPE_PATH, encoding="utf-8") as f:
    steps = json.load(f)
print(f"loaded preprocessing recipe with {len(steps)} steps from {RECIPE_PATH}")

train set has 595212 rows and 59 columns.
test set has 892816 rows and 58 columns.
loaded preprocessing recipe with 5 steps from ./recipe.json


In [0]:
# preview the first 10 rows of the training table as loaded, before any preprocessing
train_df.limit(10).display()

id,target,ps_ind_01,ps_ind_02_cat,ps_ind_03,ps_ind_04_cat,ps_ind_05_cat,ps_ind_06_bin,ps_ind_07_bin,ps_ind_08_bin,ps_ind_09_bin,ps_ind_10_bin,ps_ind_11_bin,ps_ind_12_bin,ps_ind_13_bin,ps_ind_14,ps_ind_15,ps_ind_16_bin,ps_ind_17_bin,ps_ind_18_bin,ps_reg_01,ps_reg_02,ps_reg_03,ps_car_01_cat,ps_car_02_cat,ps_car_03_cat,ps_car_04_cat,ps_car_05_cat,ps_car_06_cat,ps_car_07_cat,ps_car_08_cat,ps_car_09_cat,ps_car_10_cat,ps_car_11_cat,ps_car_11,ps_car_12,ps_car_13,ps_car_14,ps_car_15,ps_calc_01,ps_calc_02,ps_calc_03,ps_calc_04,ps_calc_05,ps_calc_06,ps_calc_07,ps_calc_08,ps_calc_09,ps_calc_10,ps_calc_11,ps_calc_12,ps_calc_13,ps_calc_14,ps_calc_15_bin,ps_calc_16_bin,ps_calc_17_bin,ps_calc_18_bin,ps_calc_19_bin,ps_calc_20_bin
863026,0,0,1,2,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0.2,0.1,-1.0,6,1,-1,0,1,15,1,1,0,1,5,2,0.4472135954999999,1.1285549892,0.4,3.4641016151,0.2,0.4,0.0,2,2,8,3,11,3,11,7,2,2,11,0,1,1,0,0,0
863030,1,1,2,1,0,0,1,0,0,0,0,0,0,0,0,8,0,1,0,0.7,0.9,1.9031224343,11,1,-1,0,-1,0,1,1,1,1,78,3,0.4,0.7979959403,0.3848376281,3.3166247904,0.4,0.3,0.8,2,4,8,4,8,2,9,6,1,2,5,0,1,0,0,1,0
863031,0,0,1,7,0,0,1,0,0,0,0,0,0,0,0,12,1,0,0,0.3,0.2,-1.0,5,1,0,0,0,0,1,1,2,1,95,2,0.399374511,0.6440605947,0.3322649545,2.0,0.1,0.0,0.0,2,2,8,4,10,4,7,2,3,2,10,1,1,0,0,0,0
863033,0,1,3,8,1,0,0,0,0,1,0,0,0,0,0,3,1,0,0,0.9,1.2,0.6025985397,11,0,1,0,1,7,1,1,2,1,79,3,0.4,1.0048230797,0.3466987165,3.6055512755,0.6,0.6,0.0,0,1,8,4,10,3,9,2,4,3,10,0,0,1,1,0,0
863034,0,5,2,1,1,0,0,0,1,0,0,0,0,0,0,6,0,1,0,0.9,1.2,1.6678953803999998,11,1,0,0,1,1,1,1,2,1,70,3,0.3741657387,0.7491041481999999,0.3391164992,3.0,0.5,0.3,0.4,2,2,8,3,10,4,8,5,2,2,6,0,1,1,0,0,0
863036,0,7,1,3,1,0,0,0,0,1,0,0,0,0,0,7,1,0,0,0.9,0.5,0.909326674,6,1,-1,0,1,1,1,1,0,1,64,1,0.316227766,0.6621246793,0.3701351105,2.8284271247,0.2,0.2,0.3,4,2,7,2,10,1,11,3,0,0,5,0,0,0,0,0,0
863039,0,0,2,6,1,0,0,1,0,0,0,0,0,0,0,5,1,0,0,0.1,0.1,-1.0,7,1,-1,0,-1,1,1,1,2,1,25,3,0.3741657387,0.7942848543000001,-1.0,3.6055512755,0.5,0.9,0.2,3,3,8,3,7,5,14,4,2,0,9,0,1,0,1,1,0
863040,0,1,3,6,1,0,0,0,1,0,0,0,0,0,0,1,0,0,1,0.5,0.2,0.6694213918999999,11,1,-1,0,-1,14,1,1,2,1,82,3,0.316227766,0.7445775051000001,0.3685105154999999,3.4641016151,0.4,0.8,0.9,3,4,8,2,9,1,10,3,3,3,3,0,1,0,0,0,0
863043,0,0,2,1,0,0,0,1,0,0,0,0,0,0,0,6,1,0,0,0.4,0.0,0.5273755777,11,0,1,9,1,2,-1,0,1,1,104,3,0.4472135954999999,1.6885614201,0.4669047012,3.7416573868,0.1,0.5,0.1,0,0,8,3,11,2,8,1,0,1,14,0,1,1,0,1,1
863044,0,0,1,5,0,0,1,0,0,0,0,0,0,0,0,10,1,0,0,0.3,0.2,-1.0,10,1,-1,0,1,11,1,1,0,1,103,1,0.316227766,0.5818892002,0.3583294573,2.8284271247,0.0,0.2,0.1,1,3,8,3,8,4,9,6,1,2,11,0,1,0,0,0,0


In [0]:
def execute_function(preprocessor, step):
    """
    Run one recipe step on the preprocessor and report how long it took.

    Args:
        preprocessor: object exposing a method named by ``step["function"]``.
        step (dict): one recipe entry with the keys
            - "function": name of the method to call on ``preprocessor``.
            - "parameters" (optional): keyword arguments for that method.
              A missing or null entry means "call with no arguments".

    Returns:
        Whatever the invoked method returns.

    Raises:
        KeyError: if ``step`` has no "function" entry.
        AttributeError: if ``preprocessor`` has no attribute with that name.
    """
    function = step["function"]
    # tolerate steps that omit "parameters" (or set it to null) instead of failing
    parameters = step.get("parameters") or {}
    print(f"running function: '{function}'...")

    # perf_counter is monotonic, so the measured interval cannot be skewed by
    # system clock adjustments the way time.time() can
    start_time = time.perf_counter()
    func = getattr(preprocessor, function)
    result = func(**parameters)
    elapsed_time = time.perf_counter() - start_time

    print(f"it took {elapsed_time: .4f} seconds to execute.")
    print("----------------------------------")
    return result

In [0]:
preprocessor = DataFactory()

# Pair every input with an explicit label. Deriving the label from `df == train_df`
# depends on how the DataFrame class happens to implement `==`, and it had to be
# repeated wherever the label was needed.
datasets = [("train", train_df), ("test", test_df)]

for dataset_name, df in datasets:
    # the same DataFactory instance is reused; set_df hands it the frame to process
    preprocessor.set_df(df=df)

    print(f"*** preprocessing {dataset_name} dataset ***")
    # apply every recipe step, in recipe order, to the current dataset
    for step in steps:
        execute_function(preprocessor, step)
    print("*** preprocessing completed ***")
    # reassigned on every pass: after the loop this holds the last dataset's output
    feature_store_df = preprocessor.get_df()

    output_table_name = f"feature_store_{dataset_name}"
    (
        feature_store_df.write
        .option("mergeSchema", "true")
        .mode("overwrite")
        .saveAsTable(f"{CATALOG}.{SCHEMA}.{output_table_name}")
    )

    print(f"*** saved {output_table_name} to the feature store ***")
    print("----------------------------------")

*** preprocessing train dataset ***
running function: 'standardize_df'...
removed prefix 'ps_' from column names to standardize naming.
replaced -1 with None for nulls in the dataset.
it took  0.1241 seconds to execute.
----------------------------------
running function: 'drop_duplicates'...
dropped 0 duplicate rows from the dataset.
it took  1.5080 seconds to execute.
----------------------------------
running function: 'handle_high_null_columns'...
columns with null ratio higher than 0.4: ['car_03_cat', 'car_05_cat']
imputed high-null column 'car_03_cat' using classification decision tree model.
imputed high-null column 'car_05_cat' using classification decision tree model.
imputed 2 columns with high null ratio using prediction.
it took  68.9112 seconds to execute.
----------------------------------
running function: 'auto_impute_missing_data'...
imputed 31 categorical/binary columns with mode value.
imputed 26 numerical columns with median value.
it took  176.8319 seconds to execu

In [0]:
# display the most recent feature store dataframe for review; `feature_store_df` is
# reassigned on every pass of the preprocessing loop, so this shows the output of its
# last iteration (the test dataset)
feature_store_df.limit(10).display()

id,ind_01,ind_02_cat,ind_03,ind_04_cat,ind_05_cat,ind_06_bin,ind_07_bin,ind_08_bin,ind_09_bin,ind_10_bin,ind_11_bin,ind_12_bin,ind_13_bin,ind_14,ind_15,ind_16_bin,ind_17_bin,ind_18_bin,reg_01,reg_02,reg_03,car_01_cat,car_02_cat,car_03_cat,car_04_cat,car_05_cat,car_06_cat,car_07_cat,car_08_cat,car_09_cat,car_10_cat,car_11_cat,car_11,car_12,car_13,car_14,car_15,calc_01,calc_02,calc_03,calc_04,calc_05,calc_06,calc_07,calc_08,calc_09,calc_10,calc_11,calc_12,calc_13,calc_14,calc_15_bin,calc_16_bin,calc_17_bin,calc_18_bin,calc_19_bin,calc_20_bin,car_04_cat_simplified,car_11_cat_simplified,ind_02_cat_simplified,ind_05_cat_simplified
582442,1,1,2,1,0,0,1,0,0,0,0,0,0,0,11,0,1,0,0.2,0.3,0.8027297179,7,1,0.0,0,1.0,0,1,1,2,1,32,3,0.316227766,0.6425832422,0.3615245497,3.0,0.6,0.7,0.2,4,3,8,2,8,3,9,8,1,1,7,0,1,0,0,1,0,1,0,1,1
582933,1,1,6,0,0,0,0,0,1,0,0,0,0,0,9,1,0,0,0.9,0.2,0.7356969484999999,7,1,1.0,2,1.0,11,1,1,0,1,14,0,0.3741657387,0.7028425167,0.4302324953,3.1622776602,0.1,0.9,0.1,3,1,9,5,10,4,8,9,1,3,11,0,0,1,0,1,0,0,0,1,1
583831,0,2,0,1,4,0,0,1,0,0,0,0,0,0,10,0,0,0,0.5,1.0,1.3831576194,10,1,1.0,0,1.0,11,1,0,0,1,99,2,0.316227766,0.7509222979000001,0.3687817783,3.6055512755,0.9,0.5,0.1,3,3,8,4,9,3,11,9,1,4,3,0,1,1,1,1,0,1,0,0,0
585124,7,3,7,0,0,0,0,0,1,0,0,0,0,0,13,1,0,0,0.9,1.3,1.4635146053,11,1,0.0,8,1.0,15,1,1,2,1,41,2,0.5291502621999999,0.7426638451,0.4260281681,0.0,0.0,0.1,0.1,1,1,9,2,12,1,6,4,3,2,2,0,1,0,1,0,0,0,0,0,1
587467,6,1,3,1,0,0,0,0,1,0,0,0,0,0,3,0,1,0,0.9,0.3,1.1031205736,6,1,1.0,0,0.0,11,1,1,0,1,16,3,0.3741657387,0.6962413849,0.3887158345,3.1622776602,0.1,0.2,0.1,2,0,6,2,8,3,4,6,1,0,8,0,0,1,0,1,0,1,0,1,1
587763,2,1,4,1,4,0,1,0,0,0,0,0,0,0,5,0,1,0,0.7,0.5,0.9978101022,7,0,1.0,0,0.0,15,1,0,1,1,104,2,0.4472135954999999,1.2195040314,0.3255764119,3.7416573868,0.4,0.1,0.7,2,2,8,3,8,1,11,6,2,5,5,0,1,0,0,1,0,1,1,1,0
588818,0,4,4,0,0,1,0,0,0,0,0,0,0,0,8,1,0,0,0.1,0.1,0.8027297179,7,1,0.0,0,0.0,0,1,1,2,1,87,3,0.316227766,0.7189006759000001,0.3615245497,3.4641016151,0.0,0.5,0.1,2,2,7,5,11,3,10,2,2,5,5,0,1,1,1,0,0,1,0,0,1
589503,1,1,2,0,0,0,1,0,0,0,0,0,0,0,9,1,0,0,0.1,0.1,0.8027297179,7,0,1.0,0,1.0,10,1,1,2,1,55,2,0.4472135954999999,1.237040256,0.36138622,3.7416573868,0.7,0.2,0.1,3,4,9,4,8,1,8,5,0,1,8,0,1,0,0,0,0,1,0,1,1
590743,0,1,4,1,6,1,0,0,0,0,0,0,0,0,5,0,1,0,0.0,0.0,0.6204836823,11,1,0.0,0,1.0,11,1,1,2,1,19,3,0.316227766,0.6117310488000001,0.3840572874,2.2360679775,0.8,0.3,0.2,4,3,6,1,10,4,11,1,1,2,13,0,1,1,0,0,0,1,0,1,0
591552,3,1,3,1,0,0,1,0,0,0,0,0,0,0,13,0,1,0,0.4,0.6,0.7185053932,6,0,1.0,0,1.0,15,1,1,0,1,5,2,0.4242640687,0.7499586765,0.3958535083,2.2360679775,0.4,0.9,0.0,1,1,10,2,9,3,5,4,0,2,8,0,0,1,0,1,1,1,0,1,1
