# TracHack Group 5
### Cesar Diez, Elias Eskind, Austin Gravely, & Arlet Rodriguez
#### 4/25/2021



We used a gradient boosted tree classification model to predict one time redeemers

## Loading in required packages

In [1]:
import numpy as np
from pyspark.sql import SparkSession
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.evaluation import MulticlassMetrics
import pyspark.sql.functions as func
from pyspark.sql.types import StringType

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
148,application_1617643957232_0149,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Creating the functions
## Selecting Data

In [2]:
# Picking which sheets and which columns from those sheets we want to use
def select_data(data_frame):
    '''Selects and transforms the raw GCR (JSON) records data frame into a data frame.'''
    
    ##### Create UDFs
    func_udf = func.udf(lambda X: "empty" if X=='' else X , StringType())
    
    ##### Adding columns
    df_selection = data_frame.select('line_id',"phone_info", 'contact_info','loyalty-program', 'byop_phone_info', 'first_redemption_channel',
                                     'first_redemption_date','carrier', "network-usage-international",
                      func.explode_outer("network-usage-domestic").alias('network-usage-domestic'))
 
    ##### Have to do twice because of explode_outer
    
    df_selection = df_selection.select('line_id',"phone_info.*",'contact_info.*', 'loyalty-program.*', 'byop_phone_info.gsma_device_type', 
                                       'byop_phone_info.gsma_operating_system', func.col('byop_phone_info.manufacturer').alias('byop_manufacturer'), 'byop_phone_info.os_family',
                                       'byop_phone_info.touch_screen', 'first_redemption_channel', 'first_redemption_date', 'carrier', "network-usage-domestic",
                      func.explode_outer("network-usage-international").alias('network-usage-international'))
    
    
    ##### Final Selection
    df_selection = df_selection.select('line_id',"manufacturer","model","operating_system","release_date", # phone info
                                       'extd_warranty', 'model_type', 'technology','language_preference', 'display_description', 'fm_radio', 'has_wifi_calling', 'mobile_hotspot', 'data_capable', # phone info
                                       'opt_out_email', 'opt_out_mobiles_ads', 'opt_out_phone', 'state', 'lrp_enrolled', #contact info/loyalty program
                                       'gsma_device_type', 'gsma_operating_system', 'byop_manufacturer','os_family', 'touch_screen', # byob
                                       'first_redemption_channel', 'first_redemption_date', 'carrier', # other
                      # Domestic 
                      func.col("network-usage-domestic.hotspot_kb"),
                      func.col("network-usage-domestic.total_kb"),
                      func.col("network-usage-domestic.voice_count_total"),  
                      func.col("network-usage-domestic.sms_in"),
                      func.col("network-usage-domestic.sms_out"),
                      func.col("network-usage-domestic.mms_in"),
                      func.col("network-usage-domestic.mms_out"),
                      #International
                      func.col("network-usage-international.total_kb").alias("int_total_kb"),
                      func.col("network-usage-international.voice_count_total").alias("int_voice_count_total"),  
                      func.col("network-usage-international.hotspot_kb").alias("int_hotspot_kb"),  
                      func.col("network-usage-international.sms_in").alias("int_sms_in"),
                      func.col("network-usage-international.sms_out").alias("int_sms_out"),
                      func.col("network-usage-international.mms_in").alias("int_mms_in"),
                      func.col("network-usage-international.mms_out").alias("int_mms_out")).withColumn("language_preference", func_udf(df_selection['language_preference']))
    
    
    return df_selection

# Get Usage Summary

