# End-to-End Corporate Credit Risk Rating System (PySpark)

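This notebook implements an end-to-end corporate credit risk pipeline for Databricks: synthetic data generation, feature engineering, a random forest default model, evaluation, PD-based rating assignment, and a simulated rating migration matrix. It relies on the `spark` session and the `display` helper that the Databricks notebook runtime provides; neither is defined in the notebook itself.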

## 1. Configuration & Setup

In [None]:

# Configuration
#
# Rating ladder: every grade is paired with a probability-of-default (PD)
# figure. The rating logic later in the notebook sorts these values in
# ascending order and treats each one as the upper PD bound of its grade.
RATING_MAPPING = dict(
    AAA=0.0005,
    AA=0.001,
    A=0.002,
    BBB=0.005,
    BB=0.01,
    B=0.03,
    CCC=0.08,
    CC=0.15,
    C=0.25,
    D=1.0,
)

# Numeric model inputs: raw financials first, then the derived ratios.
FEATURES_NUMERIC = [
    "revenue",
    "total_assets",
    "total_liabilities",
    "ebitda",
    "interest_expense",
    "debt_to_equity",
    "current_ratio",
    "interest_coverage",
    "roa",
    "age",
]

# Categorical model inputs (indexed and one-hot encoded by the feature pipeline).
FEATURES_CATEGORICAL = ["industry"]


## 2. Synthetic Data Generation

In [None]:

import pandas as pd
import numpy as np
import random
from pyspark.sql.types import *

def generate_synthetic_data(n_rows=10000, seed=42):
    """Build a synthetic portfolio of companies as a Spark DataFrame.

    Each row holds an id, an industry, raw balance-sheet / income figures, a
    founding year and a binary ``default`` label. Roughly 15% of companies are
    drawn from a "risky" regime with higher leverage and weaker margins.

    Args:
        n_rows: Number of companies to generate.
        seed: Seed applied to both ``numpy.random`` and ``random`` so the
            generated rows are repeatable.

    Returns:
        A Spark DataFrame with the schema declared at the end of this function.

    Note:
        Relies on a ``spark`` session already being in scope; it is not
        defined in this function.
    """
    # Both RNGs are used below, so both are seeded.
    np.random.seed(seed)
    random.seed(seed)
    data = []
    industries = ['Technology', 'Manufacturing', 'Retail', 'Healthcare', 'Energy', 'Finance']
    
    for i in range(n_rows):
        company_id = f"COMP_{i:06d}"
        industry = random.choice(industries)
        
        # Base financials: log-normally distributed asset size, and a 15%
        # chance of belonging to the risky regime.
        assets = np.exp(np.random.normal(15, 1.5))
        is_risky = random.random() < 0.15
        
        # NOTE: coverage_factor is drawn in both branches but never read
        # afterwards. It still consumes one RNG draw per row, so removing it
        # would change every subsequent sampled value for a given seed.
        if is_risky:
            liabilities_ratio = np.random.uniform(0.6, 1.5)
            profit_margin = np.random.uniform(-0.1, 0.05)
            coverage_factor = np.random.uniform(0, 2)
        else:
            liabilities_ratio = np.random.uniform(0.2, 0.7)
            profit_margin = np.random.uniform(0.05, 0.25)
            coverage_factor = np.random.uniform(3, 15)

        liabilities = assets * liabilities_ratio
        revenue = assets * np.random.uniform(0.5, 2.0)
        ebitda = revenue * profit_margin
        # Interest expense is a flat 5% of total liabilities.
        interest_expense = liabilities * 0.05 
        current_assets = assets * np.random.uniform(0.3, 0.6)
        # Risky companies carry a larger share of their liabilities as current.
        cl_ratio = np.random.uniform(0.6, 0.9) if is_risky else np.random.uniform(0.3, 0.5)
        current_liabilities = liabilities * cl_ratio
        # numpy's randint upper bound is exclusive, so years span 1950..2019.
        founding_year = np.random.randint(1950, 2020)
        
        # Default label: set by any of three rules.
        #   1. liabilities exceed assets;
        #   2. weak liquidity combined with weak interest coverage;
        #   3. a 10% random default among risky companies.
        # current_ratio / interest_coverage are only used for rule 2 here;
        # they are not written into the output row.
        default = 0
        if liabilities > assets: default = 1
        current_ratio = current_assets / current_liabilities if current_liabilities > 0 else 0
        interest_coverage = ebitda / interest_expense if interest_expense > 0 else 0
        if current_ratio < 0.9 and interest_coverage < 1.2: default = 1
        if is_risky and random.random() < 0.1: default = 1
            
        # Cast numpy scalars to plain Python types before building the row;
        # field order must match the schema below.
        row = (
            company_id, industry, float(assets), float(liabilities), float(current_assets),
            float(current_liabilities), float(revenue), float(ebitda), float(interest_expense),
            int(founding_year), int(default)
        )
        data.append(row)
        
    schema = StructType([
        StructField("company_id", StringType(), True),
        StructField("industry", StringType(), True),
        StructField("total_assets", DoubleType(), True),
        StructField("total_liabilities", DoubleType(), True),
        StructField("current_assets", DoubleType(), True),
        StructField("current_liabilities", DoubleType(), True),
        StructField("revenue", DoubleType(), True),
        StructField("ebitda", DoubleType(), True),
        StructField("interest_expense", DoubleType(), True),
        StructField("founding_year", IntegerType(), True),
        StructField("default", IntegerType(), True)
    ])
    
    # Build the Spark DataFrame straight from the list of tuples with the
    # explicit schema above.
    return spark.createDataFrame(data, schema)

# Build the synthetic portfolio (10,000 companies) and preview it in the notebook.
print("Generating Data...")
df = generate_synthetic_data(n_rows=10000)
display(df)


## 3. Feature Engineering

In [None]:

from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

def _ratio_or_fallback(numerator, denominator, fallback):
    """Return ``numerator / denominator`` as a Column, or ``fallback`` when the denominator is exactly 0."""
    return F.when(F.col(denominator) == 0, fallback).otherwise(
        F.col(numerator) / F.col(denominator)
    )


def calculate_financial_ratios(df, reference_year=2025):
    """Append derived financial ratios to a company DataFrame.

    Adds the columns ``equity``, ``debt_to_equity``, ``current_ratio``,
    ``interest_coverage``, ``roa`` and ``age``. Divisions by an exactly-zero
    denominator are replaced by a sentinel: 999.0 for the leverage, liquidity
    and coverage ratios, 0.0 for return on assets.

    Args:
        df: Spark DataFrame containing ``total_assets``, ``total_liabilities``,
            ``current_assets``, ``current_liabilities``, ``ebitda``,
            ``interest_expense`` and ``founding_year``.
        reference_year: Year from which company age is measured. Defaults to
            2025, the value that was previously hard-coded.

    Returns:
        The input DataFrame with the derived columns appended.
    """
    df = df.withColumn("equity", F.col("total_assets") - F.col("total_liabilities"))

    # NOTE(review): only equity == 0 is guarded. Negative equity yields a
    # negative debt_to_equity; confirm that downstream consumers expect this.
    df = df.withColumn("debt_to_equity", _ratio_or_fallback("total_liabilities", "equity", 999.0))
    df = df.withColumn("current_ratio", _ratio_or_fallback("current_assets", "current_liabilities", 999.0))
    df = df.withColumn("interest_coverage", _ratio_or_fallback("ebitda", "interest_expense", 999.0))
    df = df.withColumn("roa", _ratio_or_fallback("ebitda", "total_assets", 0.0))

    df = df.withColumn("age", F.lit(reference_year) - F.col("founding_year"))
    return df

def build_feature_pipeline(categorical_cols, numerical_cols):
    """Assemble an unfitted Spark ML pipeline that produces a ``features`` vector.

    For every categorical column a ``StringIndexer`` (writing
    ``<col>_indexed``) followed by a ``OneHotEncoder`` (writing ``<col>_vec``)
    is added. A final ``VectorAssembler`` concatenates the encoded vectors and
    the numeric columns, in that order, into ``features``.

    Args:
        categorical_cols: Names of the categorical input columns.
        numerical_cols: Names of the numeric input columns.

    Returns:
        A ``Pipeline`` holding the indexer/encoder pairs and the assembler.
    """
    pipeline_stages = []
    encoded_names = []

    for name in categorical_cols:
        indexed_name = f"{name}_indexed"
        vector_name = f"{name}_vec"
        pipeline_stages.append(
            StringIndexer(inputCol=name, outputCol=indexed_name, handleInvalid="keep")
        )
        pipeline_stages.append(
            OneHotEncoder(inputCols=[indexed_name], outputCols=[vector_name])
        )
        encoded_names.append(vector_name)

    # Encoded categoricals come first, then the numeric columns.
    pipeline_stages.append(
        VectorAssembler(
            inputCols=encoded_names + numerical_cols,
            outputCol="features",
            handleInvalid="keep",
        )
    )
    return Pipeline(stages=pipeline_stages)

# Derive the ratios, fit the feature pipeline on them, then materialize the
# feature vectors for every company.
df_ratios = calculate_financial_ratios(df)
feature_pipeline = build_feature_pipeline(
    categorical_cols=FEATURES_CATEGORICAL,
    numerical_cols=FEATURES_NUMERIC,
)
feature_model = feature_pipeline.fit(df_ratios)
df_features = feature_model.transform(df_ratios)

display(
    df_features.select("company_id", "features", "default")
)


## 4. Modeling (Random Forest)

In [None]:

from pyspark.ml.classification import RandomForestClassifier, LogisticRegression

# Hold out 20% of the rows for evaluation; the split is seeded so it is repeatable.
train_df, test_df = df_features.randomSplit([0.8, 0.2], seed=42)

# Seed the forest too. With only the split seeded, the trees' row/feature
# sampling could differ between runs and the reported metrics would drift.
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="default",
    numTrees=20,
    seed=42,
)
model = rf.fit(train_df)
predictions = model.transform(test_df)

display(predictions.select("company_id", "default", "probability", "prediction"))


## 5. Evaluation & Analysis

In [None]:

from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# AUC: area under the ROC curve, computed from the raw prediction column.
binary_evaluator = BinaryClassificationEvaluator(
    labelCol="default",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)
auc = binary_evaluator.evaluate(predictions)
print(f"AUC: {auc}")

# Accuracy: share of test rows whose predicted label matches the default flag.
multiclass_evaluator = MulticlassClassificationEvaluator(
    labelCol="default",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = multiclass_evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")


## 6. Risk Scoring & Calibration

In [None]:

from pyspark.sql.types import DoubleType

def normalize_probability(probability_vector):
    """Return the positive-class probability (index 1) as a Python float.

    Args:
        probability_vector: Indexable sequence of class probabilities, e.g.
            the ``probability`` vector emitted by a Spark ML classifier.

    Returns:
        ``float(probability_vector[1])``, or ``None`` when the value cannot be
        extracted (input is ``None``, has fewer than two entries, or the entry
        is not numeric).

    Why ``None`` rather than 0.0: a PD of 0.0 is the best possible score, so
    falling back to it would silently turn an extraction failure into a
    top-grade rating. ``None`` becomes a null in a Spark double column, which
    keeps the failure visible.
    """
    try:
        return float(probability_vector[1])
    except (TypeError, IndexError, ValueError):
        # TypeError: None / non-indexable input or non-numeric entry.
        # IndexError: vector has no index 1. ValueError: unparsable entry.
        return None

# Wrap normalize_probability as a Spark UDF whose result column is a double,
# so it can be applied to the classifier's "probability" column.
extract_pd_udf = F.udf(normalize_probability, DoubleType())

def calculate_risk_score_and_rating(df):
    """Add ``pd``, ``risk_score`` and ``rating`` columns to a predictions DataFrame.

    ``pd`` is extracted from the ``probability`` column via ``extract_pd_udf``.
    ``risk_score`` is ``(1 - pd) * 1000``. ``rating`` is the first grade in
    ``RATING_MAPPING`` (ordered by ascending threshold) whose threshold is
    greater than or equal to ``pd``; rows matching no grade are rated "D".

    Args:
        df: Spark DataFrame with a ``probability`` column.

    Returns:
        The input DataFrame with the three columns appended.
    """
    # Rating Logic: walk the grades from the tightest PD ceiling to the
    # loosest, chaining one WHEN branch per grade.
    ladder = sorted(RATING_MAPPING.items(), key=lambda pair: pair[1])
    rating_expr = None
    for grade, pd_ceiling in ladder:
        within_ceiling = F.col("pd") <= pd_ceiling
        rating_expr = (
            F.when(within_ceiling, grade)
            if rating_expr is None
            else rating_expr.when(within_ceiling, grade)
        )
    rating_expr = rating_expr.otherwise("D")

    return (
        df.withColumn("pd", extract_pd_udf(F.col("probability")))
        .withColumn("risk_score", (1 - F.col("pd")) * 1000)
        .withColumn("rating", rating_expr)
    )

# Score the held-out predictions and preview the PD, score and rating per company.
scored_df = calculate_risk_score_and_rating(predictions)
display(
    scored_df.select(
        "company_id",
        "pd",
        "risk_score",
        "rating",
    )
)


## 7. Migration Matrix Simulation

In [None]:

def generate_migration_matrix(df, noise_std=0.02, seed=None):
    """Simulate one-period rating migrations and return them as a crosstab.

    A Gaussian shock is added to each row's ``pd``, the result is clamped to
    [0, 1], and the shocked PD is re-rated with the same threshold ladder
    (``RATING_MAPPING`` sorted by ascending threshold, fallback "D"). The
    current ``rating`` is then cross-tabulated against the simulated
    ``rating_t1``.

    Args:
        df: Spark DataFrame with ``pd`` and ``rating`` columns.
        noise_std: Standard deviation of the additive PD shock. Defaults to
            0.02, the value that was previously hard-coded.
        seed: Optional seed forwarded to ``F.randn``. ``None`` (the default)
            keeps the previous unseeded behaviour. With a seed the draw is
            repeatable only as long as the input's partitioning and row order
            are stable.

    Returns:
        The DataFrame produced by ``df.stat.crosstab("rating", "rating_t1")``:
        one row per current rating and one count column per simulated rating.
    """
    df = df.withColumn("noise", F.randn(seed) * noise_std)
    df = df.withColumn("future_pd", F.col("pd") + F.col("noise"))
    # Clamp the shocked PD back into the valid probability range.
    df = df.withColumn(
        "future_pd",
        F.when(F.col("future_pd") < 0, 0.0)
        .when(F.col("future_pd") > 1, 1.0)
        .otherwise(F.col("future_pd")),
    )

    # Re-apply the rating ladder to the shocked PD. The logic is repeated here
    # instead of shared so that this function stays self-contained.
    sorted_rating = sorted(RATING_MAPPING.items(), key=lambda x: x[1])
    expr = None
    for rating, threshold in sorted_rating:
        condition = F.col("future_pd") <= threshold
        if expr is None:
            expr = F.when(condition, rating)
        else:
            expr = expr.when(condition, rating)
    expr = expr.otherwise("D")

    df = df.withColumn("rating_t1", expr)
    return df.stat.crosstab("rating", "rating_t1")

# Cross-tabulate current ratings against the simulated next-period ratings
# and show the resulting counts.
matrix = generate_migration_matrix(scored_df)
display(matrix)
