## MODEL DATA

Once all of our input datasets are prepared, we can pull them together to create the final model training set.

##### Timing 
We want to time how long these programs take to run. We are interested both in real time and CPU time.

In [None]:
import time 

# Wall-clock reference point; subtracted from time.time() in the timing summary at the end.
start_time = time.time()
# CPU-time reference point (time.process_time) for the same summary.
start_cpu_time = time.process_time()

#### Set Up

In [None]:
import os
import numpy as np
import calendar

import pyspark
from pyspark.sql import SparkSession

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, StringType, FloatType, StructType, StructField

from pyspark.sql.functions import collect_list, regexp_replace, lower
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.functions import year, month, dayofmonth, to_date, trim, concat, col, lit
from functools import reduce

import datetime 
from datetime import datetime as dt
from dateutil.relativedelta import *
import pandas as pd

#### Load in Prospect Data for Training

In [None]:
# Prospect base used for training, plus the FSA derived from the postal code.
# Original author's note: "Update to prospect base shortly after :)"
prospect_base_path = "/anaurosevic/cdn0_cards_affinity/prospect_base/"
pb = spark.read.load(prospect_base_path)

# FSA = first three characters of the postal code. Spark's substring is 1-based,
# and a start position of 0 is treated the same as 1.
pb = pb.withColumn('FSA', F.substring(F.col('postal_code'), 1, 3))

In [None]:
pb.printSchema()

In [None]:
pb.show(5,False)

In [None]:
pb.count()

#### Identify Current Session Details 
*In the initial training of the model, we will be pulling session details for the session during which the prospect first applied for a CC. Once the model is live, this will be pulled real-time for each new prospect.*

We want to capture relevant information from the session during which they applied for the CC, such as: 
- __Device details (category, OS, brand, language):__ Device details are proxies for customer characteristics - e.g., younger users, who are more likely to purchase a student card, are also more likely to be using a mobile/Apple device. Certain credit cards also offer mobile coverage, which may be a priority.
- __Channel type (what channel brought them here?):__ If they were directed from a marketing campaign, it could tell us why
  
This can be found in the sessions table for the given session during which customers purchased the CC.  

In [None]:
def month_end_list(num_months, today=None):
    """Return the month-end dates of the ``num_months`` months before the current month.

    Args:
        num_months: Number of complete months to look back. Zero or a negative
            value yields an empty list.
        today: Optional ``datetime.date`` used as the reference day. Defaults to
            ``datetime.date.today()``; pass a fixed date for reproducible runs.

    Returns:
        A list of ``'YYYY-MM-DD'`` strings in chronological order, one per month,
        ending with the last day of the month preceding the reference month.
    """
    if today is None:
        today = datetime.date.today()

    # Count months on a single axis so that stepping back across a year
    # boundary is plain integer arithmetic.
    current_month_index = today.year * 12 + (today.month - 1)

    me_list = []
    for months_back in range(num_months, 0, -1):
        # First day of the month that follows the month we want ...
        year, month_zero_based = divmod(current_month_index - months_back + 1, 12)
        first_of_next_month = datetime.date(year, month_zero_based + 1, 1)
        # ... minus one day is that month's last day (leap years included).
        month_end = first_of_next_month - datetime.timedelta(days=1)
        me_list.append(month_end.isoformat())
    return me_list

In [None]:
month_end_list(11)

In [None]:
# Session-date window: from the oldest to the newest month end of the 11-month lookback.
lookback_month_ends = month_end_list(11)
window_start = str(lookback_month_ends[0])
window_end = str(lookback_month_ends[-1])
filter_string = "session_date>='" + window_start + "' and session_date<='" + window_end + "'"
print(filter_string)

In [None]:
# Load the GA4 session rows that fall inside the filter_string date window and keep only
# the identifiers, channel groupings and device attributes used below.
# The session start timestamp is divided by 1e6 before from_unixtime (presumably it is
# stored in microseconds — TODO confirm), then reduced to a calendar date in sess_date.
# The raw timestamp column is dropped and exact duplicate rows are removed.
session = spark.read.option(
    "basePath","...GA4_SESSION...").load(
    "...GA4_SESSION...").filter(filter_string).select(
    "user_pseudo_id","user_session_id",
    "user_session_start_timestamp",
    "user_default_channel_grouping",
    "sess_default_channel_grouping",
    "device_category","device_operating_system","device_mobile_brand_name","device_language").withColumn(
    "sess_timestamp",F.from_unixtime(F.col("user_session_start_timestamp")/1e6)).withColumn(
    "sess_date",F.to_date("sess_timestamp")).drop('user_session_start_timestamp').distinct()

