# Ingest Raw Data into Data Lakehouse

In [1]:
# turn off warnings
# NOTE(review): "ignore" with no category or module filter suppresses every
# Python warning raised after this cell runs, including deprecation notices
# from the libraries used below.
import warnings
warnings.filterwarnings("ignore")

In [2]:
# define function to handle downloading a file from a given url
import requests 
def download_url(url, save_path, chunk_size=128, timeout=60):
    """Stream the file at ``url`` to ``save_path`` on local disk.

    Args:
        url: HTTP(S) address of the file to download.
        save_path: Local path the response body is written to; an existing
            file at that path is overwritten.
        chunk_size: Number of bytes requested per streamed chunk.
        timeout: Seconds to wait for the server (connect / between reads)
            before giving up; ``None`` waits indefinitely.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
    """
    # the context manager releases the connection even if writing fails part-way
    with requests.get(url, stream=True, timeout=timeout) as r:
        # fail fast instead of silently saving an error page as the download
        r.raise_for_status()
        with open(save_path, 'wb') as fd:
            for chunk in r.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

In [3]:
# fetch the zipped dataset and store it as data.zip in the working directory
archive_url = "https://github.com/JoeKnittel/joeknittel.github.io/raw/main/tidymodels_data.zip"
download_url(archive_url, "data.zip")

In [4]:
# unpack every member of the downloaded archive into the working directory
import zipfile
with zipfile.ZipFile("data.zip", mode='r') as archive:
    archive.extractall(path="./")

