## 1. Testing to see if scripts exist in directory

In [1]:
import os
SCRIPTS_DIR = "/app/utils"

print("üìÅ Checking scripts directory...")
if os.path.exists(SCRIPTS_DIR):
    scripts = [f for f in os.listdir(SCRIPTS_DIR) if f.endswith('.py')]
    print("Available scripts:")
    for script in scripts:
        print(f"  - {script}")
else:
    print(f"‚ùå Directory {SCRIPTS_DIR} does not exist")

üìÅ Checking scripts directory...
Available scripts:
  - model_training_LR.py
  - model_training_RF.py
  - model_training_XG.py
  - processing_bronze_table.py
  - processing_gold_table.py
  - processing_silver_table.py


## 2. Testing that model scripts are valid

In [3]:
#!/usr/bin/env python3
# coding: utf-8
"""
Test script for loan default prediction training pipelines
Run this in Jupyter Lab to test all three model training scripts in /app/utils/
"""

import subprocess
import sys
import os
from datetime import datetime, timedelta

# Set the scripts directory
SCRIPTS_DIR = "/app/utils"

def test_training_scripts():
    """Test all three training scripts with sample parameters"""
    
    # Get current date and calculate a training date (2 months ago for realistic data)
    test_date = (datetime.now() - timedelta(days=60)).strftime("%Y-%m-%d")
    
    # Base command parameters
    base_params = {
        'train_date': test_date,
        'features_path': '/app/datamart/gold/feature_store/',
        'labels_path': '/app/datamart/gold/label_store/',
        'sample_frac': 0.05,  # Very small sample for quick testing
        'n_iter': 2,          # Minimal iterations for testing
        'cv_folds': 2,        # Minimal folds for testing
        'train_months': 2,
        'val_months': 1,
        'test_months': 1,
        'oot_months': 1,
        'mlflow_tracking_uri': 'http://localhost:5000',
        'mlflow_experiment': 'loan-default-testing'
    }
    
    scripts_to_test = [
        {
            'name': 'Logistic Regression',
            'script': 'model_training_LR.py',
            'params': base_params.copy()
        },
        {
            'name': 'XGBoost', 
            'script': 'model_training_XG.py',
            'params': base_params.copy()
        },
        {
            'name': 'Random Forest',
            'script': 'model_training_RF.py',
            'params': base_params.copy()
        }
    ]
    
    results = {}
    
    print("üî¨ Starting Training Script Tests")
    print(f"üìÅ Scripts directory: {SCRIPTS_DIR}")
    print("=" * 60)
    
    for script_info in scripts_to_test:
        script_name = script_info['name']
        script_file = script_info['script']
        script_path = os.path.join(SCRIPTS_DIR, script_file)
        params = script_info['params']
        
        print(f"\nüß™ Testing {script_name}...")
        print(f"üìÅ Script: {script_path}")
        
        # Check if script exists
        if not os.path.exists(script_path):
            print(f"‚ùå Script not found: {script_path}")
            results[script_name] = {
                'status': 'SCRIPT_NOT_FOUND',
                'returncode': None,
                'stdout': '',
                'stderr': f'Script file {script_path} not found'
            }
            continue
        
        # Build command
        cmd = [sys.executable, script_path]
        
        # Add parameters
        for key, value in params.items():
            cmd.append(f"--{key}")
            cmd.append(str(value))
        
        print(f"‚ö° Command: {' '.join(cmd)}")
        
        try:
            # Run the script
            print("üîÑ Executing script...")
            result = subprocess.run(
                cmd, 
                capture_output=True, 
                text=True, 
                timeout=300,  # 5 minute timeout
                cwd=SCRIPTS_DIR  # Run from scripts directory to help with imports
            )
            
            # Check results
            if result.returncode == 0:
                print(f"‚úÖ {script_name} - SUCCESS")
                print("üìù Output snippet:")
                output_lines = result.stdout.strip().split('\n')
                for line in output_lines[-10:]:
                    print(f"   {line}")
                
                results[script_name] = {
                    'status': 'SUCCESS',
                    'returncode': result.returncode,
                    'stdout': result.stdout,
                    'stderr': result.stderr
                }
            else:
                print(f"‚ùå {script_name} - FAILED (return code: {result.returncode})")
                print("üí• Error output:")
                error_lines = result.stderr.strip().split('\n')
                for line in error_lines:
                    print(f"   {line}")
                
                results[script_name] = {
                    'status': 'FAILED',
                    'returncode': result.returncode,
                    'stdout': result.stdout,
                    'stderr': result.stderr
                }
                
        except subprocess.TimeoutExpired:
            print(f"‚è∞ {script_name} - TIMEOUT (exceeded 5 minutes)")
            results[script_name] = {
                'status': 'TIMEOUT',
                'returncode': None,
                'stdout': '',
                'stderr': 'Execution timed out'
            }
        except Exception as e:
            print(f"üö® {script_name} - UNEXPECTED ERROR: {e}")
            results[script_name] = {
                'status': 'ERROR',
                'returncode': None,
                'stdout': '',
                'stderr': str(e)
            }
    
    # Print summary
    print("\n" + "=" * 60)
    print("üìä TEST SUMMARY")
    print("=" * 60)
    
    success_count = sum(1 for result in results.values() if result['status'] == 'SUCCESS')
    total_count = len(results)
    
    print(f"‚úÖ Success: {success_count}/{total_count}")
    
    for script_name, result in results.items():
        status_icon = "‚úÖ" if result['status'] == 'SUCCESS' else "‚ùå"
        print(f"{status_icon} {script_name}: {result['status']}")
    
    return results

