# Green Dot - Transaction Feature Generation Script

In [0]:
## 1 Month Observation Window Script

In [0]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window
import pandas as pd

In [0]:
# Disable pandas display truncation: show every row and every column.
pd.set_option("display.max_rows", None)
pd.set_option("display.max_columns", None)

In [0]:
# Display the active SparkSession. `spark` is not created in the cells shown here, so it
# is presumably injected by the notebook runtime — TODO confirm.
spark

## Load
Select the entry in each cell that matches the dataset you are generating (training, test, or validation).

In [0]:
# Dataset split to featurize. Change this single value instead of commenting lines in/out;
# it must be one of the keys of RAW_TRANSACTION_PATHS ('training', 'test', 'validation').
DATASET_SPLIT = 'test'

# Raw transaction extracts, one parquet directory per split.
RAW_TRANSACTION_PATHS = {
    'training': 'gs://ds-greendot/jc/gd_chargeback/raw_training_nec_transactions_20210830/',
    'test': 'gs://ds-greendot/jc/gd_chargeback/raw_test_nec_transactions_20210830/',
    'validation': 'gs://ds-greendot/jc/gd_chargeback/raw_validation_nec_transactions_20210830/',
}

# An unknown split name fails fast with a KeyError rather than silently loading the wrong data.
raw_df = spark.read.parquet(RAW_TRANSACTION_PATHS[DATASET_SPLIT])

In [0]:
# Sanity check: total number of rows in the raw transaction data.
raw_df.count()

In [0]:
# Sanity check: number of distinct ssn_supertoken values in the raw data.
raw_df.select('ssn_supertoken').distinct().count()

In [0]:
# List the column names available in the raw data.
raw_df.columns

In [0]:
# Per-transaction preparation:
#   * keep rows whose ssn_supertoken is exactly 12 characters long,
#   * keep transactions authorised at most 270 days after first_load_date,
#   * flag observation (<= 90 days) vs target (> 90 days) periods,
#   * add mutually exclusive dollar-band indicators on transamt,
#   * add calendar and time-of-day features derived from authdate.
prep_df = (
    raw_df
    .filter(F.length(F.col('ssn_supertoken')) == 12)
    .withColumn('days_from_creation', F.datediff(F.col('authdate'), F.col('first_load_date')))
    .filter(F.expr("days_from_creation <= 270"))
    .withColumn('period_obs', F.when(F.col('days_from_creation') <= 90, 1).otherwise(0))
    .withColumn('period_tgt', F.when(F.col('days_from_creation') > 90, 1).otherwise(0))
    .withColumn('dollar_band_5', F.when(F.expr("transamt <= 4.99"), 1).otherwise(0))
    .withColumn('dollar_band_10', F.when(F.expr("transamt > 4.99 AND transamt<= 9.99"), 1).otherwise(0))
    .withColumn('dollar_band_15', F.when(F.expr("transamt > 9.99 AND transamt<= 14.99"), 1).otherwise(0))
    .withColumn('dollar_band_25', F.when(F.expr("transamt > 14.99 AND transamt<= 24.99"), 1).otherwise(0))
    .withColumn('dollar_band_50', F.when(F.expr("transamt > 24.99 AND transamt<= 49.99"), 1).otherwise(0))
    .withColumn('dollar_band_100', F.when(F.expr("transamt > 49.99 AND transamt<= 99.99"), 1).otherwise(0))
    .withColumn('dollar_band_500', F.when(F.expr("transamt > 99.99 AND transamt<= 499.99"), 1).otherwise(0))
    .withColumn('dollar_band_1000', F.when(F.expr("transamt > 499.99 AND transamt<= 999.99"), 1).otherwise(0))
    .withColumn('dollar_band_over_1000', F.when(F.expr("transamt > 999.99"), 1).otherwise(0))
    .withColumn('trans_month', F.month('authdate'))
    .withColumn('trans_day', F.dayofyear('authdate'))
    .withColumn('trans_week', F.weekofyear('authdate'))
    .withColumn('trans_hour', F.hour('authdate'))
    # dayofweek: 1 = Sunday, 7 = Saturday.
    .withColumn("is_weekend", F.dayofweek("authdate").isin([1, 7]).cast("int"))
    # Time-of-day buckets are half-open hour ranges so that every transaction falls in
    # exactly one bucket: morning [6,12), afternoon [12,18), night [18,24), midnight [0,6).
    # `between` is inclusive on both ends, so the previous between(6,12)/between(12,18)/
    # between(0,6) double-counted hours 6 and 12, and between(18,0) could never be true
    # (is_night was 0 for every row).
    .withColumn("is_morning", F.col("trans_hour").between(6, 11).cast("int"))
    .withColumn("is_afternoon", F.col("trans_hour").between(12, 17).cast("int"))
    .withColumn("is_night", F.col("trans_hour").between(18, 23).cast("int"))
    .withColumn("is_midnight", F.col("trans_hour").between(0, 5).cast("int"))
)

In [0]:
# Values of the `creditdebit` column used below to split transactions into
# credits and debits.
debitcredit_CREDIT = 'C'
debitcredit_DEBIT = 'D'

In [0]:
# Transaction dataset: only rows whose trans_time_period is 'observation'.
trans_df = prep_df.where(F.col('trans_time_period') == 'observation')

In [0]:
# Case-when helper
def case_when(col, reason):
    """Return a Column that is `reason` where column `col` equals `reason`, else null.

    Args:
        col: name of the column to compare.
        reason: value to compare against; also the value emitted on a match.
    """
    is_match = F.col(col) == reason
    return F.when(is_match, reason).otherwise(F.lit(None))

In [0]:
# List the columns available on the observation-period transaction dataset.
trans_df.columns