def get_usage_summary(df):
    '''Returns aggregate voice count and data usage columns.'''
    
    ##### Adding summary variables for usuage (domestic and international)
    
    #Domestic
    data_voice_count = df.select('line_id', "voice_count_total")
    data_voice_count = data_voice_count.groupby('line_id').agg(func.sum('voice_count_total'))\
                        .selectExpr('line_id','`sum(voice_count_total)` as domestic_voice_count_total')
    
    data_hot = df.select('line_id','hotspot_kb')
    data_hot = data_hot.groupby('line_id').agg(func.sum('hotspot_kb')).selectExpr('line_id',
                    '`sum(hotspot_kb)` as domestic_data_hotspot_kb')
    
    data_kb = df.select('line_id','total_kb')
    data_kb = data_kb.groupby('line_id').agg(func.sum('total_kb')).selectExpr('line_id',
                    '`sum(total_kb)` as domestic_data_usage_kb')
    
    sms_in = df.select('line_id','sms_in')
    sms_in = sms_in.groupby('line_id').agg(func.sum('sms_in')).selectExpr('line_id',
                    '`sum(sms_in)` as domestic_sms_in')

    sms_out = df.select('line_id','sms_out')
    sms_out = sms_out.groupby('line_id').agg(func.sum('sms_out')).selectExpr('line_id',
                    '`sum(sms_out)` as domestic_sms_out')
    
    mms_in = df.select('line_id','mms_in')
    mms_in = mms_in.groupby('line_id').agg(func.sum('mms_in')).selectExpr('line_id',
                    '`sum(mms_in)` as domestic_mms_in')
    
    mms_out = df.select('line_id','mms_out')
    mms_out = mms_out.groupby('line_id').agg(func.sum('mms_out')).selectExpr('line_id',
                    '`sum(mms_out)` as domestic_mms_out')
    
    # International
    
    int_voice_count_total = df.select('line_id','int_voice_count_total')
    int_voice_count_total = int_voice_count_total.groupby('line_id').agg(func.sum('int_voice_count_total')).selectExpr('line_id',
                    '`sum(int_voice_count_total)` as int_voice_count_total_sum')
    
    int_hotspot_kb = df.select('line_id','int_hotspot_kb')
    int_hotspot_kb = int_hotspot_kb.groupby('line_id').agg(func.sum('int_hotspot_kb')).selectExpr('line_id',
                    '`sum(int_hotspot_kb)` as int_hotspot_kb_sum')
    
    int_total_kb = df.select('line_id','int_total_kb')
    int_total_kb = int_total_kb.groupby('line_id').agg(func.sum('int_total_kb')).selectExpr('line_id',
                    '`sum(int_total_kb)` as int_total_kb_sum')
    
    int_sms_in = df.select('line_id','int_sms_in')
    int_sms_in = int_sms_in.groupby('line_id').agg(func.sum('int_sms_in')).selectExpr('line_id',
                    '`sum(int_sms_in)` as int_sms_in_sum')
        
    int_sms_out = df.select('line_id','int_sms_out')
    int_sms_out = int_sms_out.groupby('line_id').agg(func.sum('int_sms_out')).selectExpr('line_id',
                    '`sum(int_sms_out)` as int_sms_out_sum')
    
    int_mms_in = df.select('line_id','int_mms_in')
    int_mms_in = int_mms_in.groupby('line_id').agg(func.sum('int_mms_in')).selectExpr('line_id',
                    '`sum(int_mms_in)` as int_mms_in_sum')
    
    int_mms_out = df.select('line_id','int_mms_out')
    int_mms_out = int_mms_out.groupby('line_id').agg(func.sum('int_mms_out')).selectExpr('line_id',
                    '`sum(int_mms_out)` as int_mms_out_sum')
    
    #### Joining on line_id
    usage_summary=data_voice_count.join(data_kb, on = 'line_id')
    usage_summary=usage_summary.join(data_hot, on = 'line_id')
    usage_summary=usage_summary.join(sms_in, on = 'line_id')
    usage_summary=usage_summary.join(sms_out, on = 'line_id')
    usage_summary=usage_summary.join(mms_in, on = 'line_id')
    usage_summary=usage_summary.join(mms_out, on = 'line_id')
    usage_summary=usage_summary.join(int_voice_count_total, on = 'line_id')
    usage_summary=usage_summary.join(int_hotspot_kb, on = 'line_id')
    usage_summary=usage_summary.join(int_total_kb, on = 'line_id')
    usage_summary=usage_summary.join(int_sms_in, on = 'line_id')
    usage_summary=usage_summary.join(int_sms_out, on = 'line_id')
    usage_summary=usage_summary.join(int_mms_in, on = 'line_id')
    usage_summary=usage_summary.join(int_mms_out, on = 'line_id')
    
    
    usage_summary= usage_summary.fillna(0)
    
    return usage_summary

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Preprocessing

