In [3]:
%load_ext sagemaker_studio_analytics_extension.magics
%sm_analytics emr connect --cluster-id j-104TXHDV2M8Z9 --auth-type None --language python  

Successfully read emr cluster(j-104TXHDV2M8Z9) details
Initiating EMR connection..
Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
8,application_1682640635824_0009,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.
{"namespace": "sagemaker-analytics", "cluster_id": "j-104TXHDV2M8Z9", "error_message": null, "success": true, "service": "emr", "operation": "connect"}


# Training SageMaker XGBoost Model on Delta Lake (Loan Risk Data)

<img src="https://d2908q01vomqb2.cloudfront.net/77de68daecd823babbb58edb1c8e14d7106e83bb/2018/04/24/SageMaker-300x150.jpg" width=200/>

This is a companion notebook to provide a SageMaker Model Training on Delta Lake example against the Lending Club data.
* This notebook has been tested with *EMR 6.6.0, SparkMagic Kernel*

In [50]:
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
pd.set_option('display.max_columns', 100) #replace n with the number of columns you want to see completely
pd.set_option('display.max_rows', 1000) #replace n with the numbe

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# Configure location of raw parquet file
FILENAME = "s3://sagemaker-us-east-2-850751315356/vedjain-deltalake-2022/lending_club/full-loans-pq/part-00000-bb6c5c3e-bcce-4026-a4cf-9dda63dcf794-c000.snappy.parquet"

# Read loanstats_2012_2017.parquet
data = spark.read.parquet(FILENAME)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
data.cache()

In [10]:
print(data.limit(5).toPandas())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

     id member_id  loan_amnt  funded_amnt  funded_amnt_inv        term  \
0  None      None    35000.0        35000          35000.0   36 months   
1  None      None     8000.0         8000           8000.0   36 months   
2  None      None     5000.0         5000           5000.0   36 months   
3  None      None    10000.0        10000          10000.0   36 months   
4  None      None    24000.0        24000          24000.0   36 months   

  int_rate  installment grade sub_grade             emp_title emp_length  \
0   17.27%      1252.56     D        D2                 Owner  10+ years   
1   18.25%       290.23     D        D3              Manager     9 years   
2    6.97%       154.32     A        A3                  None        n/a   
3    9.75%       321.50     B        B3           ELECTRICIAN  10+ years   
4    9.75%       771.60     B        B3  C&C MACHINE OPERATOR    5 years   

  home_ownership  annual_inc verification_status         loan_status  \
0           RENT    107000

