In [1]:
!pip install pyspark



In [4]:
import numpy 
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder, Imputer
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Initialize Spark session
spark = SparkSession.builder.appName("loan_acceptance").config("spark.driver.memory", "9g").getOrCreate()

# Read CSV file
df = spark.read.csv("./../../Downloads/loan_data.csv", header=True)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/12 10:44:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
schema = df.schema
for field in schema.fields:
    print(f"Column: {field.name}, Type: {field.dataType}")

Column: person_age, Type: StringType()
Column: person_gender, Type: StringType()
Column: person_education, Type: StringType()
Column: person_income, Type: StringType()
Column: person_emp_exp, Type: StringType()
Column: person_home_ownership, Type: StringType()
Column: loan_amnt, Type: StringType()
Column: loan_intent, Type: StringType()
Column: loan_int_rate, Type: StringType()
Column: loan_percent_income, Type: StringType()
Column: cb_person_cred_hist_length, Type: StringType()
Column: credit_score, Type: StringType()
Column: previous_loan_defaults_on_file, Type: StringType()
Column: loan_status, Type: StringType()


In [6]:
df.show()

+----------+-------------+----------------+-------------+--------------+---------------------+---------+-----------------+-------------+-------------------+--------------------------+------------+------------------------------+-----------+
|person_age|person_gender|person_education|person_income|person_emp_exp|person_home_ownership|loan_amnt|      loan_intent|loan_int_rate|loan_percent_income|cb_person_cred_hist_length|credit_score|previous_loan_defaults_on_file|loan_status|
+----------+-------------+----------------+-------------+--------------+---------------------+---------+-----------------+-------------+-------------------+--------------------------+------------+------------------------------+-----------+
|      22.0|       female|          Master|      71948.0|             0|                 RENT|  35000.0|         PERSONAL|        16.02|               0.49|                       3.0|         561|                            No|          1|
|      21.0|       female|     High Scho

In [11]:

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressor
# impute rows with missing values
df = df.fillna(0)

# List of categorical columns to be encoded
categorical_columns = ["person_education", "person_home_ownership", "loan_intent", "previous_loan_defaults_on_file", "loan_status", "loan_amnt"]

# List of numerical columns
numerical_columns = ["person_age", "person_income", "person_emp_exp", "person_gender", "loan_int_rate", "loan_percent_income", "cb_person_cred_hist_length"]
df = df.withColumn("credit_score", col("credit_score").cast("float"))

for column in numerical_columns:
    df = df.withColumn(column, col(column).cast("float"))
    
# Create a StringIndexer and OneHotEncoder for each categorical column
indexers = [StringIndexer(inputCol=col, outputCol=col + "_indexed") for col in categorical_columns]
encoders = [OneHotEncoder(inputCol=col + "_indexed", outputCol=col + "_encoded") for col in categorical_columns]

# Assemble all feature columns into a single vector
encoded_feature_columns = [col + "_encoded" for col in categorical_columns]
feature_columns = encoded_feature_columns + numerical_columns
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Create a StringIndexer for the target column
label_indexer = StringIndexer(inputCol="credit_score", outputCol="label", handleInvalid="keep")

# Combine all stages into a pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler, label_indexer])

# Fit and transform the data
df_preprocessed = pipeline.fit(df).transform(df)

# Split the data into training and test sets
train_df, test_df = df_preprocessed.randomSplit([0.8, 0.2], seed=42)

# Initialize the Random Forest classifier
rf = RandomForestRegressor(labelCol="label", featuresCol="features", numTrees=100)

# Train the model
rf_model = rf.fit(train_df)

# Make predictions on the test set
predictions = rf_model.transform(test_df)

evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="credit_score", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

"""
# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})

print(f"Test set accuracy: {accuracy}")
print(f"Test set precision: {precision}")
print(f"Test set recall: {recall}")
print(f"Test set F1-score: {f1}")
"""

25/02/12 10:56:31 WARN DAGScheduler: Broadcasting large task binary with size 1594.0 KiB
25/02/12 10:56:54 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
25/02/12 10:57:14 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
25/02/12 10:57:36 WARN DAGScheduler: Broadcasting large task binary with size 4.4 MiB
25/02/12 10:58:00 WARN DAGScheduler: Broadcasting large task binary with size 6.4 MiB
[Stage 94:>                                                         (0 + 1) / 1]