def check_script_syntax():
    """Check syntax of training scripts"""
    print("üß™ Checking script syntax...")
    
    scripts_to_test = [
        ('model_training_LR.py', 'Logistic Regression'),
        ('model_training_XG.py', 'XGBoost'),
        ('model_training_RF.py', 'Random Forest')
    ]
    
    all_valid = True
    
    for script_file, script_name in scripts_to_test:
        script_path = os.path.join(SCRIPTS_DIR, script_file)
        if os.path.exists(script_path):
            print(f"\nüìÅ Testing {script_name} syntax...")
            try:
                with open(script_path, 'r') as f:
                    content = f.read()
                # Try to compile the script to check syntax
                compile(content, script_file, 'exec')
                print(f"‚úÖ {script_name} syntax is valid")
                
                # Check for common issues
                if 'model_preprocessor' in content:
                    print(f"‚ö†Ô∏è  {script_name} still contains reference to model_preprocessor")
                    all_valid = False
                if 'from model_preprocessor' in content:
                    print(f"‚ö†Ô∏è  {script_name} still contains reference to model_preprocessor")
                    all_valid = False
                    
            except SyntaxError as e:
                print(f"‚ùå {script_name} syntax error: {e}")
                print(f"   Line {e.lineno}: {e.text}")
                all_valid = False
            except Exception as e:
                print(f"‚ö†Ô∏è  {script_name} other error: {e}")
                all_valid = False
        else:
            print(f"‚ùå {script_name} not found: {script_path}")
            all_valid = False
    
    return all_valid

def verify_embedded_preprocessing():
    """Verify that preprocessing functions are embedded in scripts"""
    print("üîç Verifying embedded preprocessing functions...")
    
    scripts_to_check = [
        ('model_training_LR.py', 'Logistic Regression', 'preprocess_features_for_lr'),
        ('model_training_XG.py', 'XGBoost', 'preprocess_features_for_tree'),
        ('model_training_RF.py', 'Random Forest', 'preprocess_features_for_tree')
    ]
    
    all_have_functions = True
    
    for script_file, script_name, expected_function in scripts_to_check:
        script_path = os.path.join(SCRIPTS_DIR, script_file)
        if os.path.exists(script_path):
            with open(script_path, 'r') as f:
                content = f.read()
            
            if f'def {expected_function}' in content:
                print(f"‚úÖ {script_name} has embedded {expected_function}()")
            else:
                print(f"‚ùå {script_name} missing embedded {expected_function}()")
                all_have_functions = False
                
            # Check for required imports
            required_imports = [
                'from sklearn.preprocessing import',
                'from sklearn.compose import ColumnTransformer'
            ]
            
            for required_import in required_imports:
                if required_import in content:
                    print(f"   ‚úÖ Has {required_import}")
                else:
                    print(f"   ‚ö†Ô∏è  Missing {required_import}")
        else:
            print(f"‚ùå {script_name} not found: {script_path}")
            all_have_functions = False
    
    return all_have_functions

def list_available_scripts():
    """List all available training scripts"""
    print(f"üìÅ Available files in {SCRIPTS_DIR}:")
    
    if not os.path.exists(SCRIPTS_DIR):
        print(f"‚ùå Directory {SCRIPTS_DIR} does not exist")
        return []
    
    files = os.listdir(SCRIPTS_DIR)
    training_scripts = []
    
    for file in files:
        if file.endswith('.py') and 'training' in file.lower():
            training_scripts.append(file)
    
    print("ü§ñ Training scripts:")
    for script in sorted(training_scripts):
        print(f"   - {script}")
    
    return training_scripts