## Transaction Features
All features below are computed from `trans_df`, cover the OBSERVATION PERIOD only, and are aggregated per `ssn_supertoken`.
- `num_credit_trans`: number of distinct txns marked as credits
- `num_debit_trans`: number of distinct txns marked as debits
- `total_credit_trans_amount`: total dollar amount of all credit transactions
- `total_debit_trans_amount`: total dollar amount of all debit transactions
- `min_trans_debit_amount`: the minimum transaction marked as debit
- `max_trans_debit_amount`: the max transaction marked as debit
- `mean_trans_debit_amount`: the mean of all transaction marked as debit
- `stddev_trans_debit_amount`: the standard deviation of all transactions marked as debit
- `min_trans_cred_amount`: the minimum transaction marked as credit
- `max_trans_cred_amount`: the max transaction marked as credit
- `mean_trans_cred_amount`: the mean of all transactions marked as credit
- `stddev_trans_cred_amount`: the standard deviation of all transactions marked as credit
- `dol_band_trans_cred_15_count`: a count of transactions between 0-15 marked as credit
- `dol_band_trans_cred_25_count`: a count of transactions between 15-25 marked as credit
- `dol_band_trans_cred_50_count`: a count of transactions between 25-50 marked as credit
- `dol_band_trans_cred_100_count`: a count of transactions between 50-100 marked as credit
- `dol_band_trans_cred_500_count`: a count of transactions between 100-500 marked as credit
- `dol_band_trans_cred_1000_count`: a count of transactions between 500-1000 marked as credit
- `dol_band_trans_cred_over_1000_count`: a count of transactions over 1000 marked as credit
- `dol_band_trans_deb_15_count`: a count of transactions between 0-15 marked as debit
- `dol_band_trans_deb_25_count`: a count of transactions between 15-25 marked as debit
- `dol_band_trans_deb_50_count`: a count of transactions between 25-50 marked as debit
- `dol_band_trans_deb_100_count`: a count of transactions between 50-100 marked as debit
- `dol_band_trans_deb_500_count`: a count of transactions between 100-500 marked as debit
- `dol_band_trans_deb_1000_count`: a count of transactions between 500-1000 marked as debit
- `dol_band_trans_deb_over_1000_count`: a count of transactions over 1000 marked as debit
- `weekend_trans_count_cred`: a count of transactions that occurred on Saturday or Sunday marked as credit
- `weekend_trans_count_deb`: a count of transactions that occurred on Saturday or Sunday marked as debit
- `morning_trans_count_cred`: a count of transactions that occurred between 6am-12pm marked as credit
- `afternoon_trans_count_cred`: a count of transactions that occurred between 12pm-6pm marked as credit
- `night_trans_count_cred`: a count of transactions that occurred between 6pm-12am marked as credit
- `midnight_trans_count_cred`: a count of transactions that occurred between 12am-6am marked as credit
- `morning_trans_count_deb`: a count of transactions that occurred between 6am-12pm marked as debit
- `afternoon_trans_count_deb`: a count of transactions that occurred between 12pm-6pm marked as debit
- `night_trans_count_deb`: a count of transactions that occurred between 6pm-12am marked as debit
- `midnight_trans_count_deb`: a count of transactions that occurred between 12am-6am marked as debit
- `unique_mcc_count_trans`: a count of unique merchant category (`mcc_category`) values
- `most_frequent_mcc_trans`: the most frequently occurring merchant category (`mcc_category`) value
- `unique_merch_state_count_trans`: a count of unique merchant state (`merch_state`) values
- `most_frequent_merch_state_trans`: the most frequently occurring merchant state (`merch_state`) value

In [0]:
# Number of distinct credit transactions (by transdtlkey) per ssn_supertoken.
credits_per_ssn = (
    trans_df
    .where(F.col('creditdebit') == debitcredit_CREDIT)
    .groupBy('ssn_supertoken')
    .agg(
        F.countDistinct('transdtlkey')
        .cast("int")
        .alias('num_credit_trans')
    )
)

In [0]:
# Preview the per-ssn credit counts without truncating column values.
credits_per_ssn.show(truncate=False)

In [0]:
# Number of distinct debit transactions (by transdtlkey) per ssn_supertoken.
debits_per_ssn = (
    trans_df
    .where(F.col('creditdebit') == debitcredit_DEBIT)
    .groupBy('ssn_supertoken')
    .agg(
        F.countDistinct('transdtlkey')
        .cast("int")
        .alias('num_debit_trans')
    )
)

In [0]:
# Total credit transaction amount per ssn_supertoken.
credit_txn_amounts_per_ssn = (
    trans_df
    .where(F.col('creditdebit') == debitcredit_CREDIT)
    .groupBy('ssn_supertoken')
    .agg(
        F.sum('transamt')
        .cast("float")
        .alias('total_credit_trans_amount')
    )
)

In [0]:
# Total debit transaction amount per ssn_supertoken.
debit_txn_amounts_per_ssn = (
    trans_df
    .where(F.col('creditdebit') == debitcredit_DEBIT)
    .groupBy('ssn_supertoken')
    .agg(
        F.sum('transamt')
        .cast("float")
        .alias('total_debit_trans_amount')
    )
)

In [0]:
# Transaction amount summary statistics (min / max / mean / stddev) per ssn_supertoken,
# computed separately for debits and for credits.

trans_d_summary_df = (
    trans_df
    .where(F.col('creditdebit') == debitcredit_DEBIT)
    .groupBy('ssn_supertoken')
    .agg(
        F.min('transamt').cast("float").alias("min_trans_debit_amount"),
        F.max('transamt').cast("float").alias("max_trans_debit_amount"),
        F.mean('transamt').cast("float").alias("mean_trans_debit_amount"),
        F.stddev('transamt').cast("float").alias("stddev_trans_debit_amount"),
    )
)

trans_c_summary_df = (
    trans_df
    .where(F.col('creditdebit') == debitcredit_CREDIT)
    .groupBy('ssn_supertoken')
    .agg(
        F.min('transamt').cast("float").alias("min_trans_cred_amount"),
        F.max('transamt').cast("float").alias("max_trans_cred_amount"),
        F.mean('transamt').cast("float").alias("mean_trans_cred_amount"),
        F.stddev('transamt').cast("float").alias("stddev_trans_cred_amount"),
    )
)

In [0]:
# Credit transaction counts per dollar band, per ssn_supertoken.
# Each count is the sum of a 0/1 indicator, aggregated directly rather than through
# intermediate per-row columns.

df_dollar_bands_trans_cred = (
    trans_df
    .where(F.col('creditdebit') == debitcredit_CREDIT)
    .groupBy('ssn_supertoken')
    .agg(
        # The "15" band merges the three smallest per-transaction bands (5, 10 and 15).
        F.sum(
            F.when(F.expr("dollar_band_15 = 1 OR dollar_band_10 = 1 OR dollar_band_5 = 1"), 1).otherwise(0)
        ).cast("int").alias('dol_band_trans_cred_15_count'),
        # Remaining bands map one-to-one onto the dollar_band_* indicator columns.
        *[
            F.sum(F.when(F.expr("dollar_band_{} = 1".format(band)), 1).otherwise(0))
            .cast("int")
            .alias('dol_band_trans_cred_{}_count'.format(band))
            for band in ('25', '50', '100', '500', '1000', 'over_1000')
        ]
    )
)

In [0]:
# Debit transaction counts per dollar band, per ssn_supertoken.
# Each count is the sum of a 0/1 indicator, aggregated directly rather than through
# intermediate per-row columns.

