In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
    .appName("Final") \
    .getOrCreate()

sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/08 11:29:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/12/08 11:29:09 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 55266)
Traceback (most recent call last):
  File "/Users/dhruvprajapati/.pyenv/versions/3.12.1/lib/python3.12/socketserver.py", line 318, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/Users/dhruvprajapati/.pyenv/versions/3.12.1/lib/python3.12/socketserver.py", line 349, in process_request
    self.finish_request(request, client_address)
  File "/Users/dhruvprajapati/.pyenv/versions/3.12.1/lib/python3.12/socketserver.py", line 362, in finish_request
    self.RequestHandlerClas

In [3]:
spark

In [4]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

In [5]:
df = spark.read.parquet('./cleaned_dataset')

In [6]:
df.printSchema()

root
 |-- lei: string (nullable = true)
 |-- loan_type: integer (nullable = true)
 |-- loan_purpose: integer (nullable = true)
 |-- loan_amount: float (nullable = true)
 |-- interest_rate: float (nullable = true)
 |-- loan_term: integer (nullable = true)
 |-- action_taken: integer (nullable = true)
 |-- income: float (nullable = true)
 |-- applicant_age: string (nullable = true)
 |-- applicant_sex: integer (nullable = true)
 |-- applicant_credit_score_type: integer (nullable = true)
 |-- co_applicant_age: string (nullable = true)
 |-- co_applicant_credit_score_type: integer (nullable = true)
 |-- derived_msa_md: integer (nullable = true)
 |-- state_code: string (nullable = true)
 |-- county_code: string (nullable = true)
 |-- property_value: float (nullable = true)
 |-- total_units: integer (nullable = true)
 |-- occupancy_type: integer (nullable = true)



In [7]:
len(df.columns)

19

In [8]:
df.count()

7427

### 4. Train Test split

In [9]:
train_df, test_df = df.randomSplit(weights=[0.8, 0.2], seed=100)

In [10]:
train_df.count()

5915

In [11]:
test_df.count()

1512

### 5. Feature transformation

In [12]:
from pyspark.sql.types import StringType

In [13]:
from pyspark.ml.feature import StringIndexer, StandardScaler, VectorAssembler, FeatureHasher
from pyspark.ml import Pipeline

In [14]:
# Define categorical and numerical features
categorical_features = [field.name for field in train_df.schema.fields if field.dataType is StringType()]
numerical_features = [field for field in train_df.columns if field not in categorical_features]

continuous_numerical_features = ["loan_amount", "interest_rate", "loan_term", "income", "property_value"]
discrete_numerical_features = [item for item in numerical_features if item not in continuous_numerical_features]

In [15]:
categorical_features.remove('county_code')

In [16]:
categorical_features

['lei', 'applicant_age', 'co_applicant_age', 'state_code']

In [17]:
continuous_numerical_features

['loan_amount', 'interest_rate', 'loan_term', 'income', 'property_value']

In [18]:
discrete_numerical_features

['loan_type',
 'loan_purpose',
 'action_taken',
 'applicant_sex',
 'applicant_credit_score_type',
 'co_applicant_credit_score_type',
 'derived_msa_md',
 'total_units',
 'occupancy_type']

In [19]:
cat_indexed_features = [f"{cat}_indexed" for cat in categorical_features]

In [20]:
# perform Label encoding on categorical features
labelEncoder = StringIndexer(inputCols=categorical_features, 
                           outputCols=cat_indexed_features, handleInvalid="skip")

In [21]:
# perform feature hashing on 'county_code' as a lot of distinct values
hasher = FeatureHasher(inputCols=["county_code"], outputCol="county_code_hashed", numFeatures=1000)

In [22]:
# perform Standard scaling on continuous numerical features
numAssembler = VectorAssembler(inputCols=continuous_numerical_features, 
                            outputCol="con_num_features")

numScaler = StandardScaler(inputCol="con_num_features", outputCol="con_num_features_scaled")

