# IFRS9 Local Pipeline Execution

This notebook demonstrates the complete IFRS9 pipeline execution locally using PySpark.

In [None]:
import sys
import pandas as pd
import json
from datetime import datetime
from pyspark.sql import SparkSession

# Add src to path
sys.path.append('/home/jovyan/src')

from generate_data import DataGenerator
from rules_engine import IFRS9RulesEngine
from validation import DataValidator
from ml_model import CreditRiskClassifier

## 1. Initialize Spark Session

In [None]:
# Build (or reuse) the local Spark session for this notebook, enabling
# adaptive query execution and partition coalescing and requesting 2g of
# driver memory.
from urllib.parse import urlparse

spark = (
    SparkSession.builder
    .appName("IFRS9LocalPipeline")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.driver.memory", "2g")
    .getOrCreate()
)

print(f"Spark Version: {spark.version}")

# Report the port the UI actually bound to instead of assuming 4040: Spark
# moves to successive ports (4041, 4042, ...) when 4040 is taken, and
# getOrCreate() may hand back a session that was started earlier.
# Only the port is taken from uiWebUrl; the host stays "localhost" so the link
# remains usable when the driver runs inside a container.
spark_ui_url = spark.sparkContext.uiWebUrl
spark_ui_port = urlparse(spark_ui_url).port if spark_ui_url else None
if spark_ui_port is None:
    print("Spark UI: not available")
else:
    print(f"Spark UI: http://localhost:{spark_ui_port}")

## 2. Generate Synthetic Data

In [None]:
# Create the synthetic input datasets with a generator seeded at 42, report
# their sizes, and write each one to /tmp as CSV.
print("Generating synthetic data...")
generator = DataGenerator(seed=42)

# The payment history is generated from the loan portfolio, so loans go first.
loans_df = generator.generate_loan_portfolio(n_loans=5000)
payments_df = generator.generate_payment_history(loans_df, n_months=6)
macro_df = generator.generate_macroeconomic_data()

# One summary line per dataset.
print("Generated:")
for dataset_label, dataset in (
    ("Loans", loans_df),
    ("Payments", payments_df),
    ("Macro data", macro_df),
):
    print(f"  - {dataset_label}: {len(dataset):,} records")

# Persist every dataset without its index column.
for dataset, csv_path in (
    (loans_df, '/tmp/loan_portfolio.csv'),
    (payments_df, '/tmp/payment_history.csv'),
    (macro_df, '/tmp/macro_data.csv'),
):
    dataset.to_csv(csv_path, index=False)

print("Data saved to /tmp/")

## 3. Data Validation

In [None]:
# Run the validator over the generated loans and payments, list any reported
# errors, then print the data-quality metrics for the loan portfolio.
print("Running data validation...")
validator = DataValidator()

# Loan portfolio checks: each validator call returns (passed, errors).
loans_passed, loans_errors = validator.validate_loan_portfolio(loans_df)
loan_status = 'PASSED' if loans_passed else 'FAILED'
print(f"Loan validation: {loan_status}")
for loan_issue in loans_errors or ():
    print(f"  - {loan_issue}")

# Payment history checks.
payments_passed, payments_errors = validator.validate_payment_history(payments_df)
payment_status = 'PASSED' if payments_passed else 'FAILED'
print(f"Payment validation: {payment_status}")
for payment_issue in payments_errors or ():
    print(f"  - {payment_issue}")

# Quality metrics are computed on the loan portfolio only.
quality_report = validator.check_data_quality(loans_df)
print("\nData Quality Metrics:")
completeness_score = quality_report['completeness_score']
print(f"  - Completeness Score: {completeness_score:.1f}%")
record_count = quality_report['total_records']
print(f"  - Total Records: {record_count:,}")
column_count = quality_report['total_columns']
print(f"  - Total Columns: {column_count}")

## 4. IFRS9 Rules Processing

In [None]:
# Create the rules engine on the notebook's Spark session and convert the
# pandas loan portfolio into a Spark DataFrame for it to process.
print("Processing IFRS9 rules...")
engine = IFRS9RulesEngine(spark=spark)

spark_loans_df = spark.createDataFrame(loans_df)

# Confirm the row count and the schema of the converted DataFrame.
loaded_loan_count = spark_loans_df.count()
print(f"Loaded {loaded_loan_count:,} loans into Spark")
print("Spark DataFrame Schema:")
spark_loans_df.printSchema()

In [None]:
# Run the loan portfolio through the IFRS9 rules engine.
#
# The result is cached because it is evaluated by many separate Spark actions:
# count() and show() in this cell, then validation, summary reporting,
# toPandas() and a final count() in later cells. Without cache() each of those
# actions would recompute the full processing lineage from the input data.
processed_df = engine.process_portfolio(spark_loans_df).cache()

print("Processing complete!")
print(f"Processed {processed_df.count():,} loans")

