In [None]:
import sys

# Put the XGBoost4J-Spark jar at the front of the module search path.
# NOTE(review): presumably this is what lets the later
# `ml.dmlc.xgboost4j.scala.spark` import resolve from inside the jar — confirm.
XGBOOST_JAR = '/usr/local/spark/jars/xgboost4j-spark_3.0-1.3.0-0.1.0.jar'
sys.path.insert(0, XGBOOST_JAR)

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType, IntegerType, StructField, StructType
from time import time

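__<span style="color:red">Exercise1:</span>__ Use the SparkSession builder to create a session with the following configuration parameters, and assign it to a variable named `spark` (later cells use `spark.read` and `spark.stop()`):

    spark.executor.cores = 1
    spark.driver.memory = 10G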

In [None]:
reader = spark.read
label = 'delinquency_12'

# Schema of the mortgage CSV files, declared as (column name, Spark type) pairs
# in file order; the label column is the last one.
schema = StructType([
    StructField(column_name, spark_type())
    for column_name, spark_type in [
        ('orig_channel', FloatType),
        ('first_home_buyer', FloatType),
        ('loan_purpose', FloatType),
        ('property_type', FloatType),
        ('occupancy_status', FloatType),
        ('property_state', FloatType),
        ('product_type', FloatType),
        ('relocation_mortgage_indicator', FloatType),
        ('seller_name', FloatType),
        ('mod_flag', FloatType),
        ('orig_interest_rate', FloatType),
        ('orig_upb', IntegerType),
        ('orig_loan_term', IntegerType),
        ('orig_ltv', FloatType),
        ('orig_cltv', FloatType),
        ('num_borrowers', FloatType),
        ('dti', FloatType),
        ('borrower_credit_score', FloatType),
        ('num_units', IntegerType),
        ('zip', IntegerType),
        ('mortgage_insurance_percent', FloatType),
        ('current_loan_delinquency_status', IntegerType),
        ('current_actual_upb', FloatType),
        ('interest_rate', FloatType),
        ('loan_age', FloatType),
        ('msa', FloatType),
        ('non_interest_bearing_upb', FloatType),
        (label, IntegerType),
    ]
])

# Every schema column except the label is a model input.
features = [field.name for field in schema if field.name != label]

# Both datasets are headered CSV directories read with the explicit schema above.
train_data = (reader
    .schema(schema)
    .option('header', True)
    .csv('/shared-data/datasets/mortgage-small/train'))
trans_data = (reader
    .schema(schema)
    .option('header', True)
    .csv('/shared-data/datasets/mortgage-small/trainWithEval'))


In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col
def vectorize(data_frame):
    """Pack the feature columns of ``data_frame`` into a single vector column.

    Every column of the input is first cast to float. The columns named in the
    module-level ``features`` list are then assembled into a 'features' vector
    column, and only that vector plus the module-level ``label`` column are
    kept in the returned DataFrame.
    """
    float_columns = []
    for field in data_frame.schema:
        float_columns.append(col(field.name).cast(FloatType()))
    as_floats = data_frame.select(float_columns)

    assembler = VectorAssembler().setInputCols(features).setOutputCol('features')
    assembled = assembler.transform(as_floats)
    return assembled.select(col('features'), col(label))

# Replace the raw frames with their assembled (features, label) form.
# NOTE: the names are rebound in place, so re-running this cell without first
# re-running the loading cell passes already-vectorized frames to vectorize().
train_data = vectorize(train_data)
trans_data = vectorize(trans_data)

In [None]:
# Preview the first five rows of the training frame.
train_data.show(5)

In [None]:
from ml.dmlc.xgboost4j.scala.spark import XGBoostClassificationModel, XGBoostClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType, IntegerType, StructField, StructType
from time import time

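__<span style="color:red">Exercise2:</span>__ Create an XGBoostClassifier with the following parameters and assign it to a variable named `classifier` (the training cell below calls `classifier.fit(train_data)`). Set its label column to `label` and its features column to `'features'` so that it matches the output of `vectorize`:

    params = {
        'eta': 0.1,
        'gamma': 0.1,
        'missing': 0.0,
        'maxDepth': 10,
        'maxLeaves': 256,
        'objective': 'binary:logistic',
        'growPolicy': 'depthwise',
        'minChildWeight': 30.0,
        'lambda_': 1.0,
        'scalePosWeight': 2.0,
        'subsample': 1.0,
        'nthread': 1,
        'numRound': 100,
        'numWorkers': 1,
    }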

In [None]:
def with_benchmark(phrase, action):
    """Run ``action`` once and print how long it took.

    Args:
        phrase: Label placed at the start of the printed timing line.
        action: Zero-argument callable to execute and time.

    Returns:
        Whatever ``action()`` returns.
    """
    # perf_counter is monotonic and high-resolution, so the measured duration
    # cannot be skewed (or turn negative) if the system clock is adjusted
    # while the action is running, unlike the wall-clock time().
    from time import perf_counter

    start = perf_counter()
    result = action()
    elapsed = perf_counter() - start
    print('{} takes {} seconds'.format(phrase, round(elapsed, 2)))
    return result
# Time the model fit on the training frame.
# NOTE(review): `classifier` is not defined in any visible cell — presumably it
# is the XGBoostClassifier created for Exercise 2; confirm it exists first.
model = with_benchmark('Training', lambda: classifier.fit(train_data))

__<span style="color:red">Exercise3:</span>__ Save the XGBoost model to a local path.

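__<span style="color:red">Exercise4:</span>__ Load the XGBoost model from that path and assign it to a variable named `loaded_model` (the transformation cell below uses it).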

In [None]:
def transform():
    """Score ``trans_data`` with ``loaded_model`` and return the cached result.

    The no-op ``foreachPartition`` call is an action that runs over every
    partition of the cached frame before it is returned, so the scoring work
    happens inside this function (and therefore inside any timing wrapped
    around it).
    """
    predictions = loaded_model.transform(trans_data)
    predictions = predictions.cache()
    predictions.foreachPartition(lambda _partition: None)
    return predictions
# Time the scoring pass, then preview the label next to the model's output columns.
result = with_benchmark('Transformation', transform)
result.select(label, 'rawPrediction', 'probability', 'prediction').show(5)

In [None]:
# MulticlassClassificationEvaluator defaults to the 'f1' metric, so the metric
# name is set explicitly to make the value printed below really be accuracy.
accuracy = with_benchmark(
    'Evaluation',
    lambda: MulticlassClassificationEvaluator()
        .setLabelCol(label)
        .setMetricName('accuracy')
        .evaluate(result))
print('Accuracy is ' + str(accuracy))

__<span style="color:red">Exercise5:</span>__ Repeat the training on the GPU by adding `'treeMethod': 'gpu_hist'` to the parameters.
Compare the time taken for training, transformation and evaluation against the CPU run above.

In [None]:
# Stop the Spark session (`spark` is expected to be the session created for Exercise 1).
spark.stop()