## Email Click Through Rate Predictor

Dataset Source: https://www.kaggle.com/datasets/sk4467/email-ctr-prediction

##### Import necessary Libraries

In [0]:
import pandas as pd

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
import pyspark.sql.functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator



##### Versions of Libraries, Modules, Frameworks Used in This Project

In [0]:
# `spark` is never created in this notebook; presumably it is the session object the runtime provides.
print(f"Apache Spark version: {spark.version}")

Apache Spark version: 3.3.1


#### Create Functions That Are Used Throughout Project

##### Function to Ingest Dataset

In [0]:
def ingest_dataset(file_location, schema):
    '''
    Load a comma-separated file whose first row is a header, applying
    the supplied schema rather than letting the reader infer one.

    file_location: value passed to the reader's load() call
    schema: value passed to the reader's schema() call

    Returns the object produced by load().
    '''
    # Reader options, applied in this order.
    reader_options = {
        "inferSchema": "false",
        "header": "true",
        "sep": ",",
    }

    reader = spark.read.format("csv")
    for option_name, option_value in reader_options.items():
        reader = reader.option(option_name, option_value)

    return reader.schema(schema).load(file_location)

##### Metrics Evaluation Function

In [0]:
def _pair_labels_with_predictions(labels, predictions):
    '''
    Return one DataFrame holding both a "label" and a "prediction"
    column, built from the two frames given to regression_metrics_eval.
    '''
    if {"label", "prediction"}.issubset(predictions.columns):
        # Caller handed over a frame that already has both columns: use it as-is.
        return predictions

    def with_row_position(frame, wanted_col, ddl_schema):
        # Fall back to the frame's first column when it uses another name.
        source_col = wanted_col if wanted_col in frame.columns else frame.columns[0]
        return (
            frame.select(source_col).rdd
            .zipWithIndex()
            .map(lambda pair: (pair[1], None if pair[0][0] is None else float(pair[0][0])))
            .toDF(ddl_schema)
        )

    # The two frames share no key, so rows are numbered and joined on that
    # number. NOTE: this assumes both frames list their rows in the same
    # order (true when both are select()s of the same parent frame).
    indexed_labels = with_row_position(labels, "label", "row_pos long, label double")
    indexed_predictions = with_row_position(
        predictions, "prediction", "row_pos long, prediction double"
    )
    return (
        indexed_labels
        .join(indexed_predictions, on="row_pos", how="inner")
        .drop("row_pos")
    )


def regression_metrics_eval(labels,
                            predictions,
                            metrics_to_include: [str],
                            model_name: str) -> None:
    '''
    Print a table with the value of each requested regression metric.

    labels: DataFrame carrying the true values (column "label", or its
        first column otherwise)
    predictions: DataFrame carrying the predicted values (column
        "prediction", or its first column otherwise). If this frame
        already contains both "label" and "prediction", it is evaluated
        directly and `labels` is not consulted.
    metrics_to_include: metric names handed one by one to
        RegressionEvaluator(metricName=...)
    model_name: title shown in the table header

    Returns None; the table is written with print().

    The metrics are computed from the frames passed in, not from any
    notebook-level variable.
    '''
    # Built once, outside the loop, and reused for every metric.
    eval_frame = _pair_labels_with_predictions(labels, predictions)

    print("+------------------------------------------+")
    print("| ", model_name.center(38) , " |")
    print("+------------------------------------------+")
    print("| ", "Metric".rjust(16), "  |  ", "Value".ljust(15), " |")
    print("+------------------------------------------+")
    for metric_name in metrics_to_include:
        evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName=metric_name)
        metric_value = evaluator.evaluate(eval_frame)
        print("| ", evaluator.getMetricName().rjust(16), "  |  ", str(round(metric_value,6)).ljust(15), " |")
        print("+------------------------------------------+")

#### Prepare Data For Model

##### Ingest Dataset

In [0]:
data_file = "/FileStore/tables/train_data.csv"

# One nullable field per CSV column, listed in header order.
orig_schema = StructType([
    StructField(field_name, field_type(), True)
    for field_name, field_type in [
        ("campaign_id", IntegerType),
        ("sender", IntegerType),
        ("subject_len", IntegerType),
        ("body_len", IntegerType),
        ("mean_paragraph_len", IntegerType),
        ("day_of_week", IntegerType),
        ("is_weekend", IntegerType),
        ("times_of_day", StringType),
        ("category", IntegerType),
        ("product", IntegerType),
        ("no_of_CTA", IntegerType),
        ("mean_CTA_len", IntegerType),
        ("is_image", IntegerType),
        ("is_personalized", IntegerType),
        ("is_quote", IntegerType),
        ("is_timer", IntegerType),
        ("is_emoticons", IntegerType),
        ("is_discount", IntegerType),
        ("is_price", IntegerType),
        ("is_urgency", IntegerType),
        ("target_audience", IntegerType),
        ("label", FloatType),
    ]
])

df = ingest_dataset(file_location=data_file, schema=orig_schema)

display(df)