In [11]:
data.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- id: string (nullable = true)
 |-- member_id: string (nullable = true)
 |-- loan_amnt: float (nullable = true)
 |-- funded_amnt: integer (nullable = true)
 |-- funded_amnt_inv: double (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: string (nullable = true)
 |-- installment: double (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: float (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- pymnt_plan: string (nullable = true)
 |-- url: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- title: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- dti: float (nullable = true)
 |-- delinq_2yrs: float

#### Dataset details can be found here: https://www.kaggle.com/datasets/wordsforthewise/lending-club

### Munge Data - create feature columns

In [12]:
from pyspark.sql.functions import *

print("------------------------------------------------------------------------------------------------")
print("Create bad loan label, this will include charged off, defaulted, and late repayments on loans...")
data = data.filter(data.loan_status.isin(["Default", "Charged Off", "Fully Paid"]))\
                       .withColumn("bad_loan", (~(data.loan_status == "Fully Paid")).cast("string"))

print("------------------------------------------------------------------------------------------------")
print("Turning string interest rate and revoling util columns into numeric columns...")
data = data.withColumn('int_rate', regexp_replace('int_rate', '%', '').cast('float')) \
                       .withColumn('revol_util', regexp_replace('revol_util', '%', '').cast('float')) \
                       .withColumn('issue_year',  substring(data.issue_d, 5, 4).cast('double') ) \
                       .withColumn('earliest_year', substring(data.earliest_cr_line, 5, 4).cast('double'))
data = data.withColumn('credit_length_in_years', (data.issue_year - data.earliest_year))


print("------------------------------------------------------------------------------------------------")
print("Converting emp_length column into numeric...")
data = data.withColumn('emp_length', trim(regexp_replace(data.emp_length, "([ ]*+[a-zA-Z].*)|(n/a)", "") ))
data = data.withColumn('emp_length', trim(regexp_replace(data.emp_length, "< 1", "0") ))
data = data.withColumn('emp_length', trim(regexp_replace(data.emp_length, "10\\+", "10") ).cast('float'))

print("------------------------------------------------------------------------------------------------")
print("Map multiple levels into one factor level for verification_status...")
data = data.withColumn('verification_status', trim(regexp_replace(data.verification_status, 'Source Verified', 'Verified')))

print("------------------------------------------------------------------------------------------------")
print("Calculate the total amount of money earned or lost per loan...")
data = data.withColumn('net', round( data.total_pymnt - data.loan_amnt, 2))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

------------------------------------------------------------------------------------------------
Create bad loan label, this will include charged off, defaulted, and late repayments on loans...
------------------------------------------------------------------------------------------------
Turning string interest rate and revoling util columns into numeric columns...
------------------------------------------------------------------------------------------------
Converting emp_length column into numeric...
------------------------------------------------------------------------------------------------
Map multiple levels into one factor level for verification_status...
------------------------------------------------------------------------------------------------
Calculate the total amount of money earned or lost per loan...

In [13]:
print(data.select("net","verification_status","int_rate", "revol_util", "issue_year", "earliest_year", "bad_loan", "credit_length_in_years", "emp_length").limit(10).toPandas())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

        net verification_status  int_rate  revol_util  issue_year  \
0     98.34        Not Verified      5.32   27.900000      2016.0   
1    263.31            Verified      9.75   19.400000      2016.0   
2    361.74        Not Verified      5.32   23.900000      2016.0   
3   1088.67            Verified     12.99   75.400002      2016.0   
4 -20306.74            Verified     21.18   87.500000      2016.0   
5    330.35            Verified      7.39   59.099998      2016.0   
6 -14837.19            Verified     15.31    7.700000      2016.0   
7   2405.39            Verified     17.27   72.000000      2016.0   
8    632.18            Verified      9.16   28.799999      2016.0   
9   1044.92        Not Verified     15.31   97.199997      2016.0   

   earliest_year bad_loan  credit_length_in_years  emp_length  
0         2000.0    false                    16.0         8.0  
1         2010.0    false                     6.0         1.0  
2         1989.0    false                    27.

### Set Response & Predictor Variables

In [44]:
print("------------------------------------------------------------------------------------------------")
print("Setting variables to predict bad loans")
myY = "bad_loan"
categoricals = ["term", "home_ownership", "purpose", "addr_state",
                "verification_status","application_type"]
numerics = ["loan_amnt","emp_length", "annual_inc","dti",
            "delinq_2yrs","revol_util","total_acc",
            "credit_length_in_years"]
myX = categoricals + numerics

loan_stats = data.select(myX + [myY, "int_rate", "net", "issue_year"])
loan_stats.createOrReplaceTempView("loanStatsView")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

------------------------------------------------------------------------------------------------
Setting variables to predict bad loans

#### You can use SparkSQL queries using %%sql from the notebook and save results to a local DataFrame. This allows for a quick data exploration. The maximum rows returned by default is 2,500. You can set the maximum rows by using the -n argument.

In [15]:
%%sql -o loan_stats
select * from loanStatsView

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

#### You can then access and explore Spark dataframe locally

In [17]:
%%local 
loan_stats.head(10)

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

In [48]:
loan_stats.columns

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Index(['bad_loan', 'term', 'home_ownership', 'purpose', 'addr_state',
       'verification_status', 'application_type', 'loan_amnt', 'emp_length',
       'annual_inc', 'dti', 'delinq_2yrs', 'revol_util', 'total_acc',
       'credit_length_in_years', 'int_rate', 'net', 'issue_year'],
      dtype='object')

### Create Train / Validation Split for SageMaker XGBoost Model

In [92]:
# SageMaker XGBoost has the convention of label in the first column
loan_stats = loan_stats.toPandas()
label = loan_stats.pop('bad_loan')
loan_stats.insert(0, 'bad_loan', label)
  
loan_stats[categoricals] = loan_stats[categoricals].apply(lambda x: x.astype("category").cat.codes)
loan_stats['bad_loan'] = loan_stats['bad_loan'].astype("category").cat.codes

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [93]:
# # Split the downloaded loan_stats into train/test loan_statsframes
train, valid = np.split(loan_stats.sample(frac=1), [int(0.8 * len(loan_stats))])

print("training data size = %d | validation data size = %d" % (len(train), len(valid)))
train_df = spark.createDataFrame(train)
valid_df = spark.createDataFrame(valid)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

training data size = 526439 | validation data size = 131610

### Write Train / Validation data to S3

In [107]:
train_df.coalesce(1).write.mode("overwrite").parquet("s3://sagemaker-us-east-2-850751315356/vedjain-deltalake-2022/lending_club/sagemaker-xg-boost/train.parquet")
valid_df.coalesce(1).write.mode("overwrite").parquet("s3://sagemaker-us-east-2-850751315356/vedjain-deltalake-2022/lending_club/sagemaker-xg-boost/validation.parquet")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Train SageMaker XGBoost Model

In [125]:
%%local 

import io
import os
import boto3
import sagemaker

role = sagemaker.get_execution_role()
region = boto3.Session().region_name

# S3 bucket for saving code and model artifacts.
# Feel free to specify a different bucket here if you wish.
bucket = sagemaker.Session().default_bucket()
prefix = "vedjain-deltalake-2022/lending_club/sagemaker-xg-boost"
# customize to your bucket where you have would like to store the data
bucket_path = "https://s3-{}.amazonaws.com/{}".format(region, bucket)
container = sagemaker.image_uris.retrieve("xgboost", "us-east-2", "1.7-1")

In [126]:
%%local
import time
from time import gmtime, strftime

client = boto3.client("sagemaker", region_name=region)
use_amt = True

training_job_name = "xgboost-parquet-loans-training-" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print("Training job", training_job_name)

# Ensure that the training and validation data folders generated above are reflected in the "InputDataConfig" parameter below.

create_training_params = {
    "AlgorithmSpecification": {"TrainingImage": container, "TrainingInputMode": "Pipe"},
    "RoleArn": role,
    "OutputDataConfig": {"S3OutputPath": f"{bucket_path}/{prefix}/single-xgboost"},
    "ResourceConfig": {"InstanceCount": 4, "InstanceType": "ml.m5.2xlarge", "VolumeSizeInGB": 20, "KeepAlivePeriodInSeconds":3600},
    "TrainingJobName": training_job_name,
    "HyperParameters": {
        "max_depth": "5",
        "eta": "0.2",
        "gamma": "4",
        "min_child_weight": "6",
        "subsample": "0.7",
        "objective": "reg:linear",
        "num_round": "10",
        "verbosity": "2",
    },
    "StoppingCondition": {"MaxRuntimeInSeconds": 3600},
    "InputDataConfig": [
        {
            "ChannelName": "train",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": f"{bucket_path}/{prefix}/train.parquet/",
                    "S3DataDistributionType": "FullyReplicated",
                }
            },
            "ContentType": "application/x-parquet",
            "CompressionType": "None",
        },
        {
            "ChannelName": "validation",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": f"{bucket_path}/{prefix}/validation.parquet/",
                    "S3DataDistributionType": "FullyReplicated",
                }
            },
            "ContentType": "application/x-parquet",
            "CompressionType": "None",
        },
    ],
}