Root Mean Squared Error (RMSE) on test data = 565.464


                                                                                

'\n# Evaluate the model\nevaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")\naccuracy = evaluator.evaluate(predictions)\nprecision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})\nrecall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})\nf1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})\n\nprint(f"Test set accuracy: {accuracy}")\nprint(f"Test set precision: {precision}")\nprint(f"Test set recall: {recall}")\nprint(f"Test set F1-score: {f1}")\n'

In [12]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Define the parameter grid
paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, [50, 100, 150])
             .addGrid(rf.maxDepth, [5, 10, 15])
             .build())

# Define the cross-validator
crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)

# Perform cross-validation
cv_model = crossval.fit(train_df)
cv_predictions = cv_model.transform(test_df)
cv_accuracy = evaluator.evaluate(cv_predictions)

evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="credit_score", metricName="rmse")
rmse = evaluator.evaluate(cv_predictions)
print(f"Cross-validated accuracy: {rmse}")

25/02/12 10:58:39 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/02/12 10:58:42 WARN DAGScheduler: Broadcasting large task binary with size 1362.4 KiB
25/02/12 10:58:52 WARN DAGScheduler: Broadcasting large task binary with size 1632.0 KiB
25/02/12 10:59:00 WARN DAGScheduler: Broadcasting large task binary with size 2.0 MiB
25/02/12 10:59:08 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
25/02/12 10:59:18 WARN DAGScheduler: Broadcasting large task binary with size 3.5 MiB
25/02/12 10:59:30 WARN DAGScheduler: Broadcasting large task binary with size 1362.4 KiB
25/02/12 10:59:40 WARN DAGScheduler: Broadcasting large task binary with size 1632.0 KiB
25/02/12 10:59:48 WARN DAGScheduler: Broadcasting large task binary with size 2.0 MiB
25/02/12 10:59:56 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
25/02/12 11:00:05 WARN DA

Cross-validated accuracy: 565.4641249029512


                                                                                

In [None]:


evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})

print(f"Test set accuracy: {accuracy}")
print(f"Test set precision: {precision}")
print(f"Test set recall: {recall}")
print(f"Test set F1-score: {f1}")

[Stage 1251:>                                                       (0 + 1) / 1]

Test set accuracy: 0.5538153948683772
Test set precision: 0.30671149159321653
Test set recall: 0.5538153948683772
Test set F1-score: 0.39478498231663856


                                                                                

Column: person_age, Type: FloatType()
Column: person_gender, Type: FloatType()
Column: person_education, Type: StringType()
Column: person_income, Type: FloatType()
Column: person_emp_exp, Type: FloatType()
Column: person_home_ownership, Type: StringType()
Column: loan_amnt, Type: StringType()
Column: loan_intent, Type: StringType()
Column: loan_int_rate, Type: FloatType()
Column: loan_percent_income, Type: FloatType()
Column: cb_person_cred_hist_length, Type: FloatType()
Column: credit_score, Type: FloatType()
Column: previous_loan_defaults_on_file, Type: StringType()
Column: loan_status, Type: StringType()


In [None]:
#Create a spark pipeline that imputes missing values, encodes the string columns, and lables the response columns properly and apply it to the data


# impute rows with missing values
df = df.fillna(0)

# List of categorical columns to be encoded
categorical_columns = ["person_education", "person_home_ownership", "loan_intent", "previous_loan_defaults_on_file", "person_gender"]

# List of numerical columns
numerical_columns = ["person_age", "person_income", "person_emp_exp", "loan_amnt", "loan_int_rate", "loan_percent_income", "cb_person_cred_hist_length", "credit_score"]

for column in numerical_columns:
    df = df.withColumn(column, col(column).cast("float"))
    
# Create a StringIndexer and OneHotEncoder for each categorical column
indexers = [StringIndexer(inputCol=col, outputCol=col + "_indexed") for col in categorical_columns]
encoders = [OneHotEncoder(inputCol=col + "_indexed", outputCol=col + "_encoded") for col in categorical_columns]

# Assemble all feature columns into a single vector
encoded_feature_columns = [col + "_encoded" for col in categorical_columns]
feature_columns = encoded_feature_columns + numerical_columns
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Create a StringIndexer for the target column
label_indexer = StringIndexer(inputCol="loan_status", outputCol="label")