df_dollar_bands_trans_deb = (
    trans_df
    .where(F.col('creditdebit') == debitcredit_DEBIT)
    .groupBy('ssn_supertoken')
    .agg(
        # The "15" band merges the three smallest per-transaction bands (5, 10 and 15).
        F.sum(
            F.when(F.expr("dollar_band_15 = 1 OR dollar_band_10 = 1 OR dollar_band_5 = 1"), 1).otherwise(0)
        ).cast("int").alias('dol_band_trans_deb_15_count'),
        # Remaining bands map one-to-one onto the dollar_band_* indicator columns.
        *[
            F.sum(F.when(F.expr("dollar_band_{} = 1".format(band)), 1).otherwise(0))
            .cast("int")
            .alias('dol_band_trans_deb_{}_count'.format(band))
            for band in ('25', '50', '100', '500', '1000', 'over_1000')
        ]
    )
)

In [0]:
# Weekend and time-of-day transaction counts per ssn_supertoken, split by credit / debit.
# Each count is the sum of the matching 0/1 indicator column.
df_trans_weekend_cred = (
    trans_df
    .where(F.col('creditdebit') == debitcredit_CREDIT)
    .groupBy('ssn_supertoken')
    .agg(F.sum(F.col('is_weekend')).cast("int").alias('weekend_trans_count_cred'))
)

df_trans_weekend_deb = (
    trans_df
    .where(F.col('creditdebit') == debitcredit_DEBIT)
    .groupBy('ssn_supertoken')
    .agg(F.sum(F.col('is_weekend')).cast("int").alias('weekend_trans_count_deb'))
)

df_trans_time_of_day_cred = (
    trans_df
    .where(F.col('creditdebit') == debitcredit_CREDIT)
    .groupBy('ssn_supertoken')
    .agg(
        F.sum(F.col('is_morning')).cast("int").alias('morning_trans_count_cred'),
        F.sum(F.col('is_afternoon')).cast("int").alias('afternoon_trans_count_cred'),
        F.sum(F.col('is_night')).cast("int").alias('night_trans_count_cred'),
        F.sum(F.col('is_midnight')).cast("int").alias('midnight_trans_count_cred'),
    )
)

df_trans_time_of_day_deb = (
    trans_df
    .where(F.col('creditdebit') == debitcredit_DEBIT)
    .groupBy('ssn_supertoken')
    .agg(
        F.sum(F.col('is_morning')).cast("int").alias('morning_trans_count_deb'),
        F.sum(F.col('is_afternoon')).cast("int").alias('afternoon_trans_count_deb'),
        F.sum(F.col('is_night')).cast("int").alias('night_trans_count_deb'),
        F.sum(F.col('is_midnight')).cast("int").alias('midnight_trans_count_deb'),
    )
)


In [0]:
# Total transaction count per ssn_supertoken (rows with a non-null transamt).
df_trans_count = (
    trans_df
    .groupBy('ssn_supertoken')
    .agg(
        F.count('transamt')
        .cast("int")
        .alias("total_trans_count")
    )
)

In [0]:
# Unique merchant-category count and most frequent merchant category per ssn_supertoken.
df_merch_2_trans = (
    trans_df
    .groupBy('ssn_supertoken')
    .agg(F.countDistinct(F.col("mcc_category")).cast("int").alias("unique_mcc_count_trans"))
)

# Rank each ssn's categories by frequency. The category itself is a secondary sort key so
# ties are broken deterministically; row_number over a non-unique ordering could otherwise
# pick a different "most frequent" value from run to run.
ssn_mcc_window = F.row_number().over(
    Window.partitionBy("ssn_supertoken")
    .orderBy(F.col("mcc_count").desc(), F.col("mcc_category").asc())
)
most_freq_mcc_trans_df = (
    trans_df
    .groupBy("ssn_supertoken", "mcc_category")
    .agg(
        F.count(F.col("mcc_category")).cast("int").alias("mcc_count")
    )
    .withColumn("rn", ssn_mcc_window)
    .filter("rn = 1")  # keep only the top-ranked category per ssn
    .select(
        F.col("ssn_supertoken"), F.col("mcc_category").alias("most_frequent_mcc_trans")
    )
)
joined_mcc_trans_df = df_merch_2_trans.join(most_freq_mcc_trans_df, 'ssn_supertoken', 'inner')

In [0]:
# Unique merchant-city count and most frequent merchant city per ssn_supertoken.
df_merch_city_trans = (
    trans_df
    .groupBy('ssn_supertoken')
    .agg(F.countDistinct(F.col("merch_city")).cast("int").alias("unique_merch_city_count_trans"))
)

# Rank each ssn's cities by frequency. The city itself is a secondary sort key so ties are
# broken deterministically; row_number over a non-unique ordering could otherwise pick a
# different "most frequent" value from run to run.
ssn_merch_window = F.row_number().over(
    Window.partitionBy("ssn_supertoken")
    .orderBy(F.col("merch_city_count").desc(), F.col("merch_city").asc())
)
most_freq_city_trans_df = (
    trans_df
    .groupBy("ssn_supertoken", "merch_city")
    .agg(
        F.count(F.col("merch_city")).cast("int").alias("merch_city_count")
    )
    .withColumn("rn", ssn_merch_window)
    .filter("rn = 1")  # keep only the top-ranked city per ssn
    .select(
        F.col("ssn_supertoken"), F.col("merch_city").alias("most_frequent_merch_city_trans")
    )
)
joined_merch_city_trans_df = df_merch_city_trans.join(most_freq_city_trans_df, 'ssn_supertoken', 'inner')


In [0]:
# Unique merchant-state count and most frequent merchant state per ssn_supertoken.
df_merch_state_trans = (
    trans_df
    .groupBy('ssn_supertoken')
    .agg(F.countDistinct(F.col("merch_state")).cast("int").alias("unique_merch_state_count_trans"))
)

# Rank each ssn's states by frequency. The state itself is a secondary sort key so ties are
# broken deterministically; row_number over a non-unique ordering could otherwise pick a
# different "most frequent" value from run to run.
ssn_merch_window = F.row_number().over(
    Window.partitionBy("ssn_supertoken")
    .orderBy(F.col("merch_state_count").desc(), F.col("merch_state").asc())
)
freq_merch_state_trans_df = (
    trans_df
    .groupBy("ssn_supertoken", "merch_state")
    .agg(
        F.count(F.col("merch_state")).cast("int").alias("merch_state_count")
    )
    .withColumn("rn", ssn_merch_window)
    .filter("rn = 1")  # keep only the top-ranked state per ssn
    .select(
        F.col("ssn_supertoken"), F.col("merch_state").alias("most_frequent_merch_state_trans")
    )
)
joined_merch_state_trans_df = df_merch_state_trans.join(freq_merch_state_trans_df, 'ssn_supertoken', 'inner')

In [0]:
# Unique merchant-zip count and most frequent merchant zip per ssn_supertoken.
df_merch_trans_zip = (
    trans_df
    .groupBy('ssn_supertoken')
    .agg(F.countDistinct(F.col("merch_zip")).cast("int").alias("unique_merch_zip_count_trans"))
)