In [5]:
# build (or reuse) a spark session with the delta lake extension and catalog enabled
import pyspark
from delta import *
builder = (
    pyspark.sql.SparkSession.builder
    .appName("DeltaTest")
    # register delta's SQL extension and make delta the session catalog
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [6]:
# read the raw .csv file into a spark dataframe: the first row supplies the
# column names and spark infers each column's type from the data
csv_reader = spark.read.format("csv")
csv_reader = csv_reader.option("header", "true")
csv_reader = csv_reader.option("inferSchema", "true")
df = csv_reader.load("./Data/data.csv")

In [7]:
# persist the spark dataframe as the BRONZE delta table [in our data lakehouse];
# "overwrite" mode replaces the table if it already exists
bronze_writer = df.write.format("delta").mode("overwrite")
bronze_writer.save("./delta/BRONZE_TABLE")

In [8]:
# read the BRONZE delta table back and show ten rows of the columns at positions 1-9
data = spark.read.format("delta").load("./delta/BRONZE_TABLE")
bronze_preview_cols = data.columns[1:10]
data.select(bronze_preview_cols).show(10)

+--------------+--------------+--------------+--------------+--------------+--------------+--------------+-----------+-----------+
|Product_Info_1|Product_Info_2|Product_Info_3|Product_Info_4|Product_Info_5|Product_Info_6|Product_Info_7|    Ins_Age|         Ht|
+--------------+--------------+--------------+--------------+--------------+--------------+--------------+-----------+-----------+
|             1|            A7|            26|           0.0|             2|             3|             1|0.611940299|0.763636364|
|             1|            D4|            26|   0.487179487|             2|             1|             1|0.059701493|0.818181818|
|             1|            D1|            26|   0.076923077|             2|             3|             1|0.656716418|0.709090909|
|             1|            A8|            26|   0.076923077|             2|             3|             1|0.701492537|0.727272727|
|             1|            D3|            10|   0.128205128|             2|       

# Some Data Cleaning and Exploratory Analysis

### Update Data Types

In [9]:
# categorical columns, written as (column-name prefix, numeric suffixes) groups;
# the groups are expanded in order into names such as "Product_Info_1"
_categorical_groups = [
    ("Product_Info", [1, 2, 3, 5, 6, 7]),
    ("Employment_Info", [2, 3, 5]),
    ("InsuredInfo", [1, 2, 3, 4, 5, 6, 7]),
    ("Insurance_History", [1, 2, 3, 4, 7, 8, 9]),
    ("Family_Hist", [1]),
    ("Medical_History", [2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13, 14,
                         16, 17, 18, 19, 20, 21, 22, 23,
                         25, 26, 27, 28, 29, 30, 31,
                         33, 34, 35, 36, 37, 38, 39, 40, 41]),
]
categorical = [
    f"{prefix}_{suffix}"
    for prefix, suffixes in _categorical_groups
    for suffix in suffixes
]

In [12]:
# continuous columns, concatenated group by group in their original order
continuous = (
    ["Product_Info_4"]
    + ["Ins_Age", "Ht", "Wt", "BMI"]
    + ["Employment_Info_1", "Employment_Info_4", "Employment_Info_6"]
    + ["Insurance_History_5"]
    + ["Family_Hist_2", "Family_Hist_3", "Family_Hist_4", "Family_Hist_5"]
)

In [13]:
# discrete columns: Medical_History_1, _10, _15, _24 and _32
discrete = [f"Medical_History_{suffix}" for suffix in (1, 10, 15, 24, 32)]

In [14]:
# indicator columns Medical_Keyword_1 ... Medical_Keyword_48, in ascending order
dummy = [f"Medical_Keyword_{suffix}" for suffix in range(1, 49)]

In [15]:
# cast each column to the spark type of its variable group, then write the typed
# dataframe to the SILVER delta table (overwriting it if it already exists)
from pyspark.sql.types import *

# target spark type for every column named in one of the four group lists
type_by_group = [
    (categorical, StringType()),
    (continuous, FloatType()),
    (discrete, ShortType()),
    (dummy, BooleanType()),
]
target_types = {name: dtype for names, dtype in type_by_group for name in names}

# a listed column that is absent from the dataframe is an error, not something to skip
unknown_columns = [name for name in target_types if name not in data.columns]
if unknown_columns:
    raise ValueError("columns to cast are missing from the dataframe: %s" % unknown_columns)

# one select builds a single projection; calling withColumn once per column in a
# loop adds a projection per call and inflates the query plan
data = data.select([
    data[name].cast(target_types[name]).alias(name) if name in target_types else data[name]
    for name in data.columns
])

data.write.format("delta").mode("overwrite").save("./delta/SILVER_TABLE")

In [16]:
# read the SILVER delta table back, show ten rows of the columns at positions 1-9
# and print the schema of the first 100 columns
data = spark.read.format("delta").load("./delta/SILVER_TABLE")
silver_columns = data.columns
data.select(silver_columns[1:10]).show(10)
data.select(silver_columns[:100]).printSchema()

+--------------+--------------+--------------+--------------+--------------+--------------+--------------+----------+----------+
|Product_Info_1|Product_Info_2|Product_Info_3|Product_Info_4|Product_Info_5|Product_Info_6|Product_Info_7|   Ins_Age|        Ht|
+--------------+--------------+--------------+--------------+--------------+--------------+--------------+----------+----------+
|             1|            A7|            26|           0.0|             2|             3|             1| 0.6119403|0.76363635|
|             1|            D4|            26|     0.4871795|             2|             1|             1|0.05970149| 0.8181818|
|             1|            D1|            26|    0.07692308|             2|             3|             1| 0.6567164| 0.7090909|
|             1|            A8|            26|    0.07692308|             2|             3|             1|0.70149255|0.72727275|
|             1|            D3|            10|    0.12820512|             2|             3|      

### Delta Lake Table Versioning

In [17]:
# delta lake metadata keeps track of the versions of a table;
# the history can be used for backtracking or auditing, if necessary
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, "./delta/SILVER_TABLE")
history_columns = ["version", "timestamp", "operation", "operationParameters"]
deltaTable.history().select(*history_columns).show()