In [3]:
# phone_info
def preprocess_phone_info(df):
    '''Imputes null values in manufacturer and model column with most frequent value.'''
    # The pattern for the release date
    pattern = "M/d/yyyy"
    # Selecting what columns from phone_info we want
    phone_info=df.select('line_id','manufacturer','model', 'operating_system', func.unix_timestamp("release_date", pattern).alias("release_date"),
                         'extd_warranty', 'model_type', 'technology', 'display_description', 
                         'fm_radio', 'has_wifi_calling', 'mobile_hotspot', 'data_capable').dropDuplicates()
    
    # Finding the mode of certain columns so we can replace the null values with said mode
    mode_manufacturer=phone_info.filter(func.col('manufacturer').isNotNull()).groupby('manufacturer').count().sort(func.col('count').desc()).first()['manufacturer']
    mode_model=phone_info.filter(func.col('model').isNotNull()).groupby('model').count().sort(func.col('count').desc()).first()['model']
    mode_display_description=phone_info.filter(func.col('display_description').isNotNull()).groupby('display_description').count().sort(func.col('count').desc()).first()['display_description']
    
    ### DATE STUFF ###
    mode_date = phone_info.filter(func.col('release_date').isNotNull()).groupby('release_date').count().sort(func.col('count').desc()).first()['release_date']
    ##################
    
    #Filling nulls
    phone_info=phone_info.fillna({'manufacturer': mode_manufacturer,'model': mode_model, 
                                  'operating_system': "Other", 'release_date': mode_date, 
                                  'extd_warranty' : "No", 'model_type' : "Non-Touch", 'technology': "Other",
                                  'display_description': mode_display_description, 'fm_radio': "N", 'has_wifi_calling': "N", 
                                  'mobile_hotspot': "Y", 'data_capable': 0})
    return phone_info


# contact_info
def preprocess_contact_info(df):
    
    # Selecting what columns from contact_info we want
    contact_info=df.select('line_id', 'language_preference', 'opt_out_email', 'opt_out_mobiles_ads', 'opt_out_phone', 'state').dropDuplicates()
    # Dealing with the UDF for language we created earlier
    contact_info = contact_info.withColumn("language_preference", \
              func.when(contact_info["language_preference"] == "", "empty").otherwise(contact_info["language_preference"]))
    
    ##### Creating a new column for opt outs #####
    contact_info = contact_info.withColumn(
    'opt_out',
    func.when((func.col("opt_out_email") == 1) & (func.col("opt_out_mobiles_ads")  == 1) & (func.col("opt_out_phone") == 1), 'all')\
    .when((func.col("opt_out_email") == 1) | (func.col("opt_out_mobiles_ads")  == 1) & (func.col("opt_out_phone") == 1), 'some1')\
    .when((func.col("opt_out_email") == 1) & (func.col("opt_out_mobiles_ads")  == 1) | (func.col("opt_out_phone") == 1), 'some2')\
    .when((func.col("opt_out_email") == 1) | (func.col("opt_out_mobiles_ads")  == 1) | (func.col("opt_out_phone") == 1), 'some3')\
    .when((func.col("opt_out_email") == -1) & (func.col("opt_out_mobiles_ads")  == -1) & (func.col("opt_out_phone") == -1), 'none')\
    .when((func.col("opt_out_email") == 'null') & (func.col("opt_out_mobiles_ads")  == 'null') & (func.col("opt_out_phone") == -1), 'none2')\
    .when((func.col("opt_out_email") == 'null') & (func.col("opt_out_mobiles_ads")  == -1) & (func.col("opt_out_phone") == 'null'), 'none3')\
    .when((func.col("opt_out_email") == -1) & (func.col("opt_out_mobiles_ads")  == 'null') & (func.col("opt_out_phone") == 'null'), 'none4')
    .otherwise('none'))
    
    #### Dropping old ones
    contact_info.drop("opt_out_email","opt_out_mobiles_ads","opt_out_phone")
    ##############################################
    
    # Mode for state to fill nulls
    mode_state=contact_info.filter(func.col('state').isNotNull()).groupby('state').count().sort(func.col('count').desc()).first()['state']
    
    # Filling nulls
    contact_info=contact_info.fillna({'language_preference':'EN','opt_out': 'null', 'state' : mode_state})
    
    return contact_info