# Rank each ssn's zips by frequency. The zip itself is a secondary sort key so ties are
# broken deterministically; row_number over a non-unique ordering could otherwise pick a
# different "most frequent" value from run to run.
ssn_merch_window = F.row_number().over(
    Window.partitionBy("ssn_supertoken")
    .orderBy(F.col("merch_zip_count").desc(), F.col("merch_zip").asc())
)
freq_merch_zip_trans_df = (
    trans_df
    .groupBy("ssn_supertoken", "merch_zip")
    .agg(
        F.count(F.col("merch_zip")).cast("int").alias("merch_zip_count")
    )
    .withColumn("rn", ssn_merch_window)
    .filter("rn = 1")  # keep only the top-ranked zip per ssn
    .select(
        F.col("ssn_supertoken"), F.col("merch_zip").alias("most_frequent_merch_zip_trans")
    )
)
joined_merch_zip_trans_df = df_merch_trans_zip.join(freq_merch_zip_trans_df, 'ssn_supertoken', 'inner')

In [0]:
# Split transamt into three buckets by days_from_creation:
#   first_10  : days_from_creation <= 10
#   second_10 : 10 < days_from_creation < 21
#   third_10  : days_from_creation >= 21
# Each bucket column holds transamt when the row falls in that bucket and null otherwise
# (a `when` without `otherwise` yields null).

df_by_10_days = (
    trans_df
    .withColumn(
        'first_10',
        F.when(F.expr("days_from_creation <=10"), F.col('transamt')),
    )
    .withColumn(
        'second_10',
        F.when(F.expr("days_from_creation >10 AND days_from_creation<21"), F.col('transamt')),
    )
    .withColumn(
        'third_10',
        F.when(F.expr("days_from_creation >=21"), F.col('transamt')),
    )
)


In [0]:
# Ad-hoc spot check: number of debit transactions with mcc_category 'Not Available' that
# fall in the second_10 bucket, for one hard-coded ssn_supertoken.
df_by_10_days.filter((F.col('ssn_supertoken')=='GW9JOV2ROSPY')&(F.col('mcc_category')=='Not Available')&(F.col('creditdebit') == debitcredit_DEBIT)).where(F.col('second_10').isNotNull()).count()



In [0]:
# Ad-hoc spot check of the second_10 merchant-category debit counts for one hard-coded
# ssn_supertoken. `mcc_cat_count_debit_2` is only built in a later cell, so on a fresh
# kernel (Restart & Run All) an unguarded reference here raises NameError and stops the
# run. Show the rows only when the frame already exists.
if 'mcc_cat_count_debit_2' in globals():
    mcc_cat_count_debit_2.filter(F.col('ssn_supertoken')=='GW9JOV2ROSPY').show(truncate=False)
else:
    print("mcc_cat_count_debit_2 is not defined yet - run the second_10 merchant-category count cell first")

In [0]:
# Mean and total transaction amount for each 10-day bucket (first_10 / second_10 /
# third_10) per ssn_supertoken, computed separately for credits and for debits.
# Output column order: the three means first, then the three sums.
monthly_stats_credit_df = (
    df_by_10_days
    .where(F.col('creditdebit') == debitcredit_CREDIT)
    .groupBy('ssn_supertoken')
    .agg(*(
        [F.mean(bucket).cast("float").alias('{}_credit_trans_mean'.format(bucket))
         for bucket in ('first_10', 'second_10', 'third_10')]
        + [F.sum(bucket).cast("float").alias('{}_credit_trans_sum'.format(bucket))
           for bucket in ('first_10', 'second_10', 'third_10')]
    ))
)

monthly_stats_debit_df = (
    df_by_10_days
    .where(F.col('creditdebit') == debitcredit_DEBIT)
    .groupBy('ssn_supertoken')
    .agg(*(
        [F.mean(bucket).cast("float").alias('{}_debit_trans_mean'.format(bucket))
         for bucket in ('first_10', 'second_10', 'third_10')]
        + [F.sum(bucket).cast("float").alias('{}_debit_trans_sum'.format(bucket))
           for bucket in ('first_10', 'second_10', 'third_10')]
    ))
)

In [0]:
# Debit transaction amount summed per merchant category, per ssn_supertoken, restricted
# to rows in the first_10 bucket (first_10 is not null).
# Each pair is (output-name stem, mcc_category value); the sum is taken over transamt for
# rows of that category only (non-matching rows contribute null). Note that the
# 'Transportation' category is published under the 'misc_transp' stem.

mcc_cat_transamt_sum_debit_1 = (
    df_by_10_days
    .where((F.col('creditdebit') == debitcredit_DEBIT) & F.col('first_10').isNotNull())
    .groupBy('ssn_supertoken')
    .agg(*[
        F.sum(F.when(F.expr("mcc_category=='{}'".format(category)), F.col('transamt')))
        .cast("float")
        .alias('mcc_cat_{}_sum_trans_debit_first_10'.format(stem))
        for stem, category in [
            ('not_avail', 'Not Available'),
            ('misc_stores', 'Miscellaneous Stores'),
            ('retail_stores', 'Retail Stores'),
            ('automobiles', 'Automobiles and Vehicles'),
            ('serv_prov', 'Service Providers'),
            ('utilities', 'Utilities'),
            ('misc_transp', 'Transportation'),
            ('clothing', 'Clothing Stores'),
            ('mail_tele', 'Mail Order/Telephone Order Providers'),
            ('amusement', 'Amusement and Entertainment'),
        ]
    ])
)

In [0]:
# Debit transaction count per merchant category, per ssn_supertoken, restricted to rows
# in the first_10 bucket (first_10 is not null).
# Each pair is (output-name stem, mcc_category value); the count covers rows of that
# category with a non-null transamt and is cast to float. Note that the 'Transportation'
# category is published under the 'misc_transp' stem.

mcc_cat_count_debit_1 = (
    df_by_10_days
    .where((F.col('creditdebit') == debitcredit_DEBIT) & F.col('first_10').isNotNull())
    .groupBy('ssn_supertoken')
    .agg(*[
        F.count(F.when(F.expr("mcc_category=='{}'".format(category)), F.col('transamt')))
        .cast("float")
        .alias('mcc_cat_{}_count_trans_debit_first_10'.format(stem))
        for stem, category in [
            ('not_avail', 'Not Available'),
            ('misc_stores', 'Miscellaneous Stores'),
            ('retail_stores', 'Retail Stores'),
            ('automobiles', 'Automobiles and Vehicles'),
            ('serv_prov', 'Service Providers'),
            ('utilities', 'Utilities'),
            ('misc_transp', 'Transportation'),
            ('clothing', 'Clothing Stores'),
            ('mail_tele', 'Mail Order/Telephone Order Providers'),
            ('amusement', 'Amusement and Entertainment'),
        ]
    ])
)

