In [1]:
import glob
import os
import sys

# Local toolchain locations for this machine.
JAVA_HOME = "/Library/Java/JavaVirtualMachines/zulu-8.jdk/Contents/Home"
SPARK_HOME = "/opt/spark"

os.environ["JAVA_HOME"] = JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOME

# Use the PySpark that ships inside SPARK_HOME. The py4j bundle name carries a
# version number, so look it up instead of hard-coding it; fall back to the
# previously hard-coded name when nothing matches.
spark_python = os.path.join(SPARK_HOME, "python")
py4j_candidates = sorted(
    glob.glob(os.path.join(spark_python, "lib", "py4j-*-src.zip"))
)
py4j_zip = (
    py4j_candidates[-1]
    if py4j_candidates
    else os.path.join(spark_python, "lib", "py4j-0.10.9.7-src.zip")
)
spark_entries = [spark_python, py4j_zip]

# Remove wrong pyspark (Anaconda), plus any copies of our own entries left by
# an earlier run of this cell, then put the Spark entries first.
sys.path = [
    p for p in sys.path
    if "site-packages/pyspark" not in p and p not in spark_entries
]
sys.path[0:0] = spark_entries

# Prepend the Java and Spark executables to PATH exactly once, so re-running
# this cell does not keep growing the variable.
tool_dirs = [os.path.join(JAVA_HOME, "bin"), os.path.join(SPARK_HOME, "bin")]
existing_path = os.environ.get("PATH", "")
inherited_dirs = (
    [d for d in existing_path.split(os.pathsep) if d not in tool_dirs]
    if existing_path
    else []
)
os.environ["PATH"] = os.pathsep.join(tool_dirs + inherited_dirs)

print("Java:", os.environ["JAVA_HOME"])
print("Spark:", os.environ["SPARK_HOME"])

Java: /Library/Java/JavaVirtualMachines/zulu-8.jdk/Contents/Home
Spark: /opt/spark


In [2]:
from pyspark.sql import SparkSession

# Start (or reuse) a Spark session named "LGD Prediction" with a local master.
spark = (
    SparkSession.builder
    .appName("LGD Prediction")
    .master("local[*]")
    .getOrCreate()
)

# Bare expression so the notebook displays the session.
spark

25/12/02 13:43:07 WARN Utils: Your hostname, apple-MacBook-Pro-2.local resolves to a loopback address: 127.0.0.1; using 10.0.12.191 instead (on interface en0)
25/12/02 13:43:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/02 13:43:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Filesystem-related settings applied to the running session, in this order.
# NOTE(review): these are set after the session already exists; confirm that
# spark.hadoop.* values applied this way actually reach the Hadoop
# configuration on the Spark version in use.
hadoop_fs_settings = {
    "spark.hadoop.fs.defaultFS": "file:///",
    "spark.hadoop.fs.file.impl": "org.apache.hadoop.fs.LocalFileSystem",
    "spark.hadoop.fs.hdfs.impl": "org.apache.hadoop.hdfs.DistributedFileSystem",
}

for conf_key, conf_value in hadoop_fs_settings.items():
    spark.conf.set(conf_key, conf_value)

In [4]:
# Location of the LGD dataset. Set the LGD_DATA_PATH environment variable to
# run the notebook on another machine; when it is unset or empty, the original
# local path is used.
LGD_DATA_PATH = (
    os.environ.get("LGD_DATA_PATH")
    or "file:///Users/apple/Desktop/lgd_data.csv"
)

# The file is semicolon-delimited with a header row; column types are inferred
# here and normalised to double in the cells that follow.
data = spark.read.csv(
    LGD_DATA_PATH,
    header=True,
    inferSchema=True,
    sep=";",
)

In [5]:
from pyspark.sql.functions import regexp_replace, col

# Columns that must be numeric. Any "," in their values is rewritten to "."
# before the cast (presumably the file uses a decimal comma -- confirm against
# the raw CSV).
to_fix = [
    "Interest Rate (%)",
    "Income ($)",
    "Loan to Value Ratio (%)",
    "Debt to Income Ratio (%)",
    "LGD%"
]

# Replace all of them in one projection instead of calling withColumn twice per
# column in a loop; each expression only reads its own source column, so the
# result is the same while the query plan stays small.
data = data.withColumns({
    name: regexp_replace(col(name), ",", ".").cast("double")
    for name in to_fix
})

In [6]:
# Remaining numeric columns: no text clean-up, just a cast to double so every
# numeric input has the same type.
numeric_cols = [
    "Exposure Amount ($)", "Credit Score", "Loan Term (Months)",
    "Employment History (Years)", "Previous Defaults"
]

# One projection for all casts rather than one withColumn call per column,
# which keeps the query plan small.
data = data.withColumns({
    name: col(name).cast("double")
    for name in numeric_cols
})


In [7]:
# Sanity check after the casts: print the first 10 rows, then the schema.
data.show(n=10)
data.printSchema()

+-------------------+------------+------------------+-----------------+----------+-----------------------+------------------------+--------------------------+-----------------+--------------+------------------+---------------+------+-----------+
|Exposure Amount ($)|Credit Score|Loan Term (Months)|Interest Rate (%)|Income ($)|Loan to Value Ratio (%)|Debt to Income Ratio (%)|Employment History (Years)|Previous Defaults|Home Ownership|      Loan Purpose|      Loan Type|Region|       LGD%|
+-------------------+------------+------------------+-----------------+----------+-----------------------+------------------------+--------------------------+-----------------+--------------+------------------+---------------+------+-----------+
|           238203.0|       729.0|              36.0|             3.81|   47603.0|                   88.0|                    24.0|                       4.0|              2.0|          Rent|  Home Improvement|Adjustable Rate| South|0.009999874|
|           1700