# byob_info
def preprocess_byop_info(df):
    
    # Same as functions above
    byop_info=df.select('line_id', 'gsma_device_type', 'gsma_operating_system', 'byop_manufacturer','os_family', 'touch_screen').dropDuplicates()

    mode_gsma_device_type=byop_info.filter(func.col('gsma_device_type').isNotNull()).groupby('gsma_device_type').count().sort(func.col('count').desc()).first()['gsma_device_type']
    mode_gsma_operating_system=byop_info.filter(func.col('gsma_operating_system').isNotNull()).groupby('gsma_operating_system').count().sort(func.col('count').desc()).first()['gsma_operating_system']
    mode_byop_manufacturer=byop_info.filter(func.col('byop_manufacturer').isNotNull()).groupby('byop_manufacturer').count().sort(func.col('count').desc()).first()['byop_manufacturer']
    mode_os_family=byop_info.filter(func.col('os_family').isNotNull()).groupby('os_family').count().sort(func.col('count').desc()).first()['os_family']  
        
    byop_info=byop_info.fillna({'gsma_device_type':mode_gsma_device_type, 'gsma_operating_system':mode_gsma_operating_system,'byop_manufacturer' : mode_byop_manufacturer,'os_family': mode_os_family, 'touch_screen': '1'})

    return byop_info


# loyalty_info
def preprocess_loyalty_info(df):
    loyalty_info=df.select('line_id', 'lrp_enrolled').dropDuplicates()
    loyalty_info=loyalty_info.fillna({'lrp_enrolled':'empty'})

    return loyalty_info


# first_redemption_info
def preprocess_remdemption_info(df):
    
    remdemption_info=df.select('line_id', 'first_redemption_channel').dropDuplicates()
    remdemption_info=remdemption_info.fillna({'first_redemption_channel':'null'})

    return remdemption_info


# carrier_info
def preprocess_carrier_info(df):
    
    carrier_info=df.select('line_id', 'carrier').dropDuplicates()
    carrier_info=carrier_info.fillna({'carrier':'null'})

    return carrier_info


# first_redemption_date
def preprocess_first_redemption_date(df):
    
    # Different pattern than last time
    pattern = "yyyy-MM-dd"
    redemption_date_info = df.select('line_id', func.unix_timestamp("first_redemption_date", pattern).alias("first_redemption_date")).dropDuplicates()
    mode_redemption_date = redemption_date_info.filter(func.col('first_redemption_date').isNotNull()).groupby('first_redemption_date').count().sort(func.col('count').desc()).first()['first_redemption_date']
    redemption_date_info = redemption_date_info.fillna({'first_redemption_date':mode_redemption_date})
    
    return redemption_date_info

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Getting Featurized Data

In [4]:
def fit_featurize_data(df, data_has_label=True,remove_orig_cols=True):
    '''Given a selected data frame, generate a featurized dataframe.

    Example: Shows how categorical features are handled by first a string indexer and then one-hot encoding.

    if remove_orig_cols is set, only returns a dataframe with two columns - features and label.
    '''
    
    ##### Categorical variables
    categorical_columns = ['manufacturer','model', 'operating_system', 'language_preference', 
                           'extd_warranty', 'model_type', 'technology', 'opt_out', 'state', 'lrp_enrolled',  
                          'gsma_device_type', 'gsma_operating_system', 'byop_manufacturer' , 'os_family', 'touch_screen', 
                           'first_redemption_channel', 'carrier',
                          'display_description', 'fm_radio', 'has_wifi_calling', 'mobile_hotspot']
    
    ##### Numeric variables
    numeric_columns = ["domestic_data_usage_kb", "domestic_voice_count_total", "domestic_sms_in", "domestic_data_hotspot_kb", 
                       "domestic_sms_out", "domestic_mms_in", "domestic_mms_out", 'int_voice_count_total_sum', 'int_hotspot_kb_sum',
                       'int_total_kb_sum', 'int_sms_in_sum', 'int_sms_out_sum', 'int_mms_in_sum', 'int_mms_out_sum', 'release_date',
                       'data_capable','first_redemption_date']
    
    # filling in any remaining nulls
    df = df.na.fill("NA", subset = categorical_columns)
    df = df.na.fill(0, subset = numeric_columns)    
    
    stages = []
    for column in categorical_columns:
        string_indexer = StringIndexer(inputCol = column, handleInvalid="keep",outputCol = column + '_index')
        encoder = OneHotEncoder(inputCols=[string_indexer.getOutputCol()], outputCols=[column + "_vec"])
        stages += [string_indexer, encoder]
    
    label_indexer = StringIndexer(inputCol ="one_time_redeemer", outputCol="label",handleInvalid="keep")
    stages += [label_indexer]
        
    assembler_inputs = [c + "_vec" for c in categorical_columns] + numeric_columns
    assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
    stages += [assembler]
    pipeline = Pipeline(stages = stages)
    pipeline_model = pipeline.fit(df)
    
    return pipeline_model

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Training Model

