In [25]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, isnan, count
from pyspark.ml.feature import Imputer, StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline


In [26]:
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Credit Score Preprocessing") \
    .getOrCreate()


In [27]:
# Load dataset
df = spark.read.csv("../archive/train.csv", header=True, inferSchema=True)
print("Schema:")
df.printSchema()
print("Sample data:")
df.show(100)


Schema:
root
 |-- ID: string (nullable = true)
 |-- Customer_ID: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- SSN: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- Annual_Income: string (nullable = true)
 |-- Monthly_Inhand_Salary: double (nullable = true)
 |-- Num_Bank_Accounts: integer (nullable = true)
 |-- Num_Credit_Card: integer (nullable = true)
 |-- Interest_Rate: integer (nullable = true)
 |-- Num_of_Loan: string (nullable = true)
 |-- Type_of_Loan: string (nullable = true)
 |-- Delay_from_due_date: integer (nullable = true)
 |-- Num_of_Delayed_Payment: string (nullable = true)
 |-- Changed_Credit_Limit: string (nullable = true)
 |-- Num_Credit_Inquiries: double (nullable = true)
 |-- Credit_Mix: string (nullable = true)
 |-- Outstanding_Debt: string (nullable = true)
 |-- Credit_Utilization_Ratio: double (nullable = true)
 |-- Credit_History_Age: string (nullabl

In [28]:
from pyspark.sql.functions import regexp_replace, when, col

# Define which string‐columns should be numbers
int_cols = [
    "Age", "Num_Bank_Accounts", "Num_Credit_Card",
    "Num_of_Loan", "Delay_from_due_date", "Num_of_Delayed_Payment",
    "Num_Credit_Inquiries"
]
float_cols = [
    "Annual_Income", "Monthly_Inhand_Salary", "Interest_Rate",
    "Changed_Credit_Limit", "Outstanding_Debt", "Credit_Utilization_Ratio",
    "Total_EMI_per_month", "Amount_invested_monthly", "Monthly_Balance"
]

# For each of these, strip out any non-digit/dot characters,
# and turn "NA" or empty strings into null
for c in int_cols + float_cols:
    df = df.withColumn(
        c,
        when(
            col(c).isNull() | (col(c) == "NA") | (col(c) == ""),
            None
        ).otherwise(
            regexp_replace(col(c), "[^0-9\\.]", "")
        )
    )


In [29]:
from pyspark.sql.types import IntegerType, DoubleType

# Cast integer columns
for c in int_cols:
    df = df.withColumn(c, col(c).cast(IntegerType()))

# Cast float columns
for c in float_cols:
    df = df.withColumn(c, col(c).cast(DoubleType()))

# Quick check
df.printSchema()
df.show(5)


root
 |-- ID: string (nullable = true)
 |-- Customer_ID: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- SSN: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- Annual_Income: double (nullable = true)
 |-- Monthly_Inhand_Salary: double (nullable = true)
 |-- Num_Bank_Accounts: integer (nullable = true)
 |-- Num_Credit_Card: integer (nullable = true)
 |-- Interest_Rate: double (nullable = true)
 |-- Num_of_Loan: integer (nullable = true)
 |-- Type_of_Loan: string (nullable = true)
 |-- Delay_from_due_date: integer (nullable = true)
 |-- Num_of_Delayed_Payment: integer (nullable = true)
 |-- Changed_Credit_Limit: double (nullable = true)
 |-- Num_Credit_Inquiries: integer (nullable = true)
 |-- Credit_Mix: string (nullable = true)
 |-- Outstanding_Debt: double (nullable = true)
 |-- Credit_Utilization_Ratio: double (nullable = true)
 |-- Credit_History_Age: string (nullable = t

In [30]:
from pyspark.sql.functions import regexp_extract, col
from pyspark.sql.types import DoubleType

# 1) Extract the “Years” and “Months” as strings, then cast
df = (
    df
    .withColumn(
        "Credit_History_Years",
        regexp_extract(col("Credit_History_Age"), r"(\d+)\s+Years", 1).cast(DoubleType())
    )
    .withColumn(
        "Credit_History_Months",
        regexp_extract(col("Credit_History_Age"), r"(\d+)\s+Months", 1).cast(DoubleType())
    )
)

# 2) Combine into a single fractional years column
df = df.withColumn(
    "Credit_History_Age_Years",
    col("Credit_History_Years") + (col("Credit_History_Months") / 12)
)

# 3) Inspect the result
df.select(
    "Credit_History_Age",
    "Credit_History_Years",
    "Credit_History_Months",
    "Credit_History_Age_Years"
).show(10, truncate=False)

# Drop the old string column and intermediate year/month columns, then rename the computed column
df = (
    df
    .drop("Credit_History_Age", "Credit_History_Years", "Credit_History_Months")
    .withColumnRenamed("Credit_History_Age_Years", "Credit_History_Age")
)

# Verify the replacement
df.select("Credit_History_Age").show(5, truncate=False)


+---------------------+--------------------+---------------------+------------------------+
|Credit_History_Age   |Credit_History_Years|Credit_History_Months|Credit_History_Age_Years|
+---------------------+--------------------+---------------------+------------------------+
|22 Years and 1 Months|22.0                |1.0                  |22.083333333333332      |
|NA                   |NULL                |NULL                 |NULL                    |
|22 Years and 3 Months|22.0                |3.0                  |22.25                   |
|22 Years and 4 Months|22.0                |4.0                  |22.333333333333332      |
|22 Years and 5 Months|22.0                |5.0                  |22.416666666666668      |
|22 Years and 6 Months|22.0                |6.0                  |22.5                    |
|22 Years and 7 Months|22.0                |7.0                  |22.583333333333332      |
|NA                   |NULL                |NULL                 |NULL          

In [31]:
import math
from pyspark.sql.functions import col

# 1) Capture column names in a simple Python list
cols = df.columns

# 2) Define your mapper and reducer
def mapper_missing(row):
    out = []
    for c in cols:
        v = row[c]
        if v is None or (isinstance(v, float) and math.isnan(v)):
            out.append((c, 1))
        else:
            out.append((c, 0))
    return out

def reducer_sum(a, b):
    return a + b

# 3) Apply map and reduce
mapped = df.rdd.flatMap(mapper_missing)
missing_counts = mapped.reduceByKey(reducer_sum)

# 4) Collect & display
print("=== Missing‐value counts per column ===")
for colName, cnt in missing_counts.collect():
    print(f"{colName:30s} → {cnt}")


=== Missing‐value counts per column ===
Type_of_Loan                   → 11408
Outstanding_Debt               → 0
Monthly_Balance                → 1200
Num_Bank_Accounts              → 0
Payment_of_Min_Amount          → 0
Credit_Score                   → 0
Occupation                     → 0
Num_of_Delayed_Payment         → 7002
Name                           → 9985
SSN                            → 0
Changed_Credit_Limit           → 2091
Annual_Income                  → 0
Interest_Rate                  → 0
Credit_Mix                     → 0
Amount_invested_monthly        → 4479
Age                            → 0
Monthly_Inhand_Salary          → 15002
Total_EMI_per_month            → 0
Payment_Behaviour              → 0
Credit_History_Age             → 9030
Month                          → 0
Num_Credit_Card                → 0
Num_Credit_Inquiries           → 1965
Credit_Utilization_Ratio       → 0
ID                             → 0
Customer_ID                    → 0
Num_of_Loan          

In [32]:
# 1) Compute total row count and threshold
total_count = df.count()
threshold = total_count * 0.5

# 2) Turn your RDD results into a Python dict
missing_map = dict(missing_counts.collect())

# 3) Pick columns where missing > 50%
to_drop = [col for col, cnt in missing_map.items() if cnt > threshold]
print("Dropping columns:", to_drop)

# 4) Drop them from your DataFrame
df = df.drop(*to_drop)


Dropping columns: []


In [33]:
#Drop irrelevant columns
#rethink occupation
irrelevant_columns = ['ID', 'Customer_ID', 'Name' ,'Age', 'SSN','Month','Occupation']
df = df.drop(*irrelevant_columns)
print("Columns after dropping irrelevant ones:", df.columns)


Columns after dropping irrelevant ones: ['Annual_Income', 'Monthly_Inhand_Salary', 'Num_Bank_Accounts', 'Num_Credit_Card', 'Interest_Rate', 'Num_of_Loan', 'Type_of_Loan', 'Delay_from_due_date', 'Num_of_Delayed_Payment', 'Changed_Credit_Limit', 'Num_Credit_Inquiries', 'Credit_Mix', 'Outstanding_Debt', 'Credit_Utilization_Ratio', 'Payment_of_Min_Amount', 'Total_EMI_per_month', 'Amount_invested_monthly', 'Payment_Behaviour', 'Monthly_Balance', 'Credit_Score', 'Credit_History_Age']


In [34]:
from pyspark.sql.functions import col, when, count, udf
from pyspark.sql.types import DoubleType

# 1) Identify numeric columns
numeric_cols = [c for c, t in df.dtypes if t in ("double", "int", "float")]

# 2) Define your mapper and reducer
def mapper_sum_count(row):
    out = []
    for c in numeric_cols:
        v = row[c]
        if v is not None:
            out.append((c, (float(v), 1)))
    return out

def reducer_sum_count(a, b):
    return (a[0] + b[0], a[1] + b[1])

# 3) Compute sum & count per column
sum_count_rdd = df.rdd.flatMap(mapper_sum_count).reduceByKey(reducer_sum_count)

# 4) Compute averages
averages = sum_count_rdd.mapValues(lambda sc: sc[0] / sc[1] if sc[1] else None).collectAsMap()

# 5) Broadcast averages
avg_broadcast = spark.sparkContext.broadcast(averages)

# 6) Build an imputation UDF factory
def impute_avg_factory(colName):
    avg = avg_broadcast.value.get(colName, None)
    return udf(lambda v: avg if v is None else float(v), DoubleType())

# 7) Apply UDF to each numeric column
for c in numeric_cols:
    df = df.withColumn(c, impute_avg_factory(c)(col(c)))

# 8) Verify no nulls remain
df.select([count(when(col(c).isNull(), c)).alias(c) for c in numeric_cols])\
  .show(truncate=False)


+-------------+---------------------+-----------------+---------------+-------------+-----------+-------------------+----------------------+--------------------+--------------------+----------------+------------------------+-------------------+-----------------------+---------------+------------------+
|Annual_Income|Monthly_Inhand_Salary|Num_Bank_Accounts|Num_Credit_Card|Interest_Rate|Num_of_Loan|Delay_from_due_date|Num_of_Delayed_Payment|Changed_Credit_Limit|Num_Credit_Inquiries|Outstanding_Debt|Credit_Utilization_Ratio|Total_EMI_per_month|Amount_invested_monthly|Monthly_Balance|Credit_History_Age|
+-------------+---------------------+-----------------+---------------+-------------+-----------+-------------------+----------------------+--------------------+--------------------+----------------+------------------------+-------------------+-----------------------+---------------+------------------+
|0            |0                    |0                |0              |0            |0  

In [35]:
df.describe().show()

+-------+------------------+---------------------+------------------+-----------------+------------------+-----------------+--------------------+-------------------+----------------------+--------------------+--------------------+----------+------------------+------------------------+---------------------+-------------------+-----------------------+--------------------+--------------------+------------+-------------------+
|summary|     Annual_Income|Monthly_Inhand_Salary| Num_Bank_Accounts|  Num_Credit_Card|     Interest_Rate|      Num_of_Loan|        Type_of_Loan|Delay_from_due_date|Num_of_Delayed_Payment|Changed_Credit_Limit|Num_Credit_Inquiries|Credit_Mix|  Outstanding_Debt|Credit_Utilization_Ratio|Payment_of_Min_Amount|Total_EMI_per_month|Amount_invested_monthly|   Payment_Behaviour|     Monthly_Balance|Credit_Score| Credit_History_Age|
+-------+------------------+---------------------+------------------+-----------------+------------------+-----------------+--------------------+-

In [36]:
# Fill missing categorical columns with "Missing"
categorical_cols = [c for c, t in df.dtypes if t == "string"]
df = df.na.fill({c: "Missing" for c in categorical_cols})


In [37]:
# StringIndexer for each categorical column
indexers = [
    StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep")
    for c in categorical_cols
]
pipeline = Pipeline(stages=indexers)
df = pipeline.fit(df).transform(df)
df.show(1000)

#drop original categorical columns
for c in categorical_cols:
    df = df.drop(c)


+-------------+---------------------+-----------------+---------------+-------------+-----------+--------------------+-------------------+----------------------+--------------------+--------------------+----------+----------------+------------------------+---------------------+-------------------+-----------------------+--------------------+--------------------+------------+------------------+----------------+--------------+-------------------------+---------------------+----------------+
|Annual_Income|Monthly_Inhand_Salary|Num_Bank_Accounts|Num_Credit_Card|Interest_Rate|Num_of_Loan|        Type_of_Loan|Delay_from_due_date|Num_of_Delayed_Payment|Changed_Credit_Limit|Num_Credit_Inquiries|Credit_Mix|Outstanding_Debt|Credit_Utilization_Ratio|Payment_of_Min_Amount|Total_EMI_per_month|Amount_invested_monthly|   Payment_Behaviour|     Monthly_Balance|Credit_Score|Credit_History_Age|Type_of_Loan_idx|Credit_Mix_idx|Payment_of_Min_Amount_idx|Payment_Behaviour_idx|Credit_Score_idx|
+-----------

In [38]:
# OneHotEncoder for indexed columns
encoder = OneHotEncoder(
    inputCols=[f"{c}_idx" for c in categorical_cols],
    outputCols=[f"{c}_ohe" for c in categorical_cols]
)
df = encoder.fit(df).transform(df)



In [39]:
# Assemble numeric + one-hot features into a single vector
feature_cols = numeric_cols + [f"{c}_ohe" for c in categorical_cols]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="raw_features")
df = assembler.transform(df)


In [40]:
# Rename raw_features to features for tree-based algorithms
# (RandomForest, GBT do not require scaled inputs)
df_approachA = df.withColumnRenamed("raw_features", "features")

# You can now feed df_approachA into your RandomForestClassifier or GBTClassifier directly:
# e.g. rf = RandomForestClassifier(featuresCol="features", labelCol="Credit Score", ...)

print(df.dtypes)

[('Annual_Income', 'double'), ('Monthly_Inhand_Salary', 'double'), ('Num_Bank_Accounts', 'double'), ('Num_Credit_Card', 'double'), ('Interest_Rate', 'double'), ('Num_of_Loan', 'double'), ('Delay_from_due_date', 'double'), ('Num_of_Delayed_Payment', 'double'), ('Changed_Credit_Limit', 'double'), ('Num_Credit_Inquiries', 'double'), ('Outstanding_Debt', 'double'), ('Credit_Utilization_Ratio', 'double'), ('Total_EMI_per_month', 'double'), ('Amount_invested_monthly', 'double'), ('Monthly_Balance', 'double'), ('Credit_History_Age', 'double'), ('Type_of_Loan_idx', 'double'), ('Credit_Mix_idx', 'double'), ('Payment_of_Min_Amount_idx', 'double'), ('Payment_Behaviour_idx', 'double'), ('Credit_Score_idx', 'double'), ('Type_of_Loan_ohe', 'vector'), ('Credit_Mix_ohe', 'vector'), ('Payment_of_Min_Amount_ohe', 'vector'), ('Payment_Behaviour_ohe', 'vector'), ('Credit_Score_ohe', 'vector'), ('raw_features', 'vector')]


In [41]:
# from pyspark.ml.feature import VectorAssembler, StandardScaler

# # 1) Assemble just the numeric columns into "raw_num"
# num_assembler = VectorAssembler(inputCols=numeric_cols, outputCol="raw_num")
# df2 = num_assembler.transform(df)

# # 2) Scale numeric features WITHOUT centering to preserve sparsity of one-hot
# scaler = StandardScaler(
#     inputCol="raw_num", 
#     outputCol="scaled_num", 
#     withMean=False,   # do not center
#     withStd=True      # scale to unit variance
# )
# df2 = scaler.fit(df2).transform(df2)

# # 3) Re-assemble scaled numeric + all one-hot columns into final "features"
# final_assembler = VectorAssembler(
#     inputCols=["scaled_num"] + [f"{c}_ohe" for c in categorical_cols],
#     outputCol="features"
# )
# df_approachB = final_assembler.transform(df2)

# # Now df_approachB.features is ready for LogisticRegression, LinearSVC, etc.


In [42]:
# normalize fetaures

from pyspark.ml.feature import Normalizer
normalizer = Normalizer(inputCol="features", outputCol="norm_features", p=2.0)
df = normalizer.transform(df_approachA)



In [43]:

# Save the preprocessed data to CSV files
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
train_df.show(5)

+-------------+---------------------+-----------------+---------------+-------------+-----------+-------------------+----------------------+--------------------+--------------------+----------------+------------------------+-------------------+-----------------------+------------------+------------------+----------------+--------------+-------------------------+---------------------+----------------+------------------+--------------+-------------------------+---------------------+----------------+--------------------+--------------------+
|Annual_Income|Monthly_Inhand_Salary|Num_Bank_Accounts|Num_Credit_Card|Interest_Rate|Num_of_Loan|Delay_from_due_date|Num_of_Delayed_Payment|Changed_Credit_Limit|Num_Credit_Inquiries|Outstanding_Debt|Credit_Utilization_Ratio|Total_EMI_per_month|Amount_invested_monthly|   Monthly_Balance|Credit_History_Age|Type_of_Loan_idx|Credit_Mix_idx|Payment_of_Min_Amount_idx|Payment_Behaviour_idx|Credit_Score_idx|  Type_of_Loan_ohe|Credit_Mix_ohe|Payment_of_Min_Amo

In [44]:
# When you’re done, you can stop Spark
# spark.stop()

#cast credit_Score to int
train_df = train_df.withColumn("Credit_Score_idx", col("Credit_Score_idx").cast(IntegerType()))

In [45]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(
    featuresCol="norm_features",
    labelCol="Credit_Score_idx",
    maxIter=50,
    regParam=0.1,
    elasticNetParam=0.0  # L2 regularization
)
lrModel = lr.fit(train_df)


In [46]:
# Re-initialize Spark, reload and preprocess data, split train_df/test_df, and fit lrModel here…
# Then paste the evaluation cell:
predictions = lrModel.transform(test_df)
predictions.select("Credit_Score_idx", "prediction", "probability").show(10)

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="Credit_Score_idx", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
# ... similarly for precision, recall, f1 ...
print(f"Accuracy = {accuracy:.3f}")
# etc.


+----------------+----------+--------------------+
|Credit_Score_idx|prediction|         probability|
+----------------+----------+--------------------+
|             0.0|       0.0|[0.98716565932735...|
|             0.0|       0.0|[0.98653384609308...|
|             0.0|       0.0|[0.98096110046742...|
|             1.0|       1.0|[0.00264844696371...|
|             1.0|       1.0|[0.00346525193789...|
|             1.0|       1.0|[0.00390609580049...|
|             1.0|       1.0|[0.00157744790512...|
|             1.0|       1.0|[7.30103441357076...|
|             1.0|       1.0|[0.00189494241838...|
|             1.0|       1.0|[0.00743000851859...|
+----------------+----------+--------------------+
only showing top 10 rows

Accuracy = 0.910