In [None]:
session.printSchema()

In [None]:
session.show(5,False)

In [None]:
# Keep a single row per user_session_id: the one with the earliest sess_timestamp.
earliest_first = Window.partitionBy("user_session_id").orderBy("sess_timestamp")
session_filtered = (
    session
    .withColumn("rank", F.row_number().over(earliest_first))
    .filter("rank=1")
    .drop("rank")
)

##### Merge to Identify Current Session Details 

In [None]:
# Left-join the prospect base to the session table on both the pseudo id and the session
# id, so prospects without a matching session row are kept with null session fields.
# The session-side copies of the two join keys are dropped, exact duplicate rows are
# removed, and the result is persisted because the following cells reuse it several times.
current_session = pb.join(session_filtered, [
    (pb.user_pseudo_id == session_filtered.user_pseudo_id) & 
    (pb.user_session_id == session_filtered.user_session_id) 
], how='left').drop(session_filtered.user_session_id).drop(session_filtered.user_pseudo_id).distinct().persist()

In [None]:
current_session.count()

In [None]:
current_session.filter('device_category is null').count() #Why nulls? 
current_session.filter('device_category is null').select('card_sale_date').distinct().show() #Something must have happened on March 6/7, 2025 
#Let's just fill the missing values 

##### Format Variables

In [None]:
# Levels that keep their own category; every other non-null value is bucketed,
# and nulls are labelled "unk".
known_device_cats = ['mobile', 'desktop']
known_device_os = ["iOS", "Android", "Windows", "Macintosh"]
known_device_brands = ['Apple', 'Samsung', 'Google', 'Microsoft', 'Motorola', 'Mozilla', 'LG']
known_languages = ['en', 'fr', 'es', 'zh', 'ko', 'tr', 'ru', 'pt']
known_channels = ['Paid Search', 'Referral', 'Direct', 'Affiliates', 'Paid Social',
                  'Organic Search', 'Unassigned', 'Performance Max', 'Demand Gen']

raw_device_cat = F.col('device_category')
raw_device_os = F.col("device_operating_system")
raw_device_brand = F.col('device_mobile_brand_name')
language_prefix = F.col("device_language2")
raw_sess_channel = F.col("sess_default_channel_grouping")

# Device category: mobile / desktop / other / unk
device_cat_expr = (
    F.when(raw_device_cat.isNull(), "unk")
    .when(raw_device_cat.isin(known_device_cats), raw_device_cat)
    .otherwise('other')
)

# Device operating system: lower-cased known OS / other / unk
device_os_expr = (
    F.when(raw_device_os.isNull(), "unk")
    .when(raw_device_os.isin(known_device_os), lower(raw_device_os))
    .otherwise("other")
)

# Device brand: lower-cased known brand / oth / unk
device_brand_expr = (
    F.when(raw_device_brand.isNull(), "unk")
    .when(raw_device_brand.isin(known_device_brands), lower(raw_device_brand))
    .otherwise("oth")
)

# Device language: known two-character prefix / oth / unk
device_language_expr = (
    F.when(language_prefix.isNull(), "unk")
    .when(language_prefix.isin(known_languages), language_prefix)
    .otherwise("oth")
)

# Session channel: known grouping in lower snake case / other / unk
sess_channel_expr = (
    F.when(raw_sess_channel.isNull(), "unk")
    .when(raw_sess_channel.isin(known_channels),
          regexp_replace(lower(raw_sess_channel), ' ', '_'))
    .otherwise("other")
)