campaign_id,sender,subject_len,body_len,mean_paragraph_len,day_of_week,is_weekend,times_of_day,category,product,no_of_CTA,mean_CTA_len,is_image,is_personalized,is_quote,is_timer,is_emoticons,is_discount,is_price,is_urgency,target_audience,label
1,3,76,10439,39,5,1,Noon,6,26,3,29,0,0,0,0,0,0,0,0,14,0.10307898
2,3,54,2570,256,5,1,Morning,2,11,0,22,0,0,0,0,0,0,0,0,10,0.7
3,3,59,12801,16,5,1,Noon,2,11,3,23,1,0,1,0,0,0,0,0,16,0.0027689086
4,3,74,11037,30,4,0,Evening,15,9,4,24,0,0,0,0,0,0,0,0,10,0.010867763
5,3,80,10011,27,5,1,Noon,6,26,3,31,0,0,1,0,0,0,0,0,14,0.14282623
6,3,54,2569,256,4,0,Evening,2,11,0,22,0,0,0,0,0,0,0,0,10,0.5
7,3,54,2570,256,4,0,Evening,2,11,0,22,0,0,0,0,0,0,0,0,10,0.45714286
8,3,60,12117,17,4,0,Noon,6,26,4,34,1,0,1,0,0,0,0,0,14,0.16699801
9,3,89,10055,14,4,0,Noon,6,26,3,34,1,0,0,0,0,0,0,0,13,0.029233105
10,3,89,11049,26,3,0,Evening,15,9,4,28,0,0,0,0,0,0,0,0,6,0.0030576666


##### Drop Unnecessary Feature

In [0]:
# In the displayed sample rows campaign_id is just a running counter (1, 2, 3, ...).
columns_to_drop = ["campaign_id"]
df = df.drop(*columns_to_drop)

##### Assign Features to Data Type Groups For Pipeline

In [0]:
# String-valued categorical columns.
string_categorical = ["times_of_day"]

# Columns placed in the feature vector as plain numbers.
numerical = ["no_of_CTA", "mean_CTA_len", "subject_len", "body_len", "mean_paragraph_len"]

# Every remaining column is treated as an integer-coded categorical.
# The target and *all* string-categorical columns are excluded (not only
# the first one), so adding another entry to string_categorical cannot
# leave a string column in this list.
non_integer_categorical = {"label", *string_categorical, *numerical}
categorical = [x for x in df.columns if x not in non_integer_categorical]

##### Build Data Pipeline Stages

In [0]:
stages = []

# String categoricals: index the strings first, then one-hot encode the index.
for col_name in string_categorical:
    indexer = StringIndexer(inputCol=col_name, outputCol=f"{col_name}_indexed")
    encoder = OneHotEncoder(
        inputCols=[indexer.getOutputCol()],
        outputCols=[f"{col_name}_class_vec"],
    )
    stages.extend([indexer, encoder])

# Integer-coded categoricals: one-hot encode the column directly, one encoder per column.
stages.extend(
    OneHotEncoder(inputCols=[col_name], outputCols=[f"{col_name}_class_vec"])
    for col_name in categorical
)

# Feature vector layout: encoded string columns, encoded integer columns, then the numeric columns.
assembler_inputs = [
    f"{col_name}_class_vec" for col_name in string_categorical + categorical
] + numerical
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
stages.append(assembler)

##### Apply Pipeline to Dataset to Prepare Data for Model

In [0]:
ctr_pipe = Pipeline(stages=stages)

# The pipeline is fitted on the full frame and then applied to that same frame.
ctr_pipe_model = ctr_pipe.fit(df)
ctr_data = ctr_pipe_model.transform(df)

#### Build & Train Model

##### Split Dataset into Training & Testing Datasets

In [0]:
# 80/20 split with a fixed seed; both parts are persisted before being counted.
split_weights = [0.80, 0.20]
train_ds, test_ds = (
    subset.persist()
    for subset in ctr_data.randomSplit(weights=split_weights, seed=42)
)

for split_name, split_frame in (("Training", train_ds), ("Testing", test_ds)):
    print(f"Samples in {split_name} Dataset: {split_frame.count()}")

Samples in Training Dataset: 1557
Samples in Testing Dataset: 331


##### Create GBTRegressor Instance & Fit it to Training Data

In [0]:
# Estimator settings gathered in one place so they are easy to spot and tune.
gbt_settings = {
    "featuresCol": "features",
    "labelCol": "label",
    "maxIter": 100,
    "maxDepth": 6,
}
gbt = GBTRegressor(**gbt_settings)

gbt_model = gbt.fit(train_ds)

#### Generate Predictions & Evaluate Model

In [0]:
# Score the held-out split; the result is later read through its 'label' and 'prediction' columns.
gbt_preds = gbt_model.transform(dataset=test_ds)

##### Prepare Predictions for Model Evaluation

##### Calculate & Display Metrics

In [0]:
metric_types = ["rmse", "mse", "r2", "mae", "var"]

# Single-column views of the scored test set, passed on as separate arguments.
label_frame = gbt_preds.select('label')
prediction_frame = gbt_preds.select('prediction')

regression_metrics_eval(
    label_frame,
    prediction_frame,
    metric_types,
    "Email Click Through Rate",
)

+------------------------------------------+
|         Email Click Through Rate         |
+------------------------------------------+
|            Metric   |   Value            |
+------------------------------------------+
|              rmse   |   0.04362          |
+------------------------------------------+
|               mse   |   0.001903         |
+------------------------------------------+
|                r2   |   0.689475         |
+------------------------------------------+
|               mae   |   0.025134         |
+------------------------------------------+
|               var   |   0.00563          |
+------------------------------------------+


##### End Spark Session

In [0]:
# Release the cached splits (they were persisted right after the
# train/test split) before shutting the session down. The original cell
# called persist() again here, which releases nothing.
train_ds = train_ds.unpersist()
test_ds = test_ds.unpersist()

spark.stop()

### Notes & Other Takeaways From This Project
****
- While this model did not produce great results for predicting email click-through rate, I think it would be more useful if there were more samples with which to train it.
****