+-------+--------------------+---------+--------------------+
|version|           timestamp|operation| operationParameters|
+-------+--------------------+---------+--------------------+
|      0|2022-02-09 18:23:...|    WRITE|{mode -> Overwrit...|
+-------+--------------------+---------+--------------------+



In [18]:
# overwriting the delta table commits a new version, which then appears
# as an additional row in the history stored in the metadata
silver_writer = data.write.format("delta").mode("overwrite")
silver_writer.save("./delta/SILVER_TABLE")
silver_history = deltaTable.history()
silver_history.select("version", "timestamp", "operation", "operationParameters").show()

+-------+--------------------+---------+--------------------+
|version|           timestamp|operation| operationParameters|
+-------+--------------------+---------+--------------------+
|      1|2022-02-09 18:24:...|    WRITE|{mode -> Overwrit...|
|      0|2022-02-09 18:23:...|    WRITE|{mode -> Overwrit...|
+-------+--------------------+---------+--------------------+



In [19]:
# load the first version of the silver delta table; the "@v0" suffix on the
# path selects version 0
silver_v0_path = "./delta/SILVER_TABLE@v0"
data = spark.read.format("delta").load(silver_v0_path)

## Split Data

In [20]:
# randomly split the rows into train and test sets with 80/20 weights, using a fixed seed
split_weights = [.80, .20]
trainDF, testDF = data.randomSplit(split_weights, seed=100)

In [21]:
# report how many rows landed in each split
train_rows = trainDF.count()
test_rows = testDF.count()
print("Train:", train_rows, "\nTest: ", test_rows)

Train: 47594 
Test:  11787


# Model Pipeline

In [22]:
import os
# NOTE(review): presumably set so that the pandas-on-spark conversion below does
# not complain about PyArrow timezone handling — confirm
os.environ.update(PYARROW_IGNORE_TIMEZONE="1")

In [23]:
# expose the training split through the pandas-on-spark API and display its first ten rows
pd_trainDF = trainDF.to_pandas_on_spark()
pd_trainDF.head(n=10)

Unnamed: 0,Id,Product_Info_1,Product_Info_2,Product_Info_3,Product_Info_4,Product_Info_5,Product_Info_6,Product_Info_7,Ins_Age,Ht,Wt,BMI,Employment_Info_1,Employment_Info_2,Employment_Info_3,Employment_Info_4,Employment_Info_5,Employment_Info_6,InsuredInfo_1,InsuredInfo_2,InsuredInfo_3,InsuredInfo_4,InsuredInfo_5,InsuredInfo_6,InsuredInfo_7,Insurance_History_1,Insurance_History_2,Insurance_History_3,Insurance_History_4,Insurance_History_5,Insurance_History_7,Insurance_History_8,Insurance_History_9,Family_Hist_1,Family_Hist_2,Family_Hist_3,Family_Hist_4,Family_Hist_5,Medical_History_1,Medical_History_2,Medical_History_3,Medical_History_4,Medical_History_5,Medical_History_6,Medical_History_7,Medical_History_8,Medical_History_9,Medical_History_10,Medical_History_11,Medical_History_12,Medical_History_13,Medical_History_14,Medical_History_15,Medical_History_16,Medical_History_17,Medical_History_18,Medical_History_19,Medical_History_20,Medical_History_21,Medical_History_22,Medical_History_23,Medical_History_24,Medical_History_25,Medical_History_26,Medical_History_27,Medical_History_28,Medical_History_29,Medical_History_30,Medical_History_31,Medical_History_32,Medical_History_33,Medical_History_34,Medical_History_35,Medical_History_36,Medical_History_37,Medical_History_38,Medical_History_39,Medical_History_40,Medical_History_41,Medical_Keyword_1,Medical_Keyword_2,Medical_Keyword_3,Medical_Keyword_4,Medical_Keyword_5,Medical_Keyword_6,Medical_Keyword_7,Medical_Keyword_8,Medical_Keyword_9,Medical_Keyword_10,Medical_Keyword_11,Medical_Keyword_12,Medical_Keyword_13,Medical_Keyword_14,Medical_Keyword_15,Medical_Keyword_16,Medical_Keyword_17,Medical_Keyword_18,Medical_Keyword_19,Medical_Keyword_20,Medical_Keyword_21,Medical_Keyword_22,Medical_Keyword_23,Medical_Keyword_24,Medical_Keyword_25,Medical_Keyword_26,Medical_Keyword_27,Medical_Keyword_28,Medical_Keyword_29,Medical_Keyword_30,Medical_Keyword_31,Medical_Keyword_32,Medical_Keyword_33,Medical_Keyword_34,Medical_Keyword_35,M
edical_Keyword_36,Medical_Keyword_37,Medical_Keyword_38,Medical_Keyword_39,Medical_Keyword_40,Medical_Keyword_41,Medical_Keyword_42,Medical_Keyword_43,Medical_Keyword_44,Medical_Keyword_45,Medical_Keyword_46,Medical_Keyword_47,Medical_Keyword_48,Response
0,23943,1,A7,26,0.0,2,3,1,0.61194,0.763636,0.539749,0.8072,0.03,9,1,0.0,2,,1,2,3,3,1,1,1,2,1,3,1,0.000166667,1,3,2,3,,0.568627,0.605634,,5.0,112,2,2,1,1,3,1,2,,3,2,3,2,,3,3,1,1,2,1,2,1,,1,3,3,1,3,2,3,,3,1,1,2,2,1,1,3,1,True,False,True,False,False,False,False,False,False,False,False,False,False,False,True,False,False,False,False,False,True,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,True,2
1,23947,1,D4,26,0.487179,2,1,1,0.059701,0.818182,0.257322,0.319076,0.099,14,1,0.0,2,0.05,1,2,2,3,1,1,1,2,1,1,3,,3,2,3,3,0.130435,,0.112676,,4.0,307,2,1,1,3,2,2,2,,3,2,3,3,,1,3,1,1,2,1,2,3,,1,3,3,1,3,2,3,,3,3,1,2,2,1,3,3,1,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,8
2,23949,1,D1,26,0.076923,2,3,1,0.656716,0.709091,0.309623,0.501138,0.057,9,1,0.0,2,,1,2,3,3,1,1,1,1,1,3,1,0.000666667,1,1,2,2,,0.205882,,0.607143,3.0,491,2,1,1,1,2,2,1,240.0,3,2,3,1,240.0,1,3,1,2,2,1,2,1,240.0,1,3,3,1,1,2,3,,3,3,1,2,2,1,3,3,1,False,False,False,False,False,False,False,False,False,False,False,False,False,False,True,False,False,False,False,False,False,False,False,False,True,False,False,False,False,True,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,True,7
3,23951,1,D3,10,0.128205,2,3,1,0.41791,0.654545,0.330544,0.60493,0.033,9,1,0.0,3,,2,2,8,3,1,2,1,2,1,3,1,0.0002,1,3,2,2,0.521739,,0.450704,,0.0,161,2,2,1,3,2,2,1,,3,2,3,3,,3,3,1,1,2,1,2,1,,1,3,3,1,1,2,3,,3,3,1,2,1,1,3,3,1,False,False,False,False,True,False,False,False,False,False,True,False,False,False,True,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,True,False,False,False,False,False,False,False,False,False,False,False,6
4,23952,1,D1,10,0.487179,2,1,1,0.432836,0.636364,0.232218,0.439061,0.032,9,1,0.0,3,0.398,1,2,8,3,1,2,1,1,1,3,1,0.001666667,1,1,2,2,0.536232,,0.478873,,12.0,16,2,1,1,3,2,3,1,,3,2,3,3,11.0,1,3,1,2,2,1,2,3,,1,3,3,1,3,2,3,,3,1,1,2,2,1,3,3,1,False,False,False,False,False,False,False,False,False,False,False,False,True,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,True,False,False,False,False,False,False,False,False,False,False,False,6
5,23954,1,D1,26,0.230769,2,3,1,0.552239,0.636364,0.330544,0.629699,0.06,1,3,0.0,2,0.25,1,2,3,3,1,2,3,2,1,3,1,0.000166667,1,3,2,2,0.753623,,,0.598214,10.0,112,2,2,1,3,2,2,2,,3,2,3,3,,1,3,1,1,2,1,2,3,,2,2,3,1,3,2,3,,3,3,1,3,2,1,3,3,1,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,5
6,23956,1,D3,26,0.076923,2,3,1,0.38806,0.709091,0.322176,0.522696,0.025,12,1,,2,0.2,1,2,11,3,1,1,1,2,1,1,3,,3,2,3,3,,0.480392,0.492958,,,162,2,1,1,3,2,2,2,,3,2,3,3,,1,3,1,1,2,1,2,3,,1,3,3,1,3,2,3,,3,3,1,2,2,1,3,3,1,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,6
7,23958,1,A8,26,0.333333,2,3,1,0.731343,0.654545,0.330544,0.60493,0.025,9,1,0.0156,2,0.738629,1,2,3,3,1,2,1,2,1,1,3,,3,2,3,2,,0.372549,,0.4375,10.0,161,3,2,1,3,2,2,2,,3,2,3,3,,3,3,1,1,2,2,2,1,,1,3,3,1,1,2,3,,3,3,1,2,2,1,3,3,3,True,False,False,False,False,False,False,False,False,False,False,False,False,False,True,False,False,False,False,False,False,True,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,True,False,False,False,7
8,23959,1,D4,26,0.74359,2,1,1,0.268657,0.727273,0.341004,0.533838,0.145,12,1,0.0,2,0.92,1,2,8,3,1,1,1,2,1,3,1,0.004,1,3,2,3,,,,,,162,2,1,1,3,3,1,2,,3,2,3,2,,1,3,1,1,2,1,2,3,,1,3,3,1,3,2,3,,3,3,1,2,2,1,3,3,1,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,6
9,23962,1,D4,26,1.0,2,3,1,0.19403,0.781818,0.278243,0.379321,0.08,9,1,0.0,2,0.004,1,2,1,3,1,1,1,1,1,3,1,1.51e-07,1,1,2,3,0.246377,,0.197183,,,162,2,2,1,3,2,2,2,,3,2,3,3,,1,3,1,1,2,1,2,3,,1,3,3,1,3,2,3,,3,3,1,2,2,1,3,3,1,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,8


## Identify Columns to be Omitted

In [24]:
# flag the columns in which more than 40% of the training rows are missing
missing_share = pd_trainDF.isna().sum() / len(pd_trainDF)
many_missing = missing_share > 0.4
# the boolean mask picks the flagged columns; keep their names as a plain list
missing = list(pd_trainDF.loc[:, many_missing.values].columns)
missing

['Insurance_History_5',
 'Family_Hist_2',
 'Family_Hist_3',
 'Family_Hist_5',
 'Medical_History_10',
 'Medical_History_15',
 'Medical_History_24',
 'Medical_History_32']

In [25]:
# count the distinct levels of every categorical variable to spot those with many levels
categorical_frame = pd_trainDF[categorical]
categorical_frame.nunique()

Product_Info_1           2
Product_Info_2          19
Product_Info_3          31
Product_Info_5           2
Product_Info_6           2
Product_Info_7           3
Employment_Info_2       35
Employment_Info_3        2
Employment_Info_5        2
InsuredInfo_1            3
InsuredInfo_2            2
InsuredInfo_3           11
InsuredInfo_4            2
InsuredInfo_5            2
InsuredInfo_6            2
InsuredInfo_7            2
Insurance_History_1      2
Insurance_History_2      3
Insurance_History_3      3
Insurance_History_4      3
Insurance_History_7      3
Insurance_History_8      3
Insurance_History_9      3
Family_Hist_1            3
Medical_History_2      544
Medical_History_3        3
Medical_History_4        2
Medical_History_5        3
Medical_History_6        3
Medical_History_7        3
Medical_History_8        3
Medical_History_9        3
Medical_History_11       3
Medical_History_12       3
Medical_History_13       3
Medical_History_14       3
Medical_History_16       3
M

In [26]:
# columns excluded from the predictors: everything in `missing`, plus `Id`,
# `Medical_History_2` and the label column `Response`
disregard = [*missing, 'Id', 'Medical_History_2', 'Response']
# every remaining column, in dataframe order, is a predictor
predictors = [column for column in pd_trainDF.columns if column not in disregard]

## Generate a Hashed Feature Vector of Remaining Columns

In [27]:
# hash all predictor columns into one (sparse) feature vector column
from pyspark.ml.feature import FeatureHasher
hasher_settings = dict(inputCols=predictors, outputCol="hashed_features")
hasher = FeatureHasher(**hasher_settings)

## Discard Zero-Variance Features

In [28]:
# remove zero-variance features from the hashed vector; reads the hasher's
# output column and writes the surviving features to "features"
from pyspark.ml.feature import VarianceThresholdSelector
hashed_column = hasher.getOutputCol()
selector = VarianceThresholdSelector(featuresCol=hashed_column, outputCol="features")

## Set up Random Forest Model Spec

In [29]:
# random forest classifier that learns the Response column, with a fixed seed
from pyspark.ml.classification import RandomForestClassifier
rf_settings = {"labelCol": 'Response', "seed": 200}
rf = RandomForestClassifier(**rf_settings)

## Set up Hyperparameter Tuning Grid

In [30]:
# tuning grid: every combination of three tree depths and two forest sizes
from pyspark.ml.tuning import ParamGridBuilder
grid_builder = ParamGridBuilder()
grid_builder = grid_builder.addGrid(rf.maxDepth, [2, 4, 6])
grid_builder = grid_builder.addGrid(rf.numTrees, [50, 150])
paramGrid = grid_builder.build()

## Prepare Evaluation Metric

In [31]:
# score models by multiclass accuracy, comparing "prediction" against "Response"
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator_settings = dict(
    labelCol="Response",
    predictionCol="prediction",
    metricName="accuracy",
)
evaluator = MulticlassClassificationEvaluator(**evaluator_settings)

## Design Cross-Validation Procedure

In [32]:
# set up 10-fold cross-validation of the random forest over the tuning grid,
# with a parallelism of 4 and a fixed seed
# NOTE(review): this cell's comment used to say 3-fold while numFolds is 10 —
# confirm which fold count was intended
from pyspark.ml.tuning import CrossValidator
cv_settings = dict(
    estimator=rf,
    evaluator=evaluator,
    estimatorParamMaps=paramGrid,
    numFolds=10,
    parallelism=4,
    seed=300,
)
cv = CrossValidator(**cv_settings)

## Construct the Model Pipeline

In [33]:
# chain the stages in order: feature hashing, variance-based selection,
# then the cross-validated random forest
from pyspark.ml import Pipeline
pipeline_stages = [hasher, selector, cv]
pipeline = Pipeline(stages=pipeline_stages)

## Fit the Model Using the Pipeline

In [None]:
# fit every pipeline stage on the training split
pipelineModel = pipeline.fit(dataset=trainDF)

# Model Evaluation

## Predictions on Testing Set

In [None]:
# score the testing split and show ten rows of features, true Response and prediction
predDF = pipelineModel.transform(dataset=testDF)
prediction_preview_cols = ["features", "Response", "prediction"]
predDF.select(*prediction_preview_cols).show(10)

## Confusion Matrix

In [None]:
# confusion matrix of true vs. predicted Response classes on the testing split
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
predictions = predDF.select("Response", "prediction").to_pandas_on_spark()
r = predictions['Response'].to_numpy()
# Spark ML writes predictions as doubles; cast them to the label dtype so both
# arrays describe classes with the same kind of value
p = predictions['prediction'].to_numpy().astype(r.dtype)
# every class that occurs in either array, in ascending order
labels = sorted(set(r.tolist()) | set(p.tolist()))
cm = confusion_matrix(r, p, labels=labels)
# without display_labels the axes are numbered 0..n-1, which does not match the
# actual Response class values
disp = ConfusionMatrixDisplay(confusion_matrix = cm, display_labels = labels)
disp.plot()