current_session_vars = (
    current_session
    .withColumn("device_cat", device_cat_expr)
    .withColumn("device_os", device_os_expr)
    .withColumn("device_brand", device_brand_expr)
    # Stage the two-character language prefix in a scratch column, then replace the
    # raw device_language column with the bucketed version.
    .withColumn("device_language2", F.trim(F.substring('device_language', 1, 2)))
    .drop('device_language')
    .withColumn("device_language", device_language_expr)
    .withColumn("channel", sess_channel_expr)
    .withColumnRenamed("channel", "sess_channel")
    # The raw inputs are no longer needed once the formatted columns exist.
    .drop("device_category", "device_operating_system", "device_mobile_brand_name",
          "device_language2", "sess_default_channel_grouping")
)

In [None]:
# Drop the user-level channel grouping and the session timestamp/date, which are not
# part of the categorical columns encoded in the next section.
current_session_subset = current_session_vars.drop('user_default_channel_grouping','sess_timestamp','sess_date')

In [None]:
current_session_subset.show(5,False)
#So beautiful!! She looks great :) 

##### Create flags

In [None]:
# One-hot encode the categorical columns with pandas.
# toPandas() collects the whole dataset onto the driver, so this step assumes the
# prospect base fits in driver memory.
cat_cols = ['province','device_cat','device_os','device_brand','device_language','sess_channel']
current_session_pd = current_session_subset.toPandas()

# drop_first=False keeps an indicator column for every level of every variable.
dummies = pd.get_dummies(current_session_pd[cat_cols], drop_first=False)
dummies.head(5)

In [None]:
current_session_temp = pd.concat([current_session_pd[['user_pseudo_id','user_session_id','clnt_no','product_code','postal_code','card_sale_date']], dummies], axis=1)

In [None]:
print(list(current_session_temp.columns))

In [None]:
current_session_final = spark.createDataFrame(current_session_temp)

In [None]:
current_session_final.count() #Should be the same as the original PB size!

#### Offers

Identify what credit card offers were available and on promotion at the time of credit card application. 

In [None]:
offers = spark.read.csv("/anaurosevic/cdn0_cards_affinity/offers/offer_value.csv", header=True, inferSchema=True)

In [None]:
offers.show(5,False)

In [None]:
#Format values 
# Parse the offer start/end strings as dates using the d-MMM-yy pattern, strip the "$"
# from offer_value and cast it to double, and rename offer_cards to offer_card.
# NOTE(review): only the dollar sign is removed; a value containing thousands separators
# would presumably cast to null — confirm against the CSV.
offers_formatted = offers.withColumn(
    'offer_start_date',F.to_date(F.col('offer_start'),"d-MMM-yy")).withColumn(
    'offer_end_date',F.to_date(F.col('offer_end'),'d-MMM-yy')).withColumn(
    'value',F.regexp_replace(F.col("offer_value"), "\\$", "").cast("double")).select(
    F.col('offer_cards').alias('offer_card'),'offer_start_date','offer_end_date',F.col('value').alias('offer_value'))

offers_formatted.show(5,False)

In [None]:
#Drop null offer values for now - these should be provided and updated by the business team
offers_formatted = offers_formatted.filter('offer_value is not null')

In [None]:
#Unique offer cards :) 
offer_cards = offers_formatted.select("offer_card").distinct().rdd.flatMap(lambda x: x).collect()
offer_cards

In [None]:
# Spread offer_value across one column per card: offer_<card> carries the value on
# rows belonging to that card and 0 on every other row.
def _add_offer_column(frame, card):
    card_value = F.when(F.col("offer_card") == card, F.col("offer_value")).otherwise(0)
    return frame.withColumn(f"offer_{card}", card_value)

offers_pivot = reduce(_add_offer_column, offer_cards, offers_formatted)

# The long-format columns are redundant once the per-card columns exist.
offers_pivot = offers_pivot.drop("offer_card", "offer_value")

In [None]:
offers_pivot.show(5,False)

In [None]:
#Merge onto session
# Range join: each prospect row is paired with every offer whose start/end dates bracket
# the card sale date. how='left' keeps prospects with no active offer (their offer
# columns are null), and a prospect can match several offer rows; those are summed
# in a later cell.
current_session_offers = current_session_final.select('user_pseudo_id','product_code','card_sale_date').join(
    offers_pivot, 
    on=[offers_pivot.offer_start_date<=current_session_final.card_sale_date, current_session_final.card_sale_date <= offers_pivot.offer_end_date],
    how='left').drop('offer_start_date','offer_end_date')