# Combine all stages into a pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler, label_indexer])

# Fit and transform the data
df_preprocessed = pipeline.fit(df).transform(df)

In [None]:


# Split the data into training and test sets
train_df, test_df = df_preprocessed.randomSplit([0.8, 0.2], seed=42)

# Initialize the Random Forest classifier
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=100)

# Train the model
rf_model = rf.fit(train_df)

# Make predictions on the test set
predictions = rf_model.transform(test_df)

# Evaluate the model and print cross validated accuracy, precision and recall


                                                                                

Test set accuracy: 0.9001443963123403
Test set precision: 0.9034837644294712
Test set recall: 0.9001443963123403
Test set F1-score: 0.8917935923311843


In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Define the parameter grid
paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, [50, 100, 150])
             .addGrid(rf.maxDepth, [5, 10, 15])
             .build())

# Perform a cross validated training of an RF model using the grid defined above


# Print cross validated accuracy, precision and recall
print(f"Cross-validated accuracy: {cv_accuracy}")
print(f"CV set precision: {precision}")
print(f"CV set recall: {recall}")
print(f"CV set F1-score: {f1}")

25/02/11 14:56:53 WARN DAGScheduler: Broadcasting large task binary with size 1302.7 KiB
25/02/11 14:56:54 WARN DAGScheduler: Broadcasting large task binary with size 1967.3 KiB
25/02/11 14:56:55 WARN DAGScheduler: Broadcasting large task binary with size 2.8 MiB
25/02/11 14:56:57 WARN DAGScheduler: Broadcasting large task binary with size 2.0 MiB
25/02/11 14:57:02 WARN DAGScheduler: Broadcasting large task binary with size 1302.6 KiB
25/02/11 14:57:03 WARN DAGScheduler: Broadcasting large task binary with size 1967.3 KiB
25/02/11 14:57:04 WARN DAGScheduler: Broadcasting large task binary with size 2.8 MiB
25/02/11 14:57:06 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
25/02/11 14:57:07 WARN DAGScheduler: Broadcasting large task binary with size 5.6 MiB
25/02/11 14:57:10 WARN DAGScheduler: Broadcasting large task binary with size 7.5 MiB
25/02/11 14:57:12 WARN DAGScheduler: Broadcasting large task binary with size 9.6 MiB
25/02/11 14:57:15 WARN DAGScheduler: Broad

Cross-validated accuracy: 0.9198045096079085
CV set precision: 0.9187970233355921
CV set recall: 0.9198045096079084
CV set F1-score: 0.9166129810891496


                                                                                

In [None]:


evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})

print(f"Test set accuracy: {accuracy}")
print(f"Test set precision: {precision}")
print(f"Test set recall: {recall}")
print(f"Test set F1-score: {f1}")

                                                                                

Test set accuracy: 0.9001443963123403
Test set precision: 0.9034837644294712
Test set recall: 0.9001443963123403
Test set F1-score: 0.8917935923311843


25/02/11 17:02:27 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 3513357 ms exceeds timeout 120000 ms
25/02/11 17:02:27 WARN SparkContext: Killing executors is not supported by current scheduler.
25/02/11 17:13:53 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$

- import data and view the table
- create comments detailing what could be done
- link to proper documentation of apache spark
- also have a brnach with code to properly execute it all


- create spark session
- import data
- impute missing values
- find a way to handle the string columns
- create a pipeline
- fit the data with the pipeline
- Train/test split
- Train model and validate with test data
- Try to find better hyperparametres

One problem is that the basic RF model is already very good at predicting the response, perhaps they should get a harder problem?
possibly need to predict another variable then. Or possible the first prediction should be loan status and the other, more difficult one could be education, etc

finding the credit score should be reasonable, the goal should then be to present the lowest RMSE without using duplicates and other data leaks

hyperpara search er ganske ambisiøst gitt at workshop varer i maks noen timer



ok, so I think "holding their hands" when trying to predict the "gender" response, and then only giving them vague instructions for the rest. Should also show them where to find pyspark docs and such
Should also heavily discourage use of pandas

all this has to be rewritten to databricks notebook in advance of the workshop