In [0]:
# Debit transaction amount summed per merchant category, per ssn_supertoken, restricted
# to rows in the second_10 bucket (second_10 is not null).
# Each pair is (output-name stem, mcc_category value); the sum is taken over transamt for
# rows of that category only (non-matching rows contribute null). Note that the
# 'Transportation' category is published under the 'misc_transp' stem.

mcc_cat_transamt_sum_debit_2 = (
    df_by_10_days
    .where((F.col('creditdebit') == debitcredit_DEBIT) & F.col('second_10').isNotNull())
    .groupBy('ssn_supertoken')
    .agg(*[
        F.sum(F.when(F.expr("mcc_category=='{}'".format(category)), F.col('transamt')))
        .cast("float")
        .alias('mcc_cat_{}_sum_trans_debit_second_10'.format(stem))
        for stem, category in [
            ('not_avail', 'Not Available'),
            ('misc_stores', 'Miscellaneous Stores'),
            ('retail_stores', 'Retail Stores'),
            ('automobiles', 'Automobiles and Vehicles'),
            ('serv_prov', 'Service Providers'),
            ('utilities', 'Utilities'),
            ('misc_transp', 'Transportation'),
            ('clothing', 'Clothing Stores'),
            ('mail_tele', 'Mail Order/Telephone Order Providers'),
            ('amusement', 'Amusement and Entertainment'),
        ]
    ])
)

In [0]:
# Debit transaction count per merchant category, per ssn_supertoken, restricted to rows
# in the second_10 bucket (second_10 is not null).
# Each pair is (output-name stem, mcc_category value); the count covers rows of that
# category with a non-null transamt and is cast to float. Note that the 'Transportation'
# category is published under the 'misc_transp' stem.

mcc_cat_count_debit_2 = (
    df_by_10_days
    .where((F.col('creditdebit') == debitcredit_DEBIT) & F.col('second_10').isNotNull())
    .groupBy('ssn_supertoken')
    .agg(*[
        F.count(F.when(F.expr("mcc_category=='{}'".format(category)), F.col('transamt')))
        .cast("float")
        .alias('mcc_cat_{}_count_trans_debit_second_10'.format(stem))
        for stem, category in [
            ('not_avail', 'Not Available'),
            ('misc_stores', 'Miscellaneous Stores'),
            ('retail_stores', 'Retail Stores'),
            ('automobiles', 'Automobiles and Vehicles'),
            ('serv_prov', 'Service Providers'),
            ('utilities', 'Utilities'),
            ('misc_transp', 'Transportation'),
            ('clothing', 'Clothing Stores'),
            ('mail_tele', 'Mail Order/Telephone Order Providers'),
            ('amusement', 'Amusement and Entertainment'),
        ]
    ])
)

In [0]:
# Debit transaction amount summed per merchant category, per ssn_supertoken, restricted
# to rows in the third_10 bucket (third_10 is not null).
# Each pair is (output-name stem, mcc_category value); the sum is taken over transamt for
# rows of that category only (non-matching rows contribute null). Note that the
# 'Transportation' category is published under the 'misc_transp' stem.

mcc_cat_transamt_sum_debit_3 = (
    df_by_10_days
    .where((F.col('creditdebit') == debitcredit_DEBIT) & F.col('third_10').isNotNull())
    .groupBy('ssn_supertoken')
    .agg(*[
        F.sum(F.when(F.expr("mcc_category=='{}'".format(category)), F.col('transamt')))
        .cast("float")
        .alias('mcc_cat_{}_sum_trans_debit_third_10'.format(stem))
        for stem, category in [
            ('not_avail', 'Not Available'),
            ('misc_stores', 'Miscellaneous Stores'),
            ('retail_stores', 'Retail Stores'),
            ('automobiles', 'Automobiles and Vehicles'),
            ('serv_prov', 'Service Providers'),
            ('utilities', 'Utilities'),
            ('misc_transp', 'Transportation'),
            ('clothing', 'Clothing Stores'),
            ('mail_tele', 'Mail Order/Telephone Order Providers'),
            ('amusement', 'Amusement and Entertainment'),
        ]
    ])
)

In [0]:
# Per-SSN count of DEBIT transactions in each merchant category, restricted to
# rows where `third_10` is populated.
# NOTE(review): the earlier comment called this "Month 3"; the column name suggests
# the third 10-day window - confirm against the definition of df_by_10_days.
mcc_cat_count_debit_3 = (
    df_by_10_days
    .filter((F.col('creditdebit') == debitcredit_DEBIT) & F.col('third_10').isNotNull())
    .groupBy('ssn_supertoken')
    .agg(*[
        # count() ignores nulls, so only rows whose category matches contribute
        F.count(F.when(F.expr("mcc_category=='{}'".format(category)), F.col('transamt')).otherwise(None))
        .cast("float")
        .alias('mcc_cat_{}_count_trans_debit_third_10'.format(label))
        for label, category in [
            ('not_avail', 'Not Available'),
            ('misc_stores', 'Miscellaneous Stores'),
            ('retail_stores', 'Retail Stores'),
            ('automobiles', 'Automobiles and Vehicles'),
            ('serv_prov', 'Service Providers'),
            ('utilities', 'Utilities'),
            ('misc_transp', 'Transportation'),
            ('clothing', 'Clothing Stores'),
            ('mail_tele', 'Mail Order/Telephone Order Providers'),
            ('amusement', 'Amusement and Entertainment'),
        ]
    ])
)

In [0]:
# Per-SSN sum of CREDIT transaction amounts in each merchant category, restricted
# to rows where `first_10` is populated.
# NOTE(review): the earlier comment called this "Month 1"; the column name suggests
# the first 10-day window - confirm against the definition of df_by_10_days.
mcc_cat_transamt_sum_credit_1 = (
    df_by_10_days
    .filter((F.col('creditdebit') == debitcredit_CREDIT) & F.col('first_10').isNotNull())
    .groupBy('ssn_supertoken')
    .agg(*[
        # sum() ignores nulls; an SSN with no rows in a category gets a null sum
        F.sum(F.when(F.expr("mcc_category=='{}'".format(category)), F.col('transamt')).otherwise(None))
        .cast("float")
        .alias('mcc_cat_{}_sum_trans_credit_first_10'.format(label))
        for label, category in [
            ('not_avail', 'Not Available'),
            ('misc_stores', 'Miscellaneous Stores'),
            ('retail_stores', 'Retail Stores'),
            ('automobiles', 'Automobiles and Vehicles'),
            ('serv_prov', 'Service Providers'),
            ('utilities', 'Utilities'),
            ('misc_transp', 'Transportation'),
            ('clothing', 'Clothing Stores'),
            ('mail_tele', 'Mail Order/Telephone Order Providers'),
            ('amusement', 'Amusement and Entertainment'),
        ]
    ])
)