current_session_offers.count()

In [None]:
col_names = ['user_pseudo_id','product_code','card_sale_date'] + list(offers_pivot.drop('offer_start_date','offer_end_date').columns)

In [None]:
#Sum for each (user_pseudo_id, product_code, card_sale_date) combination
# sum() with no arguments aggregates the numeric columns; toDF then renames the result
# positionally, so col_names must list the three grouping keys followed by the offer
# columns in the same order as the aggregated output.
offers_sum = current_session_offers.groupBy('user_pseudo_id','product_code','card_sale_date').sum().toDF(*col_names)

offers_sum.show()

In [None]:
offers_sum.count()

In [None]:
offers_sum.filter('offer_MC4 is null').count()

In [None]:
#Add them back to the main dataset
# offers_sum holds one row per (user_pseudo_id, product_code, card_sale_date), so the
# merge uses the same three keys. Joining on user_pseudo_id alone would pair every row
# of a prospect with every offer row of that prospect, duplicating rows and attaching
# offers that belong to a different product / sale date.
offer_keys = ['user_pseudo_id', 'product_code', 'card_sale_date']

# Rename the right-hand keys so the join condition and the final drop are unambiguous.
offers_keyed = offers_sum.select(
    *[F.col(key).alias(f"_offer_{key}") for key in offer_keys],
    *[name for name in offers_sum.columns if name not in offer_keys])

session_final = current_session_final.join(
    offers_keyed,
    on=[F.col('user_pseudo_id') == F.col('_offer_user_pseudo_id'),
        # Null-safe equality keeps rows whose product code or sale date is missing,
        # as the original user-only join did.
        F.col('product_code').eqNullSafe(F.col('_offer_product_code')),
        F.col('card_sale_date').eqNullSafe(F.col('_offer_card_sale_date'))],
    how='inner').drop(*[f"_offer_{key}" for key in offer_keys])

In [None]:
session_final.count()

In [None]:
session_final.printSchema()

In [None]:
session_final.select('user_pseudo_id','card_sale_date','product_code','offer_MR2','offer_ION','offer_MC4').show(5,False)

#### Load in Previous Ecommerce Events

*In the initial training of the model, we will be pulling session details for the session during which the prospect first applied for a CC. Once the model is live, this will be pulled real-time for each new prospect.*

##### Combine Files

In [None]:
monthly_folders = month_end_list(11)[:-1]
monthly_folders

In [None]:
#Import 
#Note: Needs to be a bit advanced because the columns in each monthly dataset are not the same :D 

base_path = "/anaurosevic/cdn0_cards_affinity/previous_sessions/events/"
monthly_folders = month_end_list(11)[:-1]

dataframes = []
all_columns = set()

# Grab all folders
for folder in monthly_folders:
    # Read the dataset for the month
    df = spark.read.load(base_path+folder)
    dataframes.append(df)

    # Collect column names
    all_columns.update(df.columns)

# Convert set of all columns to a sorted list for consistent ordering
all_columns = sorted(all_columns)

# Align each dataset to the full set of columns
aligned_dataframes = []

for df in dataframes:
    present_columns = set(df.columns)

    # One select per dataset: existing columns pass through, missing ones become 0.
    # The loop variable is deliberately not called `col`, which would rebind the
    # `col` function imported from pyspark.sql.functions for the rest of the notebook.
    aligned_dataframes.append(df.select(*[
        column_name if column_name in present_columns else lit(0).alias(column_name)
        for column_name in all_columns]))

# Union all aligned DataFrames (positional union is safe here: every frame was
# selected in the same all_columns order)
final_df = aligned_dataframes[0]

for df in aligned_dataframes[1:]:
    final_df = final_df.union(df)

In [None]:
# Move user_session_id to the front; the remaining columns keep their existing order.
all_columns = final_df.columns
reordered_columns = ['user_session_id']
reordered_columns.extend(name for name in all_columns if name != 'user_session_id')

df_reordered = final_df.select(*reordered_columns)