In [5]:
def get_balancing_ratio(data_frame, target_var='label'):
    """Calculates the class ratio of label vs non-label in given dataframe

    :param data_frame: dataframe with GCR records.
    :type data_frame: Dataframe.
    :param target_var: Name of label column.
    :type target_var: str.
    :returns: ratio of label vs non-label in input dataframe.
    :rtype: int.
    """
    active_count = data_frame.filter(func.col(target_var) == 0.0).count()
    total_count = data_frame.count()
    balancing_ratio = active_count / total_count
    return balancing_ratio

def train_model(train_dataset):
    '''Given a featurized training dataset, trains a simple logistic regression model and 
    returns the trained model object'''
    negative_ratio = np.round(get_balancing_ratio(train_dataset), 2)
    train_dataset = train_dataset.withColumn("classWeights",func.when(train_dataset['label'] == 1, negative_ratio).\
                                             otherwise(
                                            1 - negative_ratio))
    
    # Gradient Boosted Tree Classifier
    from pyspark.ml.classification import GBTClassifier
    gbt = GBTClassifier(labelCol="label", featuresCol="features", weightCol='classWeights',  maxIter=10)
    
    from pyspark.ml import Pipeline
    pipeline = Pipeline(stages=[gbt])

    model = pipeline.fit(train_dataset)
    prediction = model.transform(train_dataset)
    prediction.printSchema()

    # Evaluating it (not sure if necessary)
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    binEval = MulticlassClassificationEvaluator().setMetricName("accuracy") .setPredictionCol("prediction").setLabelCol("label")
    
    binEval.evaluate(prediction)
    
    #######
    
    return model

def evaluate_model(model, test_dataset):
    '''Given a model and featurized test dataset, returns the f1 value'''
    predictions = model.transform(test_dataset).select(['prediction', 'label'])
    metrics = MulticlassMetrics(predictions.rdd)
    if predictions.select('label').distinct().count()==2:
        f1 = metrics.fMeasure(label = 1.0)
    else: f1 = 0
    return round(f1,4)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Make Predictions