def quick_fix_imports():
    """Quick fix for any remaining preprocessor imports"""
    print("üîß Checking for any remaining preprocessor imports...")
    
    scripts_to_fix = [
        'model_training_LR.py',
        'model_training_XG.py',
        'model_training_RF.py'
    ]
    
    fixed_count = 0
    for script_file in scripts_to_fix:
        script_path = os.path.join(SCRIPTS_DIR, script_file)
        if os.path.exists(script_path):
            with open(script_path, 'r') as f:
                content = f.read()
            
            # Check for problematic imports
            problematic_patterns = [
                'from model_preprocessor import',
                'from model_preprocessor import'
            ]
            
            has_problem = False
            for pattern in problematic_patterns:
                if pattern in content:
                    print(f"‚ö†Ô∏è  Found problematic import in {script_file}: {pattern}")
                    has_problem = True
            
            if has_problem:
                print(f"üí° {script_file} still has external preprocessor imports")
                fixed_count += 1
    
    if fixed_count > 0:
        print(f"\nüîß {fixed_count} scripts still need manual fixing")
        print("üí° Make sure each script has embedded preprocessing functions")
    else:
        print("‚úÖ No problematic imports found")

if __name__ == "__main__":
    print("üöÄ Loan Default Prediction - Training Script Tests")
    print(f"üìÅ Scripts location: {SCRIPTS_DIR}")
    print("=" * 60)
    
    # First, list available scripts
    print("\n1Ô∏è‚É£ Listing available scripts...")
    training_scripts = list_available_scripts()
    
    if not training_scripts:
        print("‚ùå No training scripts found. Exiting.")
        exit(1)
    
    # Check for any remaining import issues
    print("\n2Ô∏è‚É£ Checking for import issues...")
    quick_fix_imports()
    
    # Verify embedded preprocessing
    print("\n3Ô∏è‚É£ Verifying embedded preprocessing...")
    preprocessing_ok = verify_embedded_preprocessing()
    
    # Check syntax
    print("\n4Ô∏è‚É£ Checking script syntax...")
    syntax_ok = check_script_syntax()
    
    # Run tests only if everything looks good
    if preprocessing_ok and syntax_ok:
        print("\n5Ô∏è‚É£ Running tests...")
        results = test_training_scripts()
    else:
        print("\n‚ùå Skipping tests due to script issues")
        print("üí° Please fix the issues above before running tests")
        results = {}
    
    print("\nüéâ Testing complete!")

üöÄ Loan Default Prediction - Training Script Tests
üìÅ Scripts location: /app/utils

1Ô∏è‚É£ Listing available scripts...
üìÅ Available files in /app/utils:
ü§ñ Training scripts:
   - model_training_LR.py
   - model_training_RF.py
   - model_training_XG.py

2Ô∏è‚É£ Checking for import issues...
üîß Checking for any remaining preprocessor imports...
‚úÖ No problematic imports found

3Ô∏è‚É£ Verifying embedded preprocessing...
üîç Verifying embedded preprocessing functions...
‚úÖ Logistic Regression has embedded preprocess_features_for_lr()
   ‚úÖ Has from sklearn.preprocessing import
   ‚úÖ Has from sklearn.compose import ColumnTransformer
‚úÖ XGBoost has embedded preprocess_features_for_tree()
   ‚úÖ Has from sklearn.preprocessing import
   ‚úÖ Has from sklearn.compose import ColumnTransformer
‚úÖ Random Forest has embedded preprocess_features_for_tree()
   ‚úÖ Has from sklearn.preprocessing import
   ‚úÖ Has from sklearn.compose import ColumnTransformer

4Ô∏è‚É£ Checking script

## 3. Checking Gold Store data

In [4]:
from pyspark.sql import SparkSession
from datetime import datetime, timedelta

# Start Spark
spark = SparkSession.builder \
    .appName("data_debug") \
    .master("local[*]") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

# Check what files exist
features_path = '/app/datamart/gold/feature_store/'
labels_path = '/app/datamart/gold/label_store/'

print("üîç Checking data availability:")
print(f"Features path: {features_path}")
print(f"Labels path: {labels_path}")

import os
if os.path.exists(features_path):
    print(f"‚úÖ Features directory exists")
    files = os.listdir(features_path)
    print(f"   Files: {files[:5]}...")  # Show first 5 files