print(
    f"Creating a training job with name: {training_job_name}. It will take between 5 and 6 minutes to complete."
)
client.create_training_job(**create_training_params)

status = client.describe_training_job(TrainingJobName=training_job_name)["TrainingJobStatus"]
print(status)
while status != "Completed" and status != "Failed":
    time.sleep(60)
    status = client.describe_training_job(TrainingJobName=training_job_name)["TrainingJobStatus"]
    print(status)

Training job xgboost-parquet-loans-training-2023-05-02-20-00-11
Creating a training job with name: xgboost-parquet-loans-training-2023-05-02-20-00-11. It will take between 5 and 6 minutes to complete.
InProgress
InProgress
InProgress
Completed


## Register SageMaker Model to Model Group

In [136]:
%%local 

# Specify the model source
model_url = "s3://sagemaker-us-east-2-850751315356/vedjain-deltalake-2022/lending_club/sagemaker-xg-boost/single-xgboost/xgboost-parquet-loans-training-2023-05-02-20-00-11/output/model.tar.gz"
model_package_group_name = "LendingClub-BadLoans-BinaryClassification"
modelpackage_inference_specification =  {
    "InferenceSpecification": {
      "Containers": [
         {
            "Image": '257758044811.dkr.ecr.us-east-2.amazonaws.com/sagemaker-xgboost:1.7-1',
	    "ModelDataUrl": model_url
         }
      ],
      "SupportedContentTypes": [ "application/x-parquet" ],
      "SupportedResponseMIMETypes": [ "application/x-parquet" ],
   }
 }
create_model_package_input_dict = {
    "ModelPackageGroupName" : model_package_group_name,
    "ModelPackageDescription" : "Model to predict if the loan will turn out bad or good",
    "ModelApprovalStatus" : "PendingManualApproval"
}
create_model_package_input_dict.update(modelpackage_inference_specification)

In [137]:
%%local
create_model_package_response = client.create_model_package(**create_model_package_input_dict)
model_package_arn = create_model_package_response["ModelPackageArn"]
print('ModelPackage Version ARN : {}'.format(model_package_arn))

ModelPackage Version ARN : arn:aws:sagemaker:us-east-2:850751315356:model-package/lendingclub-badloans-binaryclassification/4