# user_pseudo_id is the part of user_session_id before the first underscore.
session_id_parts = F.split(F.col('user_session_id'), '_')
ecommerce_12m = df_reordered.withColumn('user_pseudo_id', session_id_parts.getItem(0))

In [None]:
#Summarize at the user_pseudo_id level by sum 
ecommerce_agg = ecommerce_12m.drop('user_session_id').groupBy('user_pseudo_id').sum()

In [None]:
print(ecommerce_agg.columns)

In [None]:
# Build the final column names: the key first, then each aggregated column with its
# first four characters and last character removed (the "sum(" ... ")" wrapper that
# the groupBy().sum() output carries).
orig_cols = list(ecommerce_agg.drop('user_pseudo_id').columns)
rename_cols = ['user_pseudo_id'] + [agg_name[4:-1] for agg_name in orig_cols]
rename_cols

In [None]:
ecommerce_agg_format = ecommerce_agg.toDF(*rename_cols)

In [None]:
ecommerce_agg_format.select('user_pseudo_id').distinct().count()/session_final.count() 
#Makes sense, only about 13% have at least one event in the previous month! 

In [None]:
ecommerce_agg_format.select('user_pseudo_id','checkout_accounts_004','view_credit_cards_CAV','select_credit_cards_VPR','newcomer_view').show(10, False)

In [None]:
#Merge
# Left join keeps every prospect; those with no previous ecommerce events get nulls.
# NOTE(review): fillna(0) is applied to the whole joined frame, so null numeric columns
# that come from session_final (e.g. offer_* columns with no active offer) are
# zero-filled as well — presumably intended, confirm.
all_sessions = session_final.join(ecommerce_agg_format, on='user_pseudo_id', how='left').fillna(0) #Fill 0 if missing

In [None]:
all_sessions.count()

#### Load in Lookups

In [None]:
pc_lookup = spark.read.load("/anaurosevic/cdn0_cards_affinity/lookup/postal_code_final/")
fsa_lookup = spark.read.load("/anaurosevic/cdn0_cards_affinity/lookup/fsa_final/")

In [None]:
#### For sending this to dev, let's stack everything! 

#Create NULL 
# A single fallback row keyed "missing" that carries the overall average of every
# postal-code feature.
overall_avg = pc_lookup.agg(*(F.avg(c).alias(c) for c in pc_lookup.drop('postal_code').columns)).withColumn("join_key",F.lit("missing"))
reordered_columns = ["join_key"] + [name for name in overall_avg.columns if name!="join_key"]
null_row = overall_avg.select(*reordered_columns)

# Stack by column NAME rather than by position. The three frames are only known to share
# the same set of columns, not the same order, and a positional union would silently put
# values under the wrong feature if the orders differ. unionByName raises if a column
# is missing on either side.
stack = pc_lookup.withColumnRenamed('postal_code','join_key').unionByName(
    fsa_lookup.withColumnRenamed('FSA','join_key')).unionByName(
    null_row)

In [None]:
stack.count()==(pc_lookup.count() + fsa_lookup.count() + null_row.count()) #Looks good to me :) 

In [None]:
set(pc_lookup.withColumnRenamed('postal_code','join_key').columns)==set(fsa_lookup.withColumnRenamed('FSA','join_key').columns)== set(null_row.columns)

In [None]:
stack.printSchema()

In [None]:
#Save PC lookup as CSV 
#pc_lookup.write.option("header", True).csv("/anaurosevic/cdn0_cards_affinity/lookup/csv/")

In [None]:
from pyspark.sql.functions import stddev
# Standard deviation of every FSA-level feature, collected into a plain dict.
# The identifier column is skipped: the lookup is keyed by 'FSA' ("id" is kept in the
# exclusion list for backward compatibility). The result is stored under its own name
# so it does not overwrite the `stddev` function imported above.
stddev_by_column = fsa_lookup.select(*(
    F.stddev(column_name).alias(f"{column_name}_stddev")
    for column_name in fsa_lookup.columns if column_name not in ("id", "FSA")))
stddev_dict = stddev_by_column.collect()[0].asDict()

In [None]:
#Calculate mean and standard deviation for each numeric column
#Drop FSA
fsa_no_fsa = fsa_lookup.drop('FSA')
feature_columns = fsa_no_fsa.columns