else:
    print(f"‚ùå Features directory does not exist")

if os.path.exists(labels_path):
    print(f"‚úÖ Labels directory exists")
    files = os.listdir(labels_path)
    print(f"   Files: {files[:5]}...")
else:
    print(f"‚ùå Labels directory does not exist")

# Try to read a small sample of data
try:
    print("\nüìä Trying to read features...")
    features_sdf = spark.read.parquet(features_path)
    print(f"Features schema: {features_sdf.columns}")
    print(f"Features count: {features_sdf.count()}")
    
    # Show sample data
    if features_sdf.count() > 0:
        print("Sample features:")
        features_sdf.limit(5).show()
    else:
        print("‚ùå No features data found")
        
except Exception as e:
    print(f"‚ùå Error reading features: {e}")

try:
    print("\nüìä Trying to read labels...")
    labels_sdf = spark.read.parquet(labels_path)
    print(f"Labels schema: {labels_sdf.columns}")
    print(f"Labels count: {labels_sdf.count()}")
    
    # Show sample data
    if labels_sdf.count() > 0:
        print("Sample labels:")
        labels_sdf.limit(5).show()
    else:
        print("‚ùå No labels data found")
        
except Exception as e:
    print(f"‚ùå Error reading labels: {e}")

# Check date ranges if data exists
try:
    if features_sdf.count() > 0 and 'snapshot_date' in features_sdf.columns:
        print("\nüìÖ Date ranges in features:")
        features_sdf.select("snapshot_date").distinct().orderBy("snapshot_date").show()
        
    if labels_sdf.count() > 0 and 'snapshot_date' in labels_sdf.columns:
        print("üìÖ Date ranges in labels:")
        labels_sdf.select("snapshot_date").distinct().orderBy("snapshot_date").show()
        
except Exception as e:
    print(f"‚ùå Error checking dates: {e}")

spark.stop()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/09 06:24:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


üîç Checking data availability:
Features path: /app/datamart/gold/feature_store/
Labels path: /app/datamart/gold/label_store/
‚úÖ Features directory exists
   Files: ['.part-00000-28a5c4b2-198b-4954-8889-e2021b9a951b-c000.snappy.parquet.crc', '._SUCCESS.crc', 'part-00000-28a5c4b2-198b-4954-8889-e2021b9a951b-c000.snappy.parquet', '_SUCCESS']...
‚úÖ Labels directory exists
   Files: ['.part-00000-6deaf5e4-8c74-40b8-ae60-6f29e7bbf25d-c000.snappy.parquet.crc', '._SUCCESS.crc', 'part-00000-6deaf5e4-8c74-40b8-ae60-6f29e7bbf25d-c000.snappy.parquet', '_SUCCESS']...

üìä Trying to read features...


                                                                                