In [0]:
# Per-SSN count of CREDIT transactions in each merchant category, restricted to
# rows where `first_10` is populated.
# NOTE(review): the earlier comment called this "Month 1"; the column name suggests
# the first 10-day window - confirm against the definition of df_by_10_days.
mcc_cat_count_credit_1 = (
    df_by_10_days
    .filter((F.col('creditdebit') == debitcredit_CREDIT) & F.col('first_10').isNotNull())
    .groupBy('ssn_supertoken')
    .agg(*[
        # count() ignores nulls, so only rows whose category matches contribute
        F.count(F.when(F.expr("mcc_category=='{}'".format(category)), F.col('transamt')).otherwise(None))
        .cast("float")
        .alias('mcc_cat_{}_count_trans_credit_first_10'.format(label))
        for label, category in [
            ('not_avail', 'Not Available'),
            ('misc_stores', 'Miscellaneous Stores'),
            ('retail_stores', 'Retail Stores'),
            ('automobiles', 'Automobiles and Vehicles'),
            ('serv_prov', 'Service Providers'),
            ('utilities', 'Utilities'),
            ('misc_transp', 'Transportation'),
            ('clothing', 'Clothing Stores'),
            ('mail_tele', 'Mail Order/Telephone Order Providers'),
            ('amusement', 'Amusement and Entertainment'),
        ]
    ])
)

In [0]:
# Per-SSN sum of CREDIT transaction amounts in each merchant category, restricted
# to rows where `second_10` is populated.
# NOTE(review): the earlier comment called this "Month 2"; the column name suggests
# the second 10-day window - confirm against the definition of df_by_10_days.
mcc_cat_transamt_sum_credit_2 = (
    df_by_10_days
    .filter((F.col('creditdebit') == debitcredit_CREDIT) & F.col('second_10').isNotNull())
    .groupBy('ssn_supertoken')
    .agg(*[
        # sum() ignores nulls; an SSN with no rows in a category gets a null sum
        F.sum(F.when(F.expr("mcc_category=='{}'".format(category)), F.col('transamt')).otherwise(None))
        .cast("float")
        .alias('mcc_cat_{}_sum_trans_credit_second_10'.format(label))
        for label, category in [
            ('not_avail', 'Not Available'),
            ('misc_stores', 'Miscellaneous Stores'),
            ('retail_stores', 'Retail Stores'),
            ('automobiles', 'Automobiles and Vehicles'),
            ('serv_prov', 'Service Providers'),
            ('utilities', 'Utilities'),
            ('misc_transp', 'Transportation'),
            ('clothing', 'Clothing Stores'),
            ('mail_tele', 'Mail Order/Telephone Order Providers'),
            ('amusement', 'Amusement and Entertainment'),
        ]
    ])
)

In [0]:
# Per-SSN count of CREDIT transactions in each merchant category, restricted to
# rows where `second_10` is populated.
# NOTE(review): the earlier comment called this "Month 2"; the column name suggests
# the second 10-day window - confirm against the definition of df_by_10_days.
mcc_cat_count_credit_2 = (
    df_by_10_days
    .filter((F.col('creditdebit') == debitcredit_CREDIT) & F.col('second_10').isNotNull())
    .groupBy('ssn_supertoken')
    .agg(*[
        # count() ignores nulls, so only rows whose category matches contribute
        F.count(F.when(F.expr("mcc_category=='{}'".format(category)), F.col('transamt')).otherwise(None))
        .cast("float")
        .alias('mcc_cat_{}_count_trans_credit_second_10'.format(label))
        for label, category in [
            ('not_avail', 'Not Available'),
            ('misc_stores', 'Miscellaneous Stores'),
            ('retail_stores', 'Retail Stores'),
            ('automobiles', 'Automobiles and Vehicles'),
            ('serv_prov', 'Service Providers'),
            ('utilities', 'Utilities'),
            ('misc_transp', 'Transportation'),
            ('clothing', 'Clothing Stores'),
            ('mail_tele', 'Mail Order/Telephone Order Providers'),
            ('amusement', 'Amusement and Entertainment'),
        ]
    ])
)

In [0]:
# Spot check: credit category counts for a single hard-coded SSN token
(
    mcc_cat_count_credit_2
    .where(F.col('ssn_supertoken') == 'GW9JOV2ROSPY')
    .show(truncate=False)
)

In [0]:
# Preview the first 20 rows (show's default) without truncating cell values
mcc_cat_count_credit_2.show(n=20, truncate=False)

In [0]:
# Per-SSN sum of CREDIT transaction amounts in each merchant category, restricted
# to rows where `third_10` is populated.
# NOTE(review): the earlier comment called this "Month 3"; the column name suggests
# the third 10-day window - confirm against the definition of df_by_10_days.
mcc_cat_transamt_sum_credit_3 = (
    df_by_10_days
    .filter((F.col('creditdebit') == debitcredit_CREDIT) & F.col('third_10').isNotNull())
    .groupBy('ssn_supertoken')
    .agg(*[
        # sum() ignores nulls; an SSN with no rows in a category gets a null sum
        F.sum(F.when(F.expr("mcc_category=='{}'".format(category)), F.col('transamt')).otherwise(None))
        .cast("float")
        .alias('mcc_cat_{}_sum_trans_credit_third_10'.format(label))
        for label, category in [
            ('not_avail', 'Not Available'),
            ('misc_stores', 'Miscellaneous Stores'),
            ('retail_stores', 'Retail Stores'),
            ('automobiles', 'Automobiles and Vehicles'),
            ('serv_prov', 'Service Providers'),
            ('utilities', 'Utilities'),
            ('misc_transp', 'Transportation'),
            ('clothing', 'Clothing Stores'),
            ('mail_tele', 'Mail Order/Telephone Order Providers'),
            ('amusement', 'Amusement and Entertainment'),
        ]
    ])
)