# Preview the inputs next to the calculated IFRS9 outputs for ten loans.
print("\nSample processed records:")
sample_columns = [
    "loan_id",
    "loan_type",
    "credit_score",
    "days_past_due",
    "calculated_stage",
    "calculated_pd",
    "calculated_lgd",
    "calculated_ecl",
    "risk_rating",
]
processed_df.select(*sample_columns).show(10, truncate=False)

## 5. Validation and Summary

In [None]:
# Check the engine's calculations on the processed portfolio, then print the
# summary report: portfolio totals, stage distribution and risk distribution.
print("Validating IFRS9 calculations...")
validations = engine.validate_calculations(processed_df)

# One line per check, with a tick or cross taken from its "passed" flag.
print("\nValidation Results:")
for check_result in validations:
    if check_result["passed"]:
        status = "✓"
    else:
        status = "✗"
    print(f"  {status} {check_result['check']}: {check_result['message']}")

print("\nGenerating summary report...")
summary = engine.generate_summary_report(processed_df)

# Report banner.
banner_rule = "=" * 60
print("\n" + banner_rule)
print("IFRS9 PORTFOLIO SUMMARY REPORT")
print(banner_rule)

# Portfolio-level totals.
metrics = summary["portfolio_metrics"]
print("\nPortfolio Overview:")
print(f"  Total Loans: {metrics['total_loans']:,}")
print(f"  Total Exposure: ${metrics['total_exposure']:,.2f}")
print(f"  Total ECL: ${metrics['total_ecl']:,.2f}")
print(f"  Coverage Ratio: {metrics['coverage_ratio']:.2%}")

# Loan count, exposure and ECL for each stage.
print("\nStage Distribution:")
stage_distribution = summary["stage_distribution"]
for stage_name, stage_stats in stage_distribution.items():
    print(f"  {stage_name}: {stage_stats['count']:,} loans (${stage_stats['exposure']:,.2f} exposure, ${stage_stats['ecl']:,.2f} ECL)")

# Loan count, average PD and ECL for each risk rating.
print("\nRisk Rating Distribution:")
risk_distribution = summary["risk_distribution"]
for rating_name, rating_stats in risk_distribution.items():
    print(f"  {rating_name}: {rating_stats['count']:,} loans (avg PD: {rating_stats['avg_pd']:.2%}, ECL: ${rating_stats['ecl']:,.2f})")

## 6. Machine Learning Model Training

In [None]:
# Set up the random-forest credit-risk classifier and build its feature matrix
# from the processed portfolio.
print("Training ML models...")
classifier = CreditRiskClassifier(model_type="random_forest")

# Bring the Spark result into pandas before handing it to the classifier.
ml_df = processed_df.toPandas()

# prepare_features returns the feature matrix together with its column names.
X, feature_names = classifier.prepare_features(ml_df)
feature_count = len(feature_names)
print(f"Prepared {feature_count} features for ML training")
print(f"Feature matrix shape: {X.shape}")

# Show the first ten entries of the feature-name list.
leading_features = feature_names[:10]
print(f"\nTop 10 features: {leading_features}")

In [None]:
# Fit the stage classifier against the engine-calculated stage (30% of the
# data passed as test_size) and report its metrics.
print("Training stage classifier...")
stage_target = ml_df["calculated_stage"]
stage_metrics = classifier.train_stage_classifier(X, stage_target, test_size=0.3)

print("\nStage Classifier Results:")
print(f"  Accuracy: {stage_metrics['accuracy']:.2%}")
print(f"  CV Mean: {stage_metrics['cv_mean']:.2%} (±{stage_metrics['cv_std']:.2%})")
# ROC AUC is printed only when the metrics carry a truthy 'roc_auc' entry.
if stage_metrics.get('roc_auc'):
    print(f"  ROC AUC: {stage_metrics['roc_auc']:.3f}")

# First five entries of the reported feature-importance list.
print("\nTop 5 Important Features for Stage Classification:")
leading_stage_features = stage_metrics['feature_importance'][:5]
for importance_entry in leading_stage_features:
    print(f"  - {importance_entry['feature']}: {importance_entry['importance']:.3f}")

In [None]:
# Fit the PD regression model against the engine-calculated PD (30% of the
# data passed as test_size) and report its error metrics.
print("Training PD regression model...")
pd_target = ml_df["calculated_pd"]
pd_metrics = classifier.train_pd_model(X, pd_target, test_size=0.3)

print("\nPD Model Results:")
print(f"  MAE: {pd_metrics['mae']:.4f}")
print(f"  RMSE: {pd_metrics['rmse']:.4f}")
print(f"  R² Score: {pd_metrics['r2_score']:.3f}")
print(f"  CV MAE: {pd_metrics['cv_mae_mean']:.4f} (±{pd_metrics['cv_mae_std']:.4f})")

# First five entries of the reported feature-importance list.
print("\nTop 5 Important Features for PD Prediction:")
leading_pd_features = pd_metrics['feature_importance'][:5]
for importance_entry in leading_pd_features:
    print(f"  - {importance_entry['feature']}: {importance_entry['importance']:.3f}")

## 7. Model Predictions and Explanations

In [None]:
# Score the first ten feature rows with both trained models and print each
# prediction next to the stage calculated by the rules engine.
print("Making predictions on test data...")

# Demonstration sample: the first ten rows of the feature matrix.
test_sample = X.iloc[:10]

# Stage predictions (with class probabilities) and PD predictions.
predicted_stages, stage_probabilities = classifier.predict_stage(test_sample)
predicted_pd = classifier.predict_pd(test_sample)

print("\nPrediction Results (First 10 loans):")
print("-" * 80)
# Rows are matched to ml_df by position.
# NOTE(review): assumes X keeps the same row order as ml_df — confirm against
# prepare_features.
for row_pos, pred_stage in enumerate(predicted_stages):
    loan_row = ml_df.iloc[row_pos]
    print(
        f"Loan {loan_row['loan_id']}: Actual={loan_row['calculated_stage']}, "
        f"Predicted={pred_stage}, PD={predicted_pd[row_pos]:.3f}"
    )

In [None]:
# Ask the classifier to explain its prediction for the first sampled loan and
# print the predicted stage, the optional PD, the per-stage probabilities and
# the top contributing features.
print("Explaining prediction for first loan...")
explanation = classifier.explain_prediction(test_sample, index=0)

explained_loan_id = ml_df.iloc[0]['loan_id']
print(f"\nPrediction Explanation for Loan {explained_loan_id}:")
print(f"  Predicted Stage: {explanation['predicted_stage']}")
# The PD line is printed only when the explanation includes one.
if 'predicted_pd' in explanation:
    print(f"  Predicted PD: {explanation['predicted_pd']:.3f}")

print("\n  Stage Probabilities:")
for stage_label, stage_prob in explanation['stage_probabilities'].items():
    print(f"    {stage_label}: {stage_prob:.3f}")

print("\n  Top Contributing Features:")
for contribution in explanation['top_features']:
    print(f"    {contribution['feature']}: {contribution['value']:.2f} (importance: {contribution['importance']:.3f})")

## 8. Save Results

In [None]:
# Write the pipeline outputs to /tmp: the processed loans as CSV, and the
# summary report and model metrics as JSON.
print("Saving results...")

# Processed loan data, collected from Spark into pandas first.
processed_pandas = processed_df.toPandas()
processed_pandas.to_csv('/tmp/ifrs9_processed.csv', index=False)

# Portfolio summary report; default=str stringifies any value that json
# cannot encode natively.
with open('/tmp/ifrs9_summary.json', 'w') as summary_file:
    json.dump(summary, summary_file, indent=2, default=str)

# Metrics from both trained models, stamped with the current local time.
model_metrics = dict(
    stage_classifier=stage_metrics,
    pd_model=pd_metrics,
    timestamp=datetime.now().isoformat(),
)

with open('/tmp/model_metrics.json', 'w') as metrics_file:
    json.dump(model_metrics, metrics_file, indent=2, default=str)

# List what was written.
print("Results saved to /tmp/:")
for saved_file_note in (
    "  - ifrs9_processed.csv: Processed loan data",
    "  - ifrs9_summary.json: Portfolio summary report",
    "  - model_metrics.json: ML model performance metrics",
):
    print(saved_file_note)

## 9. Cleanup

In [None]:
# Final cell: release the engine's resources and print a recap of the run.
print("Cleaning up resources...")

# Take the processed-row count while Spark is still running. count() is a
# Spark action, and a DataFrame cannot be evaluated once the session that
# backs it has been stopped, so it must happen before engine.stop().
# NOTE(review): assumes engine.stop() shuts down the Spark session, which is
# what this cell is meant to do — confirm against IFRS9RulesEngine.
processed_count = processed_df.count()

engine.stop()
print("Pipeline execution completed successfully!")

# Recap built only from values that are already in driver memory.
validation_outcome = 'Passed' if loans_passed and payments_passed else 'Issues found'
banner_rule = "=" * 60

print("\n" + banner_rule)
print("PIPELINE EXECUTION SUMMARY")
print(banner_rule)
print(f"✓ Data Generation: {len(loans_df):,} loans")
print(f"✓ Data Validation: {validation_outcome}")
print(f"✓ IFRS9 Processing: {processed_count:,} loans processed")
print(f"✓ ML Training: Stage accuracy {stage_metrics['accuracy']:.1%}, PD MAE {pd_metrics['mae']:.4f}")
print("✓ Results Saved: 3 output files in /tmp/")
print(banner_rule)