In [6]:
def make_predictions(pipeline_model, model,eval_data_path,submission_path):
    '''Given a model, eval data path and submission path, makes predictions and 
saves the submissions to submission path.'''
    # Getting data from the evaluation file
    eval_data = spark.read.json(eval_data_path)
    
    # Getting columns
    df_selected = select_data(eval_data)
    usage_summary=get_usage_summary(df_selected)
    
    # Preprocessing
    phone_info=preprocess_phone_info(df_selected)
    contact_info=preprocess_contact_info(df_selected)
    loyalty_info=preprocess_loyalty_info(df_selected)
    byop_info=preprocess_byop_info(df_selected)
    redemption_info=preprocess_remdemption_info(df_selected)
    carrier_info=preprocess_carrier_info(df_selected)
    redemption_date_info=preprocess_first_redemption_date(df_selected)
    line_ids=df_selected.select('line_id').dropDuplicates()
    
    #Joining processed data
    df_preprocessed=line_ids.join(usage_summary,on='line_id', how = 'full')
    df_preprocessed=df_preprocessed.join(phone_info,on='line_id', how = 'full')
    df_preprocessed=df_preprocessed.join(contact_info,on='line_id', how = 'full')
    df_preprocessed=df_preprocessed.join(loyalty_info,on='line_id', how = 'full')
    df_preprocessed=df_preprocessed.join(byop_info,on='line_id', how = 'full')
    df_preprocessed=df_preprocessed.join(redemption_info,on='line_id', how = 'full')
    df_preprocessed=df_preprocessed.join(carrier_info,on='line_id', how = 'full')
    df_preprocessed=df_preprocessed.join(redemption_date_info,on='line_id', how = 'full') 
    

    categorical_columns = ['manufacturer','model', 'operating_system', 'language_preference', 
                           'extd_warranty', 'model_type', 'technology', 'opt_out', 'state', 'lrp_enrolled',  
                          'gsma_device_type', 'gsma_operating_system', 'byop_manufacturer' , 'os_family', 'touch_screen', 
                           'first_redemption_channel', 'carrier',
                          'display_description', 'fm_radio', 'has_wifi_calling', 'mobile_hotspot']
    numeric_columns = ["domestic_data_usage_kb", "domestic_voice_count_total", "domestic_sms_in", "domestic_data_hotspot_kb", 
                       "domestic_sms_out", "domestic_mms_in", "domestic_mms_out", 'int_voice_count_total_sum', 'int_hotspot_kb_sum',
                       'int_total_kb_sum', 'int_sms_in_sum', 'int_sms_out_sum', 'int_mms_in_sum', 'int_mms_out_sum', 'release_date',
                       'data_capable','first_redemption_date'] 

    df_preprocessed = df_preprocessed.na.fill("NA", subset = categorical_columns)
    df_preprocessed = df_preprocessed.na.fill(0, subset = numeric_columns)   
    df_featurized = pipeline_model.transform(df_preprocessed)

    # Do you want to remove the original columns?  I dooooo
    remove_orig_cols = True
    if remove_orig_cols:
        selected_columns = ['line_id','features']
        df_featurized = df_featurized.select(selected_columns)

    # Predicting the one-time-redeemers for the evaluation dataset
    predictions=model.transform(df_featurized).select(['line_id', 'prediction'])
    predictions=predictions.selectExpr('line_id','prediction as one_time_redeeemer')
    
    return predictions

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Geting data from s3

In [7]:
data_folder='s3://tf-trachack-data/211/data/'
# This path will be active the launch of the hackathon
teamname = 'trachack-project-groups-5-umiami'
root_folder='s3://tf-trachack-notebooks/trachack-project-groups-5-umiami/jupyter/jovyan/'

spark = SparkSession.builder.appName('trachack-code-submission').enableHiveSupport().getOrCreate()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Main()
#### Doing it like this because if it is all in one function it takes > 60 minutes and times out
(we could always increase the timeout time in terminal but whatever)

In [8]:
##TIMER##
import timeit
start = timeit.default_timer()
#########

# Getting datapath
data_path = data_folder + "dev.json.bz2"

# Getting data from s3
df = spark.read.json(data_path)  

# Selecting which columns we want
df_selected = select_data(df)
usage_summary=get_usage_summary(df_selected)

# Preprocessing
phone_info=preprocess_phone_info(df_selected)
contact_info=preprocess_contact_info(df_selected)
loyalty_info=preprocess_loyalty_info(df_selected)
byop_info=preprocess_byop_info(df_selected)
redemption_info=preprocess_remdemption_info(df_selected)
carrier_info=preprocess_carrier_info(df_selected)
redemption_date_info=preprocess_first_redemption_date(df_selected)

# Joining on line_id
line_ids=df.select('line_id','one_time_redeemer').dropDuplicates()
df_preprocessed=line_ids.join(usage_summary,on='line_id', how = 'full')
df_preprocessed=df_preprocessed.join(phone_info,on='line_id', how = 'full')
df_preprocessed=df_preprocessed.join(contact_info,on='line_id', how = 'full')
df_preprocessed=df_preprocessed.join(loyalty_info,on='line_id', how = 'full')
df_preprocessed=df_preprocessed.join(byop_info,on='line_id', how = 'full')
df_preprocessed=df_preprocessed.join(redemption_info,on='line_id', how = 'full')
df_preprocessed=df_preprocessed.join(carrier_info,on='line_id', how = 'full') 
df_preprocessed=df_preprocessed.join(redemption_date_info,on='line_id', how = 'full')