In [0]:
# Per-SSN count of CREDIT transactions in each merchant category, restricted to
# rows where `third_10` is populated.
# NOTE(review): the earlier comment called this "Month 3"; the column name suggests
# the third 10-day window - confirm against the definition of df_by_10_days.
mcc_cat_count_credit_3 = (
    df_by_10_days
    .filter((F.col('creditdebit') == debitcredit_CREDIT) & F.col('third_10').isNotNull())
    .groupBy('ssn_supertoken')
    .agg(*[
        # count() ignores nulls, so only rows whose category matches contribute
        F.count(F.when(F.expr("mcc_category=='{}'".format(category)), F.col('transamt')).otherwise(None))
        .cast("float")
        .alias('mcc_cat_{}_count_trans_credit_third_10'.format(label))
        for label, category in [
            ('not_avail', 'Not Available'),
            ('misc_stores', 'Miscellaneous Stores'),
            ('retail_stores', 'Retail Stores'),
            ('automobiles', 'Automobiles and Vehicles'),
            ('serv_prov', 'Service Providers'),
            ('utilities', 'Utilities'),
            ('misc_transp', 'Transportation'),
            ('clothing', 'Clothing Stores'),
            ('mail_tele', 'Mail Order/Telephone Order Providers'),
            ('amusement', 'Amusement and Entertainment'),
        ]
    ])
)

In [0]:
# Per-SSN transaction counts by merchant category over all of trans_df.
# `case_when` is a helper defined elsewhere in this notebook; presumably it yields
# a non-null value only when the column equals the given category - verify there.
# NOTE(review): the Transportation alias here is `misc_trans`, while the other
# category feature frames use `misc_transp`; left as-is because the name is part
# of the output schema.
mcc_cat_counts_df = (
    trans_df
    .groupBy('ssn_supertoken')
    .agg(*[
        F.count(case_when('mcc_category', category))
        .cast("int")
        .alias('mcc_cat_{}_count_trans'.format(label))
        for label, category in [
            ('not_avail', 'Not Available'),
            ('misc_stores', 'Miscellaneous Stores'),
            ('retail_stores', 'Retail Stores'),
            ('automobiles', 'Automobiles and Vehicles'),
            ('serv_prov', 'Service Providers'),
            ('utilities', 'Utilities'),
            ('misc_trans', 'Transportation'),
            ('clothing', 'Clothing Stores'),
            ('mail_tele', 'Mail Order/Telephone Order Providers'),
            ('amusement', 'Amusement and Entertainment'),
        ]
    ])
)

In [0]:
# Number of distinct card keys seen for each SSN token
cardkeys_per_ssn = trans_df.groupBy('ssn_supertoken').agg(
    F.countDistinct('cardkey').cast("int").alias('unique_cardkeys')
)

In [0]:
#Distinct customer key count and most frequent customer key per SSN
df_cust_key = (
    trans_df
    .groupBy('ssn_supertoken')
    .agg(F.countDistinct(F.col("customerkey")).cast("int").alias("unique_cust_key")))

# Rank each SSN's customer keys by how often they occur (most frequent first).
# The previous ordering was on `customerkey` itself, which selected the largest
# key value rather than the most frequent one. `customerkey` is kept as a
# secondary sort so ties on the count resolve the same way on every run.
ssn_cust_window = F.row_number().over(
    Window.partitionBy("ssn_supertoken")
    .orderBy(F.col("cust_key_count").desc(), F.col("customerkey").desc()))
joined_cust_key_df = (
    trans_df
    .groupBy("ssn_supertoken", "customerkey")
    .agg(
        # count() skips nulls, so a null customerkey group gets a count of 0
        F.count(F.col("customerkey")).cast("int").alias("cust_key_count")
    )
    .withColumn("rn", ssn_cust_window)
    .filter("rn = 1")
    .select(
        F.col("ssn_supertoken"), F.col("customerkey").alias("most_frequent_cust_key")))

# One row per SSN: unique_cust_key + most_frequent_cust_key
joined_cust_key_df = df_cust_key.join(joined_cust_key_df, 'ssn_supertoken', 'inner')

In [0]:
#Distinct MCC code count and most frequent MCC code per SSN
df_mcc_code = (
    trans_df
    .groupBy('ssn_supertoken')
    .agg(F.countDistinct(F.col("mcc")).cast("int").alias("unique_mcc_code_count_trans")
))
# Rank each SSN's MCC codes by frequency (most frequent first). The code value is
# a secondary sort key so that ties on the count are resolved identically on every
# run; with the count alone, row_number() picks an arbitrary code among ties.
ssn_mcc_window = F.row_number().over(
    Window.partitionBy("ssn_supertoken")
    .orderBy(F.col("mcc_code_count").desc(), F.col("mcc").asc()))
joined_mcc_code_df = (
    trans_df
    .groupBy("ssn_supertoken", "mcc")
    .agg(
        # count() skips nulls, so a null mcc group gets a count of 0
        F.count(F.col("mcc")).cast("int").alias("mcc_code_count")
    )
    .withColumn("rn", ssn_mcc_window)
    .filter("rn = 1")
    .select(
        F.col("ssn_supertoken"), F.col("mcc").alias("most_frequent_mcc_code_trans")
    )
)
# One row per SSN: unique_mcc_code_count_trans + most_frequent_mcc_code_trans
joined_mcc_code_df = df_mcc_code.join(joined_mcc_code_df, 'ssn_supertoken', 'inner')

In [0]:
#Distinct trancode count and most frequent trancode per SSN
df_trancode = (
    trans_df
    .groupBy('ssn_supertoken')
    .agg(F.countDistinct(F.col("trancode")).cast("int").alias("unique_trancode_count_trans")
))
# Rank each SSN's trancodes by frequency (most frequent first). The trancode value
# is a secondary sort key so that ties on the count are resolved identically on
# every run; with the count alone, row_number() picks an arbitrary code among ties.
ssn_trancode_window = F.row_number().over(
    Window.partitionBy("ssn_supertoken")
    .orderBy(F.col("transcode_count").desc(), F.col("trancode").asc()))
most_freq_trancode_df = (
    trans_df
    .groupBy("ssn_supertoken", "trancode")
    .agg(
        # count() skips nulls, so a null trancode group gets a count of 0
        F.count(F.col("trancode")).cast("int").alias("transcode_count")
    )
    .withColumn("rn", ssn_trancode_window)
    .filter("rn = 1")
    .select(
        F.col("ssn_supertoken"), F.col("trancode").alias("most_frequent_trancode_trans")
    )
)
# One row per SSN: unique_trancode_count_trans + most_frequent_trancode_trans
joined_tranc_df = df_trancode.join(most_freq_trancode_df, 'ssn_supertoken', 'inner')

In [0]:
# Per-SSN mean transaction amount in each merchant category over all of trans_df.
# mean() ignores nulls; an SSN with no rows in a category gets a null mean.
mcc_cat_transamt_df = (
    trans_df
    .groupBy('ssn_supertoken')
    .agg(*[
        F.mean(F.when(F.expr("mcc_category=='{}'".format(category)), F.col('transamt')).otherwise(None))
        .cast("float")
        .alias('mcc_cat_{}_mean_trans'.format(label))
        for label, category in [
            ('not_avail', 'Not Available'),
            ('misc_stores', 'Miscellaneous Stores'),
            ('retail_stores', 'Retail Stores'),
            ('automobiles', 'Automobiles and Vehicles'),
            ('serv_prov', 'Service Providers'),
            ('utilities', 'Utilities'),
            ('misc_transp', 'Transportation'),
            ('clothing', 'Clothing Stores'),
            ('mail_tele', 'Mail Order/Telephone Order Providers'),
            ('amusement', 'Amusement and Entertainment'),
        ]
    ])
)