## Feature engineering

### Mean defaults per credit score:

In [10]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Every row receives the average "Previous Defaults" of all rows that share its
# "Credit Score".
# NOTE(review): this aggregate is computed on the full dataset, before the
# train/test split later in the notebook, so test rows contribute to the
# feature -- confirm that is acceptable.
w_credit = Window.partitionBy("Credit Score")
defaults_mean_by_score = F.avg("Previous Defaults").over(w_credit)

data = data.withColumn("Defaults_mean_by_Credit_Score", defaults_mean_by_score)


### Mean income per region:

In [12]:
# Every row receives the average "Income ($)" of all rows in its "Region".
# NOTE(review): computed on the full dataset, before the train/test split later
# in the notebook -- confirm that is acceptable.
w_region = Window.partitionBy("Region")
income_mean_by_region = F.avg("Income ($)").over(w_region)

data = data.withColumn("Income_mean_by_Region", income_mean_by_region)

## Select final modeling columns

In [14]:
# Columns carried into modelling, in their final order: the original inputs,
# then the two engineered aggregates, then the label.
raw_feature_cols = [
    "Exposure Amount ($)",
    "Credit Score",
    "Loan Term (Months)",
    "Interest Rate (%)",
    "Income ($)",
    "Loan to Value Ratio (%)",
    "Debt to Income Ratio (%)",
    "Employment History (Years)",
    "Previous Defaults",
    "Home Ownership",
    "Loan Purpose",
    "Loan Type",
    "Region",
]
engineered_feature_cols = [
    "Defaults_mean_by_Credit_Score",
    "Income_mean_by_Region",
]
label_col = "LGD%"

data = data.select(*raw_feature_cols, *engineered_feature_cols, label_col)

## Build Encoding + Assembler + Scaler

In [16]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler

# Categorical inputs: indexed to "<name>_idx", then one-hot encoded to "<name>_ohe".
cat_cols = ["Home Ownership", "Loan Purpose", "Loan Type", "Region"]
idx_cols = [f"{name}_idx" for name in cat_cols]
ohe_cols = [f"{name}_ohe" for name in cat_cols]

# Numeric inputs, including the two engineered aggregates.
num_cols = [
    "Exposure Amount ($)", "Credit Score", "Loan Term (Months)", "Interest Rate (%)",
    "Income ($)", "Loan to Value Ratio (%)", "Debt to Income Ratio (%)",
    "Employment History (Years)", "Previous Defaults",
    "Defaults_mean_by_Credit_Score", "Income_mean_by_Region"
]

# One indexer per categorical column, configured with handleInvalid="keep".
indexers = [
    StringIndexer(inputCol=source, outputCol=indexed, handleInvalid="keep")
    for source, indexed in zip(cat_cols, idx_cols)
]

# A single encoder handles all indexed columns together.
encoder = OneHotEncoder(inputCols=idx_cols, outputCols=ohe_cols)

# Numeric columns first, then the one-hot vectors, packed into "features_raw".
assembler = VectorAssembler(
    inputCols=num_cols + ohe_cols,
    outputCol="features_raw"
)

# Standardise "features_raw" (mean-centred, unit std) into the final "features" column.
scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withMean=True,
    withStd=True
)


## Train/Test Split + Linear Regression

In [18]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

# Linear regression on the scaled feature vector, predicting "LGD%".
# NOTE(review): the recorded fit log warns that regParam is zero and that the
# Cholesky solver hit a singular covariance matrix. Consider a small regParam
# or dropping redundant features (for example a per-Region mean alongside the
# Region dummies) -- confirm before changing, since it alters the metrics.
lr = LinearRegression(featuresCol="features", labelCol="LGD%")

# Stage order: indexers -> one-hot encoder -> assembler -> scaler -> regressor.
stages = [*indexers, encoder, assembler, scaler, lr]
pipeline = Pipeline(stages=stages)

# 80/20 split with a fixed seed; the whole pipeline is fitted on the training part only.
train, test = data.randomSplit([0.8, 0.2], seed=42)

model = pipeline.fit(train)

25/12/02 13:43:20 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/12/02 13:43:23 WARN Instrumentation: [2e90ea50] regParam is zero, which might cause numerical instability and overfitting.
25/12/02 13:43:23 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/12/02 13:43:23 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
25/12/02 13:43:23 WARN Instrumentation: [2e90ea50] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.


## Evaluate the model

In [20]:
from pyspark.ml.evaluation import RegressionEvaluator

# Score both splits with the fitted pipeline.
pred_train = model.transform(train)
pred_test = model.transform(test)

# One evaluator per metric, all comparing "prediction" against "LGD%".
eval_mae, eval_rmse, eval_r2 = (
    RegressionEvaluator(labelCol="LGD%", predictionCol="prediction", metricName=metric)
    for metric in ("mae", "rmse", "r2")
)

# Report MAE, RMSE and R2 for the training split first, then for the test split.
labelled_evaluators = (("MAE", eval_mae), ("RMSE", eval_rmse), ("R2", eval_r2))
for split_name, predictions in (("Train", pred_train), ("Test", pred_test)):
    for metric_label, evaluator in labelled_evaluators:
        print(f"{split_name} {metric_label}:", evaluator.evaluate(predictions))


Train MAE: 0.11797694189400731
Train RMSE: 0.14459267364310394
Train R2: 0.8197527977336208
Test MAE: 0.11768973682640281
Test RMSE: 0.14407408344804618
Test R2: 0.8242087204800287