In [23]:
# assemble all the features together
featureAssembler = VectorAssembler(inputCols=["con_num_features_scaled", "county_code_hashed"]+discrete_numerical_features+cat_indexed_features, outputCol='features')

In [24]:
# make the Pipeline
transformPipeline = Pipeline(stages = [labelEncoder, hasher, numAssembler, numScaler, featureAssembler])

In [25]:
# train it
transformPipeModel = transformPipeline.fit(train_df)

                                                                                

In [26]:
transformPipeModel.write().overwrite().save('./models/transformPipeModel')

In [27]:
train_df = transformPipeModel.transform(train_df)

In [28]:
train_df

24/12/08 11:29:15 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


lei,loan_type,loan_purpose,loan_amount,interest_rate,loan_term,action_taken,income,applicant_age,applicant_sex,applicant_credit_score_type,co_applicant_age,co_applicant_credit_score_type,derived_msa_md,state_code,county_code,property_value,total_units,occupancy_type,lei_indexed,applicant_age_indexed,co_applicant_age_indexed,state_code_indexed,county_code_hashed,con_num_features,con_num_features_scaled,features
01J4SO3XTWZF4PP38209,1,1,465000.0,3.5,360,1,188.0,35-44,2,3,35-44,3,13820,AL,1117,495000.0,1,1,93.0,0.0,1.0,26.0,"(1000,[710],[1.0])","[465000.0,3.5,360...",[2.04958786384093...,"(1018,[0,1,2,3,4,..."
01J4SO3XTWZF4PP38209,1,2,5000.0,3.2904205,60,0,27.0,65-74,1,1,9999,10,32820,TN,47157,75000.0,1,1,93.0,4.0,0.0,20.0,"(1000,[611],[1.0])","[5000.0,3.2904205...",[0.02203857918108...,"(1018,[0,1,2,3,4,..."
01J4SO3XTWZF4PP38209,1,4,15000.0,0.98,120,1,58.0,55-64,2,7,65-74,9,25060,MS,28047,125000.0,2,1,93.0,3.0,6.0,36.0,"(1000,[149],[1.0])","[15000.0,0.980000...",[0.06611573754325...,"(1018,[0,1,2,3,4,..."
01J4SO3XTWZF4PP38209,1,31,25000.0,5.5,120,1,187.0,35-44,2,7,35-44,9,25620,MS,28035,405000.0,1,1,93.0,0.0,1.0,36.0,"(1000,[627],[1.0])","[25000.0,5.5,120....",[0.11019289590542...,"(1018,[0,1,2,3,4,..."
01J4SO3XTWZF4PP38209,1,31,45000.0,4.25,59,1,94.0,8888,4,9,9999,10,27140,MS,28049,75000.0,1,3,93.0,5.0,0.0,36.0,"(1000,[766],[1.0])","[45000.0,4.25,59....",[0.19834721262976...,"(1018,[0,1,2,3,4,..."
01J4SO3XTWZF4PP38209,1,31,205000.0,3.0,360,1,97.0,65-74,1,1,65-74,1,19300,AL,1003,375000.0,1,1,93.0,4.0,6.0,26.0,"(1000,[581],[1.0])","[205000.0,3.0,360...",[0.90358174642449...,"(1018,[0,1,2,3,4,..."
01J4SO3XTWZF4PP38209,1,32,105000.0,2.625,180,1,65.0,45-54,2,2,9999,10,25060,MS,28047,175000.0,1,1,93.0,1.0,0.0,36.0,"(1000,[149],[1.0])","[105000.0,2.625,1...",[0.46281016280279...,"(1018,[0,1,2,3,4,..."
01J4SO3XTWZF4PP38209,2,1,115000.0,3.99,360,0,94.0,8888,4,9,8888,9,99999,MS,28083,125000.0,1,1,93.0,5.0,4.0,36.0,"(1000,[791],[1.0])","[115000.0,3.99000...",[0.50688732116496...,"(1018,[0,1,2,3,4,..."
01J4SO3XTWZF4PP38209,2,1,315000.0,2.875,360,1,87.0,25-34,1,1,9999,10,32820,MS,28137,325000.0,1,1,93.0,2.0,0.0,36.0,"(1000,[744],[1.0])","[315000.0,2.875,3...",[1.38843048840837...,"(1018,[0,1,2,3,4,..."
0S8H5NJFLHEVJXVTQ413,1,32,285000.0,3.5,360,1,64.0,25-34,3,1,25-34,1,13380,WA,53073,355000.0,1,1,577.0,2.0,3.0,10.0,"(1000,[634],[1.0])","[285000.0,3.5,360...",[1.25619901332186...,"(1018,[0,1,2,3,4,..."


### 6. Models Prediction

In [29]:
from pyspark.ml.classification import LogisticRegression, LinearSVC, NaiveBayes, DecisionTreeClassifier, RandomForestClassifier, GBTClassifier, FMClassifier

In [30]:
# evaluate the model from 'y_true' and 'y_pred' 
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

In [31]:
def evaluate_model(predictions, label_col='action_taken', prediction_col='prediction', raw_prediction_col='rawPrediction'):
    '''It returns classification evaluation metrics like accuracy, precision, f1, recall and roc'''
    
    # Initialize evaluators
    evaluator_accuracy = MulticlassClassificationEvaluator(labelCol=label_col, predictionCol=prediction_col, metricName='accuracy')
    evaluator_precision = MulticlassClassificationEvaluator(labelCol=label_col, predictionCol=prediction_col, metricName='weightedPrecision')
    evaluator_recall = MulticlassClassificationEvaluator(labelCol=label_col, predictionCol=prediction_col, metricName='weightedRecall')
    evaluator_f1 = MulticlassClassificationEvaluator(labelCol=label_col, predictionCol=prediction_col, metricName='f1')
    evaluator_roc = BinaryClassificationEvaluator(labelCol=label_col, rawPredictionCol=raw_prediction_col, metricName='areaUnderROC')

    # Calculate metrics
    accuracy = evaluator_accuracy.evaluate(predictions)
    precision = evaluator_precision.evaluate(predictions)
    recall = evaluator_recall.evaluate(predictions)
    f1_score = evaluator_f1.evaluate(predictions)
    roc_auc = evaluator_roc.evaluate(predictions)

    # Return all metrics as a dictionary
    metrics = {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1_score': f1_score,
        'roc_auc': roc_auc
    }

    return metrics

In [32]:
models = {
    'Logistic Regression': LogisticRegression(featuresCol='features', labelCol='action_taken'),
    'Support Vector Machine': LinearSVC(featuresCol='features', labelCol='action_taken'),
    # 'Naive Bayes': NaiveBayes(featuresCol='features', labelCol='action_taken'),
    'Factorization Machine': FMClassifier(featuresCol='features', labelCol='action_taken'),
    'Decision Tree': DecisionTreeClassifier(featuresCol='features', labelCol='action_taken', maxBins=2000),
    'Random Forest': RandomForestClassifier(featuresCol='features', labelCol='action_taken', maxBins=2000),
    'Gradient Boosting Trees': GBTClassifier(featuresCol='features', labelCol='action_taken', maxBins=2000),
}

In [33]:

for algo in models:
    print(f"========== {algo} ============")

    # Train the model
    model = models[algo]
    trained_model = model.fit(train_df)

    # Evaluate on Test data
    test_df_transformed = transformPipeModel.transform(test_df)
    test_predictions = trained_model.transform(test_df_transformed)

    results = evaluate_model(test_predictions)
    print("accuracy: {:.4f}".format(results['accuracy']))
    print("precison: {:.4f}".format(results['precision']))
    print("recall: {:.4f}".format(results['recall']))
    print("f1-score: {:.4f}".format(results['f1_score']))
    print("ROC: {:.4f}".format(results['roc_auc']))

    trained_model.write().overwrite().save(f'./models/{algo}')

    print('\n')



24/12/08 11:29:17 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/12/08 11:29:17 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


accuracy: 1.0000
precison: 1.0000
recall: 1.0000
f1-score: 1.0000
ROC: 1.0000


accuracy: 0.9993
precison: 0.9993
recall: 0.9993
f1-score: 0.9993
ROC: 0.9998


accuracy: 0.9151
precison: 0.9159
recall: 0.9151
f1-score: 0.9154
ROC: 0.9520


accuracy: 1.0000
precison: 1.0000
recall: 1.0000
f1-score: 1.0000
ROC: 1.0000


accuracy: 0.8244
precison: 0.8592
recall: 0.8244
f1-score: 0.8015
ROC: 0.9933


accuracy: 1.0000
precison: 1.0000
recall: 1.0000
f1-score: 1.0000
ROC: 1.0000




## Inference

In [34]:
from pyspark.ml import PipelineModel
from pyspark.ml.classification import LogisticRegressionModel

In [35]:
lr_model = LogisticRegressionModel.load("models/Logistic Regression")

In [36]:
lr_model

LogisticRegressionModel: uid=LogisticRegression_b96015d953db, numClasses=2, numFeatures=1018

In [37]:
transform_model = PipelineModel.load('models/transformPipeModel')

                                                                                

In [38]:
test_df.count()

1512

In [39]:
# Preprocess the test data and then make predictions 

test_transformed_df = transform_model.transform(test_df)
prediction_df = lr_model.transform(test_transformed_df)

In [40]:
prediction_df

lei,loan_type,loan_purpose,loan_amount,interest_rate,loan_term,action_taken,income,applicant_age,applicant_sex,applicant_credit_score_type,co_applicant_age,co_applicant_credit_score_type,derived_msa_md,state_code,county_code,property_value,total_units,occupancy_type,lei_indexed,applicant_age_indexed,co_applicant_age_indexed,state_code_indexed,county_code_hashed,con_num_features,con_num_features_scaled,features,rawPrediction,probability,prediction
01J4SO3XTWZF4PP38209,1,31,15000.0,4.95,60,1,94.0,8888,4,9,9999,10,25060,MS,28047,45000.0,1,3,93.0,5.0,0.0,36.0,"(1000,[149],[1.0])","[15000.0,4.949999...",[0.06611573754325...,"(1018,[0,1,2,3,4,...",[-19.836490256505...,[2.42729041100326...,1.0
0S8H5NJFLHEVJXVTQ413,1,31,435000.0,3.2904205,360,0,124.0,35-44,1,1,35-44,1,42644,WA,53061,655000.0,1,1,577.0,0.0,1.0,10.0,"(1000,[730],[1.0])","[435000.0,3.29042...",[1.91735638875442...,"(1018,[0,1,2,3,4,...",[19.5733510553066...,[0.99999999684207...,0.0
1IE8VN30JCEQV1H4R804,1,31,215000.0,3.2904205,360,0,109.0,55-64,1,2,9999,10,22744,FL,12011,355000.0,1,1,349.0,3.0,0.0,1.0,"(1000,[492],[1.0])","[215000.0,3.29042...",[0.94765890478666...,"(1018,[0,1,2,3,4,...",[20.5673297485328...,[0.99999999883124...,0.0
213800XR2TCBQJSF1X93,1,31,365000.0,2.875,360,1,102.0,35-44,1,2,9999,10,31084,CA,6037,625000.0,1,1,140.0,0.0,0.0,0.0,"(1000,[343],[1.0])","[365000.0,2.875,3...",[1.60881628021922...,"(1018,[0,1,2,3,4,...",[-20.934727314919...,[8.09400459542958...,1.0
2549006II76YXSS5XM65,1,31,205000.0,3.125,360,1,110.0,45-54,2,3,9999,10,47894,VA,51059,325000.0,1,1,204.0,1.0,0.0,12.0,"(1000,[734],[1.0])","[205000.0,3.125,3...",[0.90358174642449...,"(1018,[0,1,2,3,4,...",[-21.094882266760...,[6.89618708501421...,1.0
254900ACUWEGW702BR80,1,31,275000.0,3.0,360,1,78.0,45-54,2,3,9999,10,15804,NJ,34007,305000.0,1,1,353.0,1.0,0.0,11.0,"(1000,[839],[1.0])","[275000.0,3.0,360...",[1.21212185495969...,"(1018,[0,1,2,3,4,...",[-20.692577993815...,[1.03116356778725...,1.0
254900ACUWEGW702BR80,1,31,395000.0,3.0,360,1,332.0,35-44,1,1,>74,9,35614,NJ,34003,515000.0,1,1,353.0,0.0,7.0,11.0,"(1000,[806],[1.0])","[395000.0,3.0,360...",[1.74104775530573...,"(1018,[0,1,2,3,4,...",[-20.586412448677...,[1.14666002107074...,1.0
254900HA4DQWAE0W3342,1,1,155000.0,2.875,360,0,218.0,25-34,4,9,9999,9,99999,OK,40009,195000.0,1,1,9.0,2.0,0.0,33.0,"(1000,[379],[1.0])","[155000.0,2.875,3...",[0.68319595461364...,"(1018,[0,1,2,3,4,...",[19.2423420042787...,[0.99999999560299...,0.0
254900HA4DQWAE0W3342,1,1,215000.0,2.5,360,0,68.0,25-34,4,9,9999,9,99999,MN,27049,225000.0,1,1,9.0,2.0,0.0,16.0,"(1000,[366],[1.0])","[215000.0,2.5,360...",[0.94765890478666...,"(1018,[0,1,2,3,4,...",[19.7412173471917...,[0.99999999733008...,0.0
254900HA4DQWAE0W3342,1,1,215000.0,3.125,360,0,36.0,35-44,4,9,9999,9,99999,CA,6023,225000.0,1,1,9.0,0.0,0.0,0.0,"(1000,[807],[1.0])","[215000.0,3.125,3...",[0.94765890478666...,"(1018,[0,1,2,3,4,...",[20.2197389139190...,[0.99999999834545...,0.0


In [41]:
prediction_df.select('action_taken','prediction')

action_taken,prediction
1,1.0
0,0.0
0,0.0
1,1.0
1,1.0
1,1.0
1,1.0
0,0.0
0,0.0
0,0.0


In [42]:
test_point = test_df.limit(1)

In [43]:
test_point

lei,loan_type,loan_purpose,loan_amount,interest_rate,loan_term,action_taken,income,applicant_age,applicant_sex,applicant_credit_score_type,co_applicant_age,co_applicant_credit_score_type,derived_msa_md,state_code,county_code,property_value,total_units,occupancy_type
01J4SO3XTWZF4PP38209,1,31,15000.0,4.95,60,1,94.0,8888,4,9,9999,10,25060,MS,28047,45000.0,1,3


In [44]:
test_point.printSchema()

root
 |-- lei: string (nullable = true)
 |-- loan_type: integer (nullable = true)
 |-- loan_purpose: integer (nullable = true)
 |-- loan_amount: float (nullable = true)
 |-- interest_rate: float (nullable = true)
 |-- loan_term: integer (nullable = true)
 |-- action_taken: integer (nullable = true)
 |-- income: float (nullable = true)
 |-- applicant_age: string (nullable = true)
 |-- applicant_sex: integer (nullable = true)
 |-- applicant_credit_score_type: integer (nullable = true)
 |-- co_applicant_age: string (nullable = true)
 |-- co_applicant_credit_score_type: integer (nullable = true)
 |-- derived_msa_md: integer (nullable = true)
 |-- state_code: string (nullable = true)
 |-- county_code: string (nullable = true)
 |-- property_value: float (nullable = true)
 |-- total_units: integer (nullable = true)
 |-- occupancy_type: integer (nullable = true)



In [46]:
transform_point = transform_model.transform(test_point)
predict_point = lr_model.transform(transform_point)

In [47]:
result = predict_point.select('prediction')

In [49]:
result.collect()[0][0]

1.0