stats = fsa_no_fsa.select(
    *(F.mean(column_name).alias(f"{column_name}_mean") for column_name in feature_columns),
    *(F.stddev(column_name).alias(f"{column_name}_stddev") for column_name in feature_columns)
).collect()[0]

#Normalize columns using z-scores
# Build all z-score columns in one select. The previous loop reassigned `zscore` from the
# un-normalized frame on every iteration, so only the last column's z-score survived and
# the collected row was mostly raw feature values.
zscore = fsa_no_fsa.select(*[
    ((F.col(column_name) - F.lit(stats[f"{column_name}_mean"]))
     / F.lit(stats[f"{column_name}_stddev"])).alias(f"{column_name}_zscore")
    for column_name in feature_columns])

# NOTE(review): only the first collected row is inspected, as before — confirm that a
# single FSA is the intended subject of this check.
zscore_dict = zscore.collect()[0].asDict()

In [None]:
zscore_dict

In [None]:
# Keep entries whose magnitude is at least 2. None entries (a null value or an undefined
# z-score) are skipped because abs(None) raises TypeError.
discrepancies = {key: value for key, value in zscore_dict.items() if value is not None and abs(value) >= 2}

In [None]:
discrepancies

##### Identify Sparse RBC Postal Codes

In order to ensure anonymity and protect RBC client PII, postal codes where we have very few clients - i.e., <5 - will be further aggregated at the FSA level.

In [None]:
month_end_list(1)

In [None]:
"...CLIENT_PROFILE.../MONTH_END_DATE=" + str(month_end_list(2)[0])

In [None]:
#Sparse postal codes
# Client-to-postal-code pairs from the client profile snapshot for the month end two
# months before the current month (month_end_list(2)[0]). Columns are renamed to the
# lower-case names used elsewhere in this notebook, duplicate pairs are removed and
# rows without a postal code are discarded.
ucp = spark.read.load("...CLIENT_PROFILE.../MONTH_END_DATE=" + str(month_end_list(2)[0])).select(
    'CLNT_NO','POST_CD').withColumnRenamed(
    'CLNT_NO','clnt_no').withColumnRenamed(
    'POST_CD','postal_code').distinct().filter(
    'postal_code is not null')

In [None]:
ucp.select('postal_code').distinct().count()

In [None]:
# Number of client rows per postal code ...
clients_per_postal_code = ucp.groupBy('postal_code').agg(
    F.count(F.col('clnt_no')).alias('num_clnts'))

# ... and the postal codes with fewer than 5 of them.
sparse = clients_per_postal_code.filter(F.col("num_clnts") < 5)

In [None]:
sparse.show(10,False)

In [None]:
sparse.count()

In [None]:
sparse.printSchema()

##### Merge Postal Code Lookup for Non-Sparse

In [None]:
# left_anti keeps only the prospects whose postal code is NOT in the sparse list; those
# rows then pick up postal-code-level features. The second join is a left join, so
# prospects whose postal code is absent from pc_lookup are kept with null features.
non_sparse_pc = pb.join(sparse, on='postal_code', how='left_anti').join(
    pc_lookup, on='postal_code', how='left')

In [None]:
non_sparse_pc.filter('prizm_seg_01 is null').count()

In [None]:
non_sparse_pc.count()

In [None]:
len(non_sparse_pc.columns)

##### Merge FSA Lookup for Sparse

In [None]:
# Prospects in sparse postal codes get FSA-level features instead of postal-code-level
# ones. The join key is spelled 'FSA', matching the column created on the prospect base
# and the key of fsa_lookup; the lower-case spelling only resolved while Spark's column
# matching is case-insensitive.
sparse_pc = pb.join(sparse, on='postal_code', how='inner').join(
    fsa_lookup, on='FSA', how='left').drop("num_clnts")

In [None]:
sparse_pc.count()

In [None]:
sparse_pc.filter('prizm_seg_01 is null').count() #Great, there were matches for all people 

In [None]:
len(sparse_pc.columns)

In [None]:
#list(set(sparse_pc.columns) - set(non_sparse_pc.columns))

#### Stack and Fill Nulls