# How long did it take?
stop = timeit.default_timer()
print('Runtime:', (stop - start)//60, 'minutes &', round(((stop - start)%60),2), 'seconds')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Runtime: 3.0 minutes & 19.82 seconds

In [9]:
# Seperate chunks because this chunk takes ~45 minutes
start = timeit.default_timer()
pipeline_model = fit_featurize_data(df_preprocessed)
df_featurized = pipeline_model.transform(df_preprocessed)

remove_orig_cols=True
if remove_orig_cols:
    selected_columns = ['features', 'label']
    df_featurized= df_featurized.select(selected_columns)

df_featurized.cache()
stop = timeit.default_timer()
print('Runtime:', (stop - start)//60, 'minutes &', round(((stop - start)%60),2), 'seconds')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Runtime: 42.0 minutes & 47.95 seconds

In [10]:
# Creating the model and testing it
start = timeit.default_timer()

# Selecting a seed for reproducibility 
random_seed = 69 # we're mature I swear

# The ratio we want training to testing to be
test_ratio = .05

train, test = df_featurized.randomSplit([1.0 - test_ratio , test_ratio], seed = random_seed)

num_train = train.count()
num_test = test.count()
print(f"Train has {num_train}")
print(f"Test has {num_test}")

# Create model
model = train_model(train)

# Our f1 score (evaluation of model)
f1 = evaluate_model(model, test)

#How long did it take?
stop = timeit.default_timer()
print('Runtime:', (stop - start)//60, 'minutes &', round(((stop - start)%60),2), 'seconds')

# How'd we do?
print(f"f1-score: {f1}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Train has 69856
Test has 3703
root
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)
 |-- classWeights: double (nullable = false)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

Runtime: 9.0 minutes & 24.99 seconds
f1-score: 0.5048

# Making Our Submission

In [11]:
start = timeit.default_timer()
eval_data_path=data_folder+'eval.json.bz2'
submission_path=root_folder+"submission/2021-04-25/" #Change the date to whatever is required

predictions = make_predictions(pipeline_model, model,eval_data_path,submission_path).cache()

# Does it looks right?
predictions.show(5,truncate=False)
stop = timeit.default_timer()
print('Runtime:', (stop - start)//60, 'minutes &', round(((stop - start)%60),2), 'seconds')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------------------------+------------------+
|line_id                             |one_time_redeeemer|
+------------------------------------+------------------+
|00035778-3662-4112-9c68-8993a4d20ca9|0.0               |
|008b9332-d302-446f-aff2-b6317bda785e|0.0               |
|0399fe7e-2f11-48f1-bad3-aa30682f3f05|0.0               |
|0595432d-b946-4e99-9c7d-eb6eb58711ea|1.0               |
|06e0b271-ca1b-4d41-85ef-569a0b33b2cd|0.0               |
+------------------------------------+------------------+
only showing top 5 rows

Runtime: 6.0 minutes & 28.82 seconds

In [12]:
# If so then lets submit it
start = timeit.default_timer()
predictions.write.mode('overwrite').option('header',True).csv(submission_path)
print(f"submission saved to {submission_path}")

# We like timing things
stop = timeit.default_timer()
print('Runtime:', (stop - start)//60, 'minutes &', round(((stop - start)%60),2), 'seconds')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

submission saved to s3://tf-trachack-notebooks/trachack-project-groups-5-umiami/jupyter/jovyan/submission/2021-04-25/
Runtime: 0.0 minutes & 29.98 seconds

#### Making sure we have 100% coverage

In [13]:
num_eval=spark.read.option('header',True).csv(eval_data_path)
num_eval.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

49231

In [14]:
num_predictions = predictions.count()
coverage = num_predictions / 49232

# If this = 1 then we are at 100% coverage
print(f"Coverage: {coverage}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Coverage: 1.0

In [None]:
# yay!