In [0]:
# Per-SSN min / max / mean / stddev of the available-balance column
avail_balance_summary_df = trans_df.groupBy('ssn_supertoken').agg(
    F.min("availablebalance_msghash").cast("float").alias("min_avail_bal_amount"),
    F.max("availablebalance_msghash").cast("float").alias("max_avail_bal_amount"),
    F.mean("availablebalance_msghash").cast("float").alias("mean_avail_bal_amount"),
    F.stddev("availablebalance_msghash").cast("float").alias("stddev_avail_bal_amount"),
)

# Same four summary statistics for the ledger-balance column
ledger_balance_summary_df = trans_df.groupBy('ssn_supertoken').agg(
    F.min("ledgerbalance_msghash").cast("float").alias("min_ledger_bal_amount"),
    F.max("ledgerbalance_msghash").cast("float").alias("max_ledger_bal_amount"),
    F.mean("ledgerbalance_msghash").cast("float").alias("mean_ledger_bal_amount"),
    F.stddev("ledgerbalance_msghash").cast("float").alias("stddev_ledger_bal_amount"),
)

In [0]:
#Distinct networkid count and most frequent networkid per SSN
df_networkid = (
    trans_df
    .groupBy('ssn_supertoken')
    .agg(F.countDistinct(F.col("networkid")).cast("int").alias("unique_networkid_count_trans")
))
# Rank each SSN's network ids by frequency (most frequent first). The networkid
# value is a secondary sort key so that ties on the count are resolved identically
# on every run; with the count alone, row_number() picks an arbitrary id among ties.
ssn_networkid_window = F.row_number().over(
    Window.partitionBy("ssn_supertoken")
    .orderBy(F.col("networkid_count").desc(), F.col("networkid").asc()))
most_freq_networkid_df = (
    trans_df
    .groupBy("ssn_supertoken", "networkid")
    .agg(
        # count() skips nulls, so a null networkid group gets a count of 0
        F.count(F.col("networkid")).cast("int").alias("networkid_count")
    )
    .withColumn("rn", ssn_networkid_window)
    .filter("rn = 1")
    .select(
        F.col("ssn_supertoken"), F.col("networkid").alias("most_frequent_networkid_trans")
    )
)
# One row per SSN: unique_networkid_count_trans + most_frequent_networkid_trans
joined_networkid_df = df_networkid.join(most_freq_networkid_df, 'ssn_supertoken', 'inner')

#### Stitch the feature dataframes built above into a single final features dataframe

In [0]:
# One row per SSN token; used as the left side of the feature joins
ssn_df = trans_df.select('ssn_supertoken').dropDuplicates()

In [0]:
# Number of distinct SSN tokens in trans_df
ssn_df.count()

In [0]:
# Left-join every per-SSN feature frame onto the distinct SSN list, in the order
# listed, so each SSN keeps exactly one row even when a feature frame has no entry
# for it.
features_trans_df = ssn_df
for per_ssn_features in [
    credits_per_ssn,
    debits_per_ssn,
    credit_txn_amounts_per_ssn,
    debit_txn_amounts_per_ssn,
    trans_d_summary_df,
    trans_c_summary_df,
    df_dollar_bands_trans_cred,
    df_dollar_bands_trans_deb,
    df_trans_weekend_cred,
    df_trans_weekend_deb,
    df_trans_time_of_day_cred,
    df_trans_time_of_day_deb,
    joined_mcc_trans_df,
    joined_merch_city_trans_df,
    joined_merch_state_trans_df,
    joined_merch_zip_trans_df,
    monthly_stats_credit_df,
    monthly_stats_debit_df,
    mcc_cat_counts_df,
    cardkeys_per_ssn,
    joined_cust_key_df,
    joined_mcc_code_df,
    joined_tranc_df,
    mcc_cat_transamt_df,
    avail_balance_summary_df,
    ledger_balance_summary_df,
    joined_networkid_df,
    df_trans_count,
    mcc_cat_transamt_sum_debit_1,
    mcc_cat_transamt_sum_debit_2,
    mcc_cat_transamt_sum_debit_3,
    mcc_cat_transamt_sum_credit_1,
    mcc_cat_transamt_sum_credit_2,
    mcc_cat_transamt_sum_credit_3,
    mcc_cat_count_debit_1,
    mcc_cat_count_debit_2,
    mcc_cat_count_debit_3,
    mcc_cat_count_credit_1,
    mcc_cat_count_credit_2,
    mcc_cat_count_credit_3,
]:
    features_trans_df = features_trans_df.join(per_ssn_features, on='ssn_supertoken', how='left')

# Replace nulls left by the joins/aggregations with 0. With a numeric fill value
# only numeric columns are affected; string columns keep their nulls.
features_trans_df = features_trans_df.fillna(0)

In [0]:
# Row count of the joined feature frame
# NOTE(review): presumably this should equal ssn_df.count() if every feature frame has one row per SSN - confirm
features_trans_df.count()

In [0]:
#Post aggregation column inspection
#features_trans_df.columns

In [0]:
 # Persist the final feature frame as snappy-compressed parquet, replacing any
 # existing output at this path.
 # NOTE(review): the path name contains "test"; presumably it must match the
 # dataset chosen in the Load section - confirm before running.
 (
     features_trans_df.write
      .option('compression', 'snappy')
      .mode('overwrite')
      .parquet('gs://ds-greendot/ac/chargeback/model_test_trans_20210901')
 )

In [0]:
# features_trans_df.select( 'stddev_avail_bal_amount',
#  'min_ledger_bal_amount',
#  'max_ledger_bal_amount',
#  'mean_ledger_bal_amount',
#  'stddev_ledger_bal_amount',
#  'unique_networkid_count_trans',
#  'most_frequent_networkid_trans').show()

---

The cells below are optional and do not need to be run; they are kept only for ad hoc verification.

In [0]:
# after debits tweak
#final_df.limit(5).toPandas().T

## Adhoc investigation

In [0]:
# prep_df.filter(F.col('dispute') == 'Yes').select('customer_createdate', 'initial_load_date','first_reload_date', 'first_direct_deposit_date','last_activity_date',).distinct().sort('postdate').show(truncate=False)

In [0]:
# final_df.filter(F.col('dispute')=='yes').select('customer_createdate', 'postdate', 'dispute_created_date', ).distinct().sort('postdate').show(truncate=False)