In [None]:
#Reorder columns for easy stacking!
# The union in the next cell matches columns by position, so the sparse frame is put
# into exactly the column order of the non-sparse frame first.
col_order = non_sparse_pc.columns
sparse_pc_ordered = sparse_pc.select(col_order)

In [None]:
complete_pb = non_sparse_pc.union(sparse_pc_ordered)

In [None]:
(complete_pb.count() == pb.count()) #Yay

In [None]:
complete_pb.filter('age is null').count()/pb.count() #Still lots of missing unfortunately :(

In [None]:
#Nulls will be filled with the overall average of the entire postal code dataset.
#NOTE(review): the original note here said "Fill nulls as zeros within the postal code dataset",
#but this cell only computes per-column averages — confirm which fill is intended.

# One average per feature column of pc_lookup (postal_code excluded), as a plain dict.
overall_dict = pc_lookup.agg(*(F.avg(c).alias(c) for c in pc_lookup.drop('postal_code').columns)).collect()[0].asDict()
print(overall_dict)

In [None]:
#Convert to float 
# A column whose average is undefined (all values null) comes back as None; float(None)
# would raise TypeError, so such columns are left out of the fill dictionary and are
# simply not filled.
overall_dict = {k: float(v) for k, v in overall_dict.items() if v is not None}

In [None]:
final_pc_model_data = complete_pb.na.fill(overall_dict)

In [None]:
final_pc_model_data.filter('age is null').count()

In [None]:
final_pc_model_data.count()

#### Merge Everything for Final Dataset

At this point, let's make sure everything is de-identified! We don't really need any identifiers other than user_pseudo_id and product code. 

In [None]:
all_sessions.printSchema()

In [None]:
final_pc_model_data.printSchema()

In [None]:
# Final merge: session/offer/ecommerce features on the left, postal-code/FSA features on
# the right, joined on user_pseudo_id. Identifier-like columns are dropped from both sides
# before the join; product_code is kept from the left side only.
# NOTE(review): postal_code is dropped from the right side but not from all_sessions, so
# it remains in model_data (the final check below filters on it) — confirm this is
# acceptable given the de-identification goal stated above.
# NOTE(review): the join key is user_pseudo_id alone; if a prospect can appear more than
# once on either side, rows will multiply — verify uniqueness.
model_data = all_sessions.drop('user_session_id','clnt_no','card_sale_date').join(
    final_pc_model_data.drop('postal_code','user_session_id','clnt_no','card_sale_date','session_timestamp','province','FSA','product_code'),
    on='user_pseudo_id', how='left').drop(final_pc_model_data.product_code)

In [None]:
model_data.count()

##### Final Check

In [None]:
# Count the nulls in every column of the final dataset (one output row, one count per column).
null_count_exprs = [
    F.sum(F.col(name).isNull().cast("int")).alias(name)
    for name in model_data.columns
]
null_counts = model_data.select(null_count_exprs)

# Collapse to a plain dict of counts, leaving out the user_pseudo_id key column.
checked_columns = null_counts.drop('user_pseudo_id').columns
max_exprs = [F.max(name).alias(name) for name in checked_columns]
null_summary = null_counts.agg(*max_exprs).collect()[0].asDict()

In [None]:
model_data.filter('postal_code is null').show(5,False) 

In [None]:
null_dict = {key: value for key, value in null_summary.items() if value != 0}
print(null_dict) #woo

##### Save

In [None]:
# Write the final training set as parquet. coalesce(1) reduces the data to a single
# partition before writing, and mode("overwrite") replaces whatever is already at the
# target path.
model_data.coalesce(1).write.mode("overwrite").parquet("/anaurosevic/cdn0_cards_affinity/model_data/")

--- END PROGRAM ---

In [None]:
#Timing summary
# Second reading of both clocks; the first was taken in the timing cell at the top.
end_time = time.time()
end_cpu_time = time.process_time()

# Elapsed wall-clock time vs. CPU time consumed by this Python process.
real_time_elapsed = end_time - start_time
cpu_time_elapsed = end_cpu_time - start_cpu_time

print(f"Real time: {real_time_elapsed:.2f} seconds")
print(f"CPU time: {cpu_time_elapsed:.2f} seconds")