Features schema: ['Customer_ID', 'snapshot_date', 'feature_snapshot_date', 'Age', 'Occupation', 'Delay_from_due_date', 'Outstanding_Debt', 'Amount_invested_monthly', 'Interest_Rate', 'Num_Bank_Accounts', 'Num_Credit_Card', 'Loan_Type_Home_Loan', 'Loan_Type_Personal_Loan', 'Loan_Type_Student_Loan', 'Loan_Type_Auto_Loan', 'Loan_Type_Business_Loan', 'Loan_Type_Credit-Builder_Loan', 'Loan_Type_Home_Equity_Loan', 'Loan_Type_Debt_Consolidation_Loan', 'Loan_Type_Mortgage_Loan', 'Loan_Type_Not_Specified', 'Loan_Type_Payday_Loan', 'Loan_amt_sum', 'Loan_amt_mean', 'Loan_amt_std', 'Loan_tenure_mean', 'Loan_tenure_max', 'Loan_overdue_amt_sum', 'Loan_overdue_amt_mean', 'Loan_overdue_amt_max', 'Loan_balance_sum', 'Loan_balance_mean', 'dpd_mean', 'dpd_max', 'loan_count', 'clickstream_total_events', 'clickstream_fe_5_mean', 'clickstream_fe_5_sum', 'clickstream_fe_5_std', 'clickstream_fe_9_mean', 'clickstream_fe_9_min', 'clickstream_fe_4_mean', 'clickstream_fe_4_min', 'clickstream_fe_10_mean', 'clickst

                                                                                

Features count: 5531
Sample features:


25/11/09 06:25:05 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-----------+-------------+---------------------+---+----------+-------------------+----------------+-----------------------+-------------+-----------------+---------------+-------------------+-----------------------+----------------------+-------------------+-----------------------+-----------------------------+--------------------------+---------------------------------+-----------------------+-----------------------+---------------------+------------+-------------+------------+----------------+---------------+--------------------+---------------------+--------------------+----------------+-----------------+--------+-------+----------+------------------------+---------------------+--------------------+--------------------+---------------------+--------------------+---------------------+--------------------+----------------------+---------------------+
|Customer_ID|snapshot_date|feature_snapshot_date|Age|Occupation|Delay_from_due_date|Outstanding_Debt|Amount_invested_monthly|Interest_

In [8]:
# Create a diagnostic script to check the gold layer data

import sys
import os
sys.path.append('/app')

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
from datetime import datetime, timedelta
import argparse

def check_gold_data_quality():
    spark = SparkSession.builder 
        .appName("GoldDataDiagnostic") 
        .config("spark.sql.adaptive.enabled", "true") 
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") 
        .getOrCreate()
    
    # Check feature store
    features_path = "/app/datamart/gold/feature_store/"
    labels_path = "/app/datamart/gold/label_store/"
    
    print("üîç Checking Gold Layer Data Quality...")
    
    try:
        # Check if feature store exists and has data
        print(f"üìÅ Checking feature store: {features_path}")
        feature_files = spark._jvm.org.apache.hadoop.fs.FileSystem.get(
            spark._jsc.hadoopConfiguration()
        ).listStatus(spark._jvm.org.apache.hadoop.fs.Path(features_path))
        
        if feature_files:
            print("‚úÖ Feature store exists")
            # Try to read a sample of features
            features_df = spark.read.parquet(features_path).limit(10)
            print(f"üìä Features sample count: {features_df.count()}")
            print("üìã Features schema:")
            features_df.printSchema()
            print("üî¢ Features sample data:")
            features_df.show(5, truncate=False)
            
            # Check for nulls and data issues
            print("‚ùì Checking for data issues in features:")
            for col_name in features_df.columns:
                null_count = features_df.filter(F.col(col_name).isNull()).count()
                print(f"   {col_name}: {null_count} nulls")
                
        else:
            print("‚ùå Feature store is empty or doesn't exist")
            
    except Exception as e:
        print(f"‚ùå Error reading feature store: {e}")
    
    try:
        # Check if label store exists and has data
        print(f"\\nüìÅ Checking label store: {labels_path}")
        label_files = spark._jvm.org.apache.hadoop.fs.FileSystem.get(
            spark._jsc.hadoopConfiguration()
        ).listStatus(spark._jvm.org.apache.hadoop.fs.Path(labels_path))
        
        if label_files:
            print("‚úÖ Label store exists")
            # Try to read a sample of labels
            labels_df = spark.read.parquet(labels_path).limit(10)
            print(f"üìä Labels sample count: {labels_df.count()}")
            print("üìã Labels schema:")
            labels_df.printSchema()
            print("üî¢ Labels sample data:")
            labels_df.show(5, truncate=False)
            
            # Check label distribution
            if "default_flag" in labels_df.columns:
                label_dist = labels_df.groupBy("default_flag").count().collect()
                print("üìà Label distribution:")
                for row in label_dist:
                    print(f"   default_flag {row['default_flag']}: {row['count']} records")
            else:
                print("‚ùå 'default_flag' column not found in labels")
                
        else:
            print("‚ùå Label store is empty or doesn't exist")
            
    except Exception as e:
        print(f"‚ùå Error reading label store: {e}")
    
    # Check specific date range mentioned in the training
    train_date = "2025-09-10"
    print(f"\\nüìÖ Checking data for training date: {train_date}")
    
    try:
        # Check if we have features for the required time periods
        features_full = spark.read.parquet(features_path)
        print(f"üìä Total features records: {features_full.count()}")
        
        # Check date range
        if "snapshot_date" in features_full.columns:
            date_range = features_full.agg(
                F.min("snapshot_date").alias("min_date"),
                F.max("snapshot_date").alias("max_date")
            ).collect()[0]
            print(f"üìÖ Features date range: {date_range['min_date']} to {date_range['max_date']}")
        
    except Exception as e:
        print(f"‚ùå Error analyzing features: {e}")
    
    spark.stop()

if __name__ == "__main__":
    check_gold_data_quality()


# Save and run the diagnostic script
with open('/app/utils/check_gold_data.py', 'w') as f:
    f.write(diagnostic_script)

print("üîç Running gold data diagnostic...")

IndentationError: unexpected indent (706167350.py, line 15)