In [1]:
import pyspark
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import pyspark.sql.functions as F
import pyspark.sql.types as T

from pyspark.sql import SparkSession
from dotenv import dotenv_values

In [2]:
config = dotenv_values('../.env')
spark=SparkSession.builder.appName('XGB').getOrCreate()
spark

# Some parameters

In [3]:
VER = 1 # Version number
SEED = 42 # Random seed
NAN_VALUE = -127 # Fill NAN value
FOLDS = 5 # Folds per model

# Read Data and Fill NAN Values

In [4]:
train_df = spark.read.csv(config["TRAIN_PATH"], header=True, inferSchema=True)

In [5]:
train_df.printSchema()

root
 |-- customer_ID: string (nullable = true)
 |-- S_2: timestamp (nullable = true)
 |-- P_2: double (nullable = true)
 |-- D_39: double (nullable = true)
 |-- B_1: double (nullable = true)
 |-- B_2: double (nullable = true)
 |-- R_1: double (nullable = true)
 |-- S_3: double (nullable = true)
 |-- D_41: double (nullable = true)
 |-- B_3: double (nullable = true)
 |-- D_42: double (nullable = true)
 |-- D_43: double (nullable = true)
 |-- D_44: double (nullable = true)
 |-- B_4: double (nullable = true)
 |-- D_45: double (nullable = true)
 |-- B_5: double (nullable = true)
 |-- R_2: double (nullable = true)
 |-- D_46: double (nullable = true)
 |-- D_47: double (nullable = true)
 |-- D_48: double (nullable = true)
 |-- D_49: double (nullable = true)
 |-- B_6: double (nullable = true)
 |-- B_7: double (nullable = true)
 |-- B_8: double (nullable = true)
 |-- D_50: double (nullable = true)
 |-- D_51: double (nullable = true)
 |-- B_9: double (nullable = true)
 |-- R_3: double (nullable 

In [6]:
train_df.select('customer_ID').head(5)

[Row(customer_ID='0000099d6bd597052cdcda90ffabf56573fe9d7c79be5fbac11a8ed792feb62a'),
 Row(customer_ID='0000099d6bd597052cdcda90ffabf56573fe9d7c79be5fbac11a8ed792feb62a'),
 Row(customer_ID='0000099d6bd597052cdcda90ffabf56573fe9d7c79be5fbac11a8ed792feb62a'),
 Row(customer_ID='0000099d6bd597052cdcda90ffabf56573fe9d7c79be5fbac11a8ed792feb62a'),
 Row(customer_ID='0000099d6bd597052cdcda90ffabf56573fe9d7c79be5fbac11a8ed792feb62a')]

## Fill NAN Values

In [7]:
train_df.select([F.sum(F.col(c).isNull().cast(T.IntegerType())).alias(c) for c in train_df.columns]).show()

+-----------+---+-----+----+---+----+---+-------+----+----+-------+-------+------+---+----+---+---+-------+----+------+-------+---+---+-----+-------+----+---+---+-----+------+----+-------+---+----+---+----+---+-------+----+---+------+-------+-----+---+----+-------+----+------+----+------+----+----+------+----+------+----+----+-------+----+----+-------+----+------+----+---+----+----+------+----+-----+----+-----+----+----+-------+---+-----+----+-------+----+---+-------+----+----+------+-----+---+-------+----+-----+----+----+----+-----+-------+----+----+----+----+------+----+----+-----+----+-------+----+----+----+-------+----+----+-------+----+----+----+----+----+----+----+----+-----+----+----+------+----+----+----+----+----+----+-----+----+-----+-----+----+-----+------+------+-------+-------+------+----+----+-------+------+----+-------+-----+-------+-------+-------+-----+----+-------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+-----+-

In [8]:
train_df = train_df.na.fill(value=NAN_VALUE)

In [9]:
train_df.select([F.sum(F.col(c).isNull().cast(T.IntegerType())).alias(c) for c in train_df.columns]).show()

+-----------+---+---+----+---+---+---+---+----+---+----+----+----+---+----+---+---+----+----+----+----+---+---+---+----+----+---+---+----+---+----+----+---+----+---+----+---+---+----+---+----+----+----+---+----+---+----+----+----+----+----+----+----+----+------+----+----+----+----+----+----+----+----+----+---+----+----+----+----+----+----+----+----+----+----+---+----+----+----+----+---+----+----+----+----+----+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+----+----+----+----+----+-----+-----+-----+-----+----+-----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----+----+-----+-----+-----+-----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|customer_ID|S_2|P_2|D_

## Feature Engineering

In [10]:
def process_and_feature_engineer(df):
    all_cols = [c for c in list(df.columns) if not c in ['customer_ID','S_2']]
    cat_features = cat_features = ["B_30","B_38","D_114","D_116","D_117","D_120","D_126","D_63","D_64","D_66","D_68"]
    num_features = [col for col in all_cols if col not in cat_features]
    agg_funcs = ['mean', 'std', 'min', 'max', 'last']
    cat_funcs = ['count', 'last']
    
    
    
    # Aggregate Numerical Features
    num_agg_funcs = {}
    test_num_agg = spark.createDataFrame(data=spark.sparkContext.emptyRDD(),
                                         schema=T.StructType([T.StructField("customer_ID",
                                                              T.StringType(),
                                                              True)]))

    for func in agg_funcs:
        for feat in num_features:
            num_agg_funcs[feat] = func
        
        temp_df = df.groupBy("customer_ID").agg(num_agg_funcs)
        test_num_agg = test_num_agg.join(temp_df, on="customer_ID", how="outer")
    
#     test_num_agg.show()
    
    # Aggregate Categorical Features
    cat_agg_funcs = {}
    test_cat_agg = spark.createDataFrame(data=spark.sparkContext.emptyRDD(),
                                         schema=T.StructType([T.StructField("customer_ID",
                                                              T.StringType(),
                                                              True)]))
    for func in cat_funcs:
        for feat in cat_features:
            cat_agg_funcs[feat] = func
        temp_df = df.groupBy("customer_ID").agg(cat_agg_funcs)
        test_cat_agg = test_cat_agg.join(temp_df, on="customer_ID", how="outer")
    
    # Add in unique values per customer_id
    nunique = [F.countDistinct(c).alias("nqunique({})".format(c)) for c in cat_features]
    temp_df = df.groupBy("customer_ID").agg(*nunique)
    test_cat_agg = test_cat_agg.join(temp_df, on="customer_ID", how="outer")
        
    df = test_num_agg.join(test_cat_agg, on="customer_ID", how="outer")
    del temp_df, test_num_agg, test_cat_agg
    print("Shape After Feature Engineering", (df.count(), len(df.columns)))
    df.printSchema()
    return df
            
    

In [11]:
train = process_and_feature_engineer(train_df)

Shape After Feature Engineering (458913, 919)
root
 |-- customer_ID: string (nullable = true)
 |-- avg(D_74): double (nullable = true)
 |-- avg(R_15): double (nullable = true)
 |-- avg(D_55): double (nullable = true)
 |-- avg(B_16): double (nullable = true)
 |-- avg(S_5): double (nullable = true)
 |-- avg(B_22): double (nullable = true)
 |-- avg(D_70): double (nullable = true)
 |-- avg(D_81): double (nullable = true)
 |-- avg(B_7): double (nullable = true)
 |-- avg(S_13): double (nullable = true)
 |-- avg(R_26): double (nullable = true)
 |-- avg(D_92): double (nullable = true)
 |-- avg(B_11): double (nullable = true)
 |-- avg(D_44): double (nullable = true)
 |-- avg(D_127): double (nullable = true)
 |-- avg(B_27): double (nullable = true)
 |-- avg(D_138): double (nullable = true)
 |-- avg(D_104): double (nullable = true)
 |-- avg(D_109): double (nullable = true)
 |-- avg(B_6): double (nullable = true)
 |-- avg(R_5): double (nullable = true)
 |-- avg(B_33): double (nullable = true)
 |--

# Load Labels

In [12]:
targets = spark.read.csv(config["TRAIN_LABELS_PATH"], header=True, inferSchema=True)

In [13]:
targets.printSchema()

root
 |-- customer_ID: string (nullable = true)
 |-- target: integer (nullable = true)



In [14]:
targets.show()

+--------------------+------+
|         customer_ID|target|
+--------------------+------+
|0000099d6bd597052...|     0|
|00000fd6641609c6e...|     0|
|00001b22f846c82c5...|     0|
|000041bdba6ecadd8...|     0|
|00007889e4fcd2614...|     0|
|000084e5023181993...|     0|
|000098081fde4fd64...|     0|
|0000d17a1447b25a0...|     0|
|0000f99513770170a...|     1|
|00013181a0c5fc8f1...|     1|
|0001337ded4e1c253...|     1|
|00013c6e1cec7c21b...|     1|
|0001812036f155833...|     1|
|00018dd4932409baf...|     0|
|000198b3dc70edd65...|     0|
|000201146e53cacdd...|     0|
|0002d381bdd8048d7...|     0|
|0002e335892f7998f...|     1|
|00031e8be98bc3411...|     0|
|000333075fb8ec6d5...|     1|
+--------------------+------+
only showing top 20 rows



In [15]:
train = train.join(targets, on="customer_ID", how="outer")

In [16]:
# train.show()
# crashes the development server

In [17]:
train.printSchema()

root
 |-- customer_ID: string (nullable = true)
 |-- avg(D_74): double (nullable = true)
 |-- avg(R_15): double (nullable = true)
 |-- avg(D_55): double (nullable = true)
 |-- avg(B_16): double (nullable = true)
 |-- avg(S_5): double (nullable = true)
 |-- avg(B_22): double (nullable = true)
 |-- avg(D_70): double (nullable = true)
 |-- avg(D_81): double (nullable = true)
 |-- avg(B_7): double (nullable = true)
 |-- avg(S_13): double (nullable = true)
 |-- avg(R_26): double (nullable = true)
 |-- avg(D_92): double (nullable = true)
 |-- avg(B_11): double (nullable = true)
 |-- avg(D_44): double (nullable = true)
 |-- avg(D_127): double (nullable = true)
 |-- avg(B_27): double (nullable = true)
 |-- avg(D_138): double (nullable = true)
 |-- avg(D_104): double (nullable = true)
 |-- avg(D_109): double (nullable = true)
 |-- avg(B_6): double (nullable = true)
 |-- avg(R_5): double (nullable = true)
 |-- avg(B_33): double (nullable = true)
 |-- avg(S_8): double (nullable = true)
 |-- avg(S

# Train XGBoost Model

## Import Libraries

In [18]:
from sklearn.model_selection import KFold
import xgboost as xgb
print('XGB Version',xgb.__version__)

# XGB MODEL PARAMETERS
xgb_parms = { 
    'max_depth':4, 
    'learning_rate':0.05, 
    'subsample':0.8,
    'colsample_bytree':0.6, 
    'eval_metric':'logloss',
    'objective':'binary:logistic',
    'tree_method':'gpu_hist',
    'predictor':'gpu_predictor',
    'random_state':SEED
}

XGB Version 1.6.1


In [19]:
def amex_metric_mod(y_true, y_pred):
    labels     = np.transpose(np.array([y_true, y_pred]))
    labels     = labels[labels[:, 1].argsort()[::-1]]
    weights    = np.where(labels[:,0]==0, 20, 1)
    cut_vals   = labels[np.cumsum(weights) <= int(0.04 * np.sum(weights))]
    top_four   = np.sum(cut_vals[:,0]) / np.sum(labels[:,0])

    gini = [0,0]
    for i in [1,0]:
        labels         = np.transpose(np.array([y_true, y_pred]))
        labels         = labels[labels[:, i].argsort()[::-1]]
        weight         = np.where(labels[:,0]==0, 20, 1)
        weight_random  = np.cumsum(weight / np.sum(weight))
        total_pos      = np.sum(labels[:, 0] *  weight)
        cum_pos_found  = np.cumsum(labels[:, 0] * weight)
        lorentz        = cum_pos_found / total_pos
        gini[i]        = np.sum((lorentz - weight_random) * weight)

    return 0.5 * (gini[1]/gini[0] + top_four)

In [20]:
importances = []
oof = []
train = train.toPandas()
TRAIN_SUBSAMPLE = 1.0

skf = KFold(n_splits=FOLDS, shuffle=True, random_state=SEED)
for fold, (train_idx, valid_idx) in enumerate(skf.split(
            train, train.target)):
    
    if TRAIN_SUBSAMPLE<1.0:
        np.random.seed(SEED)
        train_idx = np.random.choice(train_idx, 
                       int(len(train_idx)*TRAIN_SUBSAMPLE), replace=False)
        np.random.seed(None)
    
    print('#'*25)
    print('### Fold',fold+1)
    print('### Train size',len(train_idx),'Valid size',len(valid_idx))
    print(f'### Training with {int(TRAIN_SUBSAMPLE*100)}% fold data...')
    print('#'*25)

Py4JJavaError: An error occurred while calling o4399.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in stage 139.0 failed 1 times, most recent failure: Lost task 9.0 in stage 139.0 (TID 1855) (DESKTOP-EI40B8Q executor driver): java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOf(Unknown Source)
	at java.io.ByteArrayOutputStream.grow(Unknown Source)
	at java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
	at java.io.ByteArrayOutputStream.write(Unknown Source)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:225)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:178)
	at java.io.DataOutputStream.write(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:542)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:367)
	at org.apache.spark.sql.execution.SparkPlan$$Lambda$2523/487693113.apply(Unknown Source)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.RDD$$Lambda$2520/1580711074.apply(Unknown Source)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2482/137659635.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:424)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:340)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:368)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:340)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3688)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3685)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOf(Unknown Source)
	at java.io.ByteArrayOutputStream.grow(Unknown Source)
	at java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
	at java.io.ByteArrayOutputStream.write(Unknown Source)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:225)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:178)
	at java.io.DataOutputStream.write(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:542)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:367)
	at org.apache.spark.sql.execution.SparkPlan$$Lambda$2523/487693113.apply(Unknown Source)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.RDD$$Lambda$2520/1580711074.apply(Unknown Source)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2482/137659635.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more
