In [None]:
%%configure -f
{ "conf":{
          "spark.pyspark.python": "python3",
          "spark.pyspark.virtualenv.enabled": "true",
          "spark.pyspark.virtualenv.type":"native",
          "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv"
         }
}

In [None]:
# Install the pinned openpyxl (used by parse_glossary to read glossary.xlsx) into the session's
# virtualenv enabled by the %%configure cell above.
# NOTE(review): `sc` is used here before the "SparkSession and SparkContext" cell rebinds it, so
# this relies on the notebook environment pre-defining `sc` — confirm on the target platform.
sc.install_pypi_package("openpyxl==3.1.3")

## 0. Imports and Initial Setup

In [None]:
from io import BytesIO
import boto3
import os
import sys
from zipfile import ZipFile
import openpyxl

from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, when, to_date, max as spark_max, row_number, lit
from pyspark.sql import Window
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType, IntegerType

# Reaching this line means every import above resolved, including openpyxl installed earlier.
print("INFO: Imports complete.")

## 1. Configuration Variables

In [None]:
# Single source of truth for the S3 bucket and glossary key: every S3 location below is derived
# from them, so changing the bucket (or key) cannot leave some paths pointing at the old one.
bucket_name         = "credit-risk-project-data"
glossary_s3_key     = "metadata/glossary.xlsx"

raw_zip_path        = f"s3://{bucket_name}/zipped/*.zip"                      # input zips (glob)
output_parquet      = f"s3://{bucket_name}/processed/train_dataset.parquet"   # final training set
glossary_s3_path    = f"s3://{bucket_name}/{glossary_s3_key}"                 # same object as bucket/key above
local_glossary_path = "glossary.xlsx"                                          # local download target

print("INFO: Configuration variables set.")

In [None]:
# Download the glossary workbook to the local disk of the Python process running this cell.
# An existing local copy is reused as-is.
# NOTE(review): a stale local copy is never refreshed — delete it to pick up a new glossary.
if not os.path.exists(local_glossary_path):
    print(f"INFO: Downloading glossary from S3: s3://{bucket_name}/{glossary_s3_key} to {local_glossary_path}")
    try:
        # Client creation is inside the try so credential/region problems get the same message.
        s3 = boto3.client("s3")
        s3.download_file(bucket_name, glossary_s3_key, local_glossary_path)
        print("INFO: Glossary downloaded successfully.")
    except Exception as e:
        print(f"CRITICAL ERROR: Failed to download glossary from S3. Error: {e}")
        # Raise rather than sys.exit(): a bare sys.exit() raises SystemExit with exit status 0
        # ("success"), which notebook kernels intercept specially. A real exception stops
        # Run-All, reports a failure, and keeps the original cause via `from e`.
        raise RuntimeError(
            f"Failed to download glossary s3://{bucket_name}/{glossary_s3_key} to {local_glossary_path}"
        ) from e
else:
    print(f"INFO: Glossary file '{local_glossary_path}' already exists locally.")

## 2. Glossary Parsing and Schema Definitions

In [None]:
# Module-level defaults so later cells can reference these names even if glossary parsing
# is skipped; the next cells overwrite them with the parsed glossary and derived schema.
column_names, cast_types, parse_info = [], {}, {}
schema = StructType([])

In [None]:
def parse_glossary(glossary_file_path, name_col_idx=1, type_col_idx=9, date_format="MMyyyy"):
    """
    Parse the glossary workbook into column names, post-cast Spark types and date formats.

    The active sheet is read from row 2 onward (row 1 is treated as a header). In each row the
    cell at ``name_col_idx`` holds the column name and the cell at ``type_col_idx`` holds its
    declared data type. The type text is matched case-insensitively, in this order:

      * contains "DATE"    -> TimestampType, and ``date_format`` is recorded in parse info
      * contains "ALPHA"   -> StringType (covers both "ALPHA-NUMERIC" and "ALPHA")
      * contains "NUMERIC" -> DoubleType
      * anything else / empty -> StringType

    A completely blank first data row is skipped, as are rows whose name cell is empty.

    Args:
        glossary_file_path: Path to a local .xlsx glossary file.
        name_col_idx: 0-based index of the column-name cell in each row (default 1, column B).
        type_col_idx: 0-based index of the data-type cell in each row (default 9, column J).
        date_format: Spark datetime pattern recorded for DATE columns (default "MMyyyy").

    Returns:
        Tuple ``(column_names, cast_types, parse_info)``: column names in sheet order, a dict
        of name -> Spark DataType, and a dict of name -> date format for DATE columns.
        If anything fails (missing file, unreadable workbook, rows too short for the configured
        indices, ...) the error is printed and all three are returned EMPTY, so callers never
        receive a half-parsed, internally inconsistent glossary.
    """
    names = []
    types_by_name = {}
    date_formats_by_name = {}

    try:
        workbook = openpyxl.load_workbook(glossary_file_path, data_only=True)
        sheet = workbook.active

        if sheet is None:
            print("WARNING (parse_glossary): Glossary sheet is not active or not found.")
            return names, types_by_name, date_formats_by_name

        for row_idx, row in enumerate(sheet.iter_rows(min_row=2)):  # row 1 is the header
            excel_row_number = row_idx + 2
            if row_idx == 0 and all(cell.value is None for cell in row):
                print("INFO (parse_glossary): Skipping blank first row in glossary.")
                continue

            field = row[name_col_idx].value   # Column Name
            dtype = row[type_col_idx].value   # Data Type

            if not field:
                print(f"DEBUG (parse_glossary): Skipping row '{excel_row_number}': empty field name.")
                continue

            clean_field_name = str(field).strip()
            dtype_upper = str(dtype).upper() if dtype else ""

            if "DATE" in dtype_upper:
                post_cast_type = TimestampType()
                date_formats_by_name[clean_field_name] = date_format
            elif "ALPHA" in dtype_upper:
                post_cast_type = StringType()
            elif "NUMERIC" in dtype_upper:
                post_cast_type = DoubleType()
            else:
                post_cast_type = StringType()

            # Record name and type together so the two collections always stay in sync.
            names.append(clean_field_name)
            types_by_name[clean_field_name] = post_cast_type
    except Exception as e:
        print(f"ERROR (parse_glossary): Failed to parse glossary. Error: {e}")
        return [], {}, {}

    return names, types_by_name, date_formats_by_name

In [None]:
# Populate the module-level glossary results. column_names feeds the schema cell and
# map_to_row_safe; cast_types and parse_info drive the type-casting stage.
column_names, cast_types, parse_info = parse_glossary(local_glossary_path)

In [None]:
# Checking columns and building schema.
# Every field is declared as a nullable string here; typed casts are applied later from cast_types.
if not column_names:
    print("CRITICAL WARNING: Glossary parsing resulted in an empty list of column names. DataFrame creation will likely fail or be incorrect.")
    schema = StructType([])  # keep the schema empty when there are no column names
else:
    print(f"INFO: Successfully parsed {len(column_names)} column names from glossary.")
    print(f"INFO: column names: {column_names}")

    try:
        schema_fields = [StructField(c_name, StringType(), nullable=True) for c_name in column_names]
        schema = StructType(schema_fields)
        print("\nINFO: PySpark schema defined successfully based on glossary.")
        print("INFO: Schema structure:")
        for field in schema.fields:
            print(f"  Name: {field.name}, Type: {field.dataType}, Nullable: {field.nullable}")
    except Exception as e:
        print(f"ERROR: Could not create PySpark schema from column names. Error: {e}")
        # Invalidate both so later stages skip instead of mapping rows against a broken schema.
        column_names = []
        schema = StructType([])

## 3. SparkSession and SparkContext

In [None]:
# Bind `sc` explicitly from the session. NOTE(review): `spark` is never created in this
# notebook, so it is assumed to be provided by the notebook's Spark session; `sc` was also
# already used earlier (install_pypi_package) — confirm both are pre-defined on the platform.
sc = spark.sparkContext
print(f"INFO: SparkSession and SparkContext retrieved. Spark version: {sc.version}")

## 4. Helper Functions for Data Extraction and Parsing

In [None]:
def extract_rows(pds_bytes, source_zip_filename="Unknown.zip"):
    """
    Generator: yield the '|'-delimited lines of every .csv member of an in-memory zip archive.

    pds_bytes: pure bytes (the content of a .zip file)
    source_zip_filename: filename for logging
    Yields each CSV row as a list of strings.

    Details:
      * Members are selected by a case-insensitive ".csv" name suffix; others are ignored.
      * Each line is decoded as ISO-8859-1 and trailing whitespace (including the newline) is
        stripped; blank lines are counted and skipped. No header row is skipped and fields are
        not unquoted -- the line is only split on "|".
      * Errors derived from Exception (e.g. bytes that are not a valid zip) are printed and the
        generator simply stops. Because the try block also wraps the yields, a failure part way
        through an archive leaves the rows already yielded in place (partial output).
      * All diagnostics go through print(); when this runs inside an RDD transformation they
        appear in the executor logs rather than in the notebook.
    """
    buf = BytesIO(pds_bytes)  # ZipFile needs a file-like object, so wrap the raw bytes
    processed_any_csv = False  # set once a CSV member has been fully iterated
    
    try:
        with ZipFile(buf) as zf:
            csv_files_in_zip = [name for name in zf.namelist() if name.lower().endswith(".csv")]
            if not csv_files_in_zip:
                print(f"WARNING (extract_rows): No CSV files found in zip: {source_zip_filename}")
                return  # ends the generator without yielding anything

            for name in csv_files_in_zip:
                print(f"INFO (extract_rows): Processing CSV '{name}' from zip '{source_zip_filename}'")
                with zf.open(name) as fh:
                    line_count = 0        # non-blank lines yielded from this member
                    empty_line_count = 0  # blank / whitespace-only lines skipped
                    for raw_line_bytes in fh:
                        try:
                            # ISO-8859-1 maps every byte value to a character, so this decode
                            # cannot fail; rstrip() also removes the line terminator.
                            line_str = raw_line_bytes.decode("ISO-8859-1").rstrip()
                            if not line_str.strip():
                                empty_line_count += 1
                                continue
                            
                            parts = line_str.split("|")
                            line_count += 1
                            yield parts
                            
                            
                        except UnicodeDecodeError as ude:
                            # Defensive only: unreachable with the ISO-8859-1 decode above.
                            print(f"WARNING (extract_rows): UnicodeDecodeError in {name} from {source_zip_filename}, line approx {line_count+1}.")
                        except Exception as line_e:
                            print(f"WARNING (extract_rows): Error processing line {line_count+1} in {name} from {source_zip_filename}.")
                    
                    # Per-member summary once the member has been fully read.
                    if line_count == 0 and empty_line_count > 0:
                        print(f"INFO (extract_rows): CSV '{name}' in '{source_zip_filename}' contained only {empty_line_count} empty/blank lines.")
                    elif line_count == 0:
                        print(f"WARNING (extract_rows): CSV '{name}' in '{source_zip_filename}' was empty or yielded no data lines.")
                    else:
                        print(f"INFO (extract_rows): Processed {line_count} data lines (skipped {empty_line_count} empty lines) from '{name}'.")
                    processed_any_csv = True
        # Effectively unreachable: a non-empty csv_files_in_zip always runs the loop above, and
        # any exception there jumps straight to the handler below instead.
        if not processed_any_csv and csv_files_in_zip:
             print(f"WARNING (extract_rows): Found CSVs in {source_zip_filename} but none seemed to be processed.")

    except Exception as e:
        # Best-effort: log and stop this archive instead of failing the surrounding Spark task.
        error_type = type(e).__name__
        print(f"CRITICAL WARNING (extract_rows): Could not process zip content from '{source_zip_filename}'. Error Type: {error_type}, Error: {e}.")
        pass


def robust_read_and_extract(path_pds_tuple):
    """
    Turn one (path, payload) element from sc.binaryFiles into extracted CSV rows.

    path_pds_tuple: (path, payload). payload may be an object with a callable .read()
        (PortableDataStream-like) or raw bytes; any other type is rejected.
    Returns the generator produced by extract_rows for non-empty content; otherwise []
    (read() failed, unexpected payload type, or None/empty content). Every outcome is
    reported with print().
    """
    path, pds_input = path_pds_tuple
    zip_filename = os.path.basename(path)

    read_method = getattr(pds_input, 'read', None)
    if callable(read_method):
        try:
            file_content_bytes = read_method()
            # Size is computed inside the try: a payload without len() is treated as a failed read.
            n_bytes = len(file_content_bytes) if file_content_bytes else 0
            print(f"INFO (robust_read_and_extract): Successfully read {n_bytes} bytes for '{zip_filename}'.")
        except Exception as e:
            print(f"ERROR (robust_read_and_extract): Failed to .read() from PDS object for '{zip_filename}'. Error: {e}. Skipping this item.")
            return []
    elif isinstance(pds_input, bytes):
        print(f"INFO (robust_read_and_extract): Input for '{zip_filename}' is already bytes. Length: {len(pds_input)}.")
        file_content_bytes = pds_input
    else:
        print(f"ERROR (robust_read_and_extract): Unexpected type '{type(pds_input)}' for input associated with '{zip_filename}'. Skipping this item.")
        return []

    # Guard clauses: nothing read at all, then read-but-empty.
    if file_content_bytes is None:
        print(f"WARNING (robust_read_and_extract): No file content bytes obtained for '{zip_filename}'. Returning empty.")
        return []
    if not file_content_bytes:
        print(f"WARNING (robust_read_and_extract): File content for '{zip_filename}' is empty after read/conversion. Skipping.")
        return []
    return extract_rows(file_content_bytes, source_zip_filename=zip_filename)


def map_to_row_safe(parts_list): 
    """
    Convert one split CSV line into a positional pyspark Row.

    parts_list: list of field strings produced by splitting a line on '|'.
    Returns Row(*parts_list) when the field count equals len(column_names) (the module-level
    glossary columns); returns None, after printing why, when column_names is empty, the
    field count differs, or Row construction raises.
    """
    # Reads the module-level column_names; no assignment, so no `global` declaration is needed.
    if not column_names:  # safeguard
        print("CRITICAL (map_to_row_safe): column_names is empty. Cannot map to Row. Returning None.")
        return None

    n_parts = len(parts_list)
    n_expected = len(column_names)
    if n_parts != n_expected:
        print(f"WARNING (map_to_row_safe): Column count mismatch. Data has {n_parts} parts, schema expects {n_expected}.")
        return None

    try:
        return Row(*parts_list)
    except Exception as e:
        print(f"ERROR (map_to_row_safe): Failed to create Row object. Parts: {str(parts_list)[:100]}. Error: {e}. Returning None.")
        return None

print("INFO: Helper functions defined.")

## 5. Data Ingestion and RDD Processing

#### 5.1 - Read Binary Zip Files from S3

In [None]:
# One (path, payload) element per zip file matched by raw_zip_path.
zips_rdd_with_paths = sc.binaryFiles(raw_zip_path)

initial_zip_files_count = zips_rdd_with_paths.count()
print(f"INFO: Stage 5.1 - Count of items from sc.binaryFiles (should be 20 files): {initial_zip_files_count}")

if initial_zip_files_count != 0:
    print(f"INFO: Stage 5.1 - Successfully found {initial_zip_files_count} zip file(s) to process.")
    # Peek at which payload type the Python workers actually receive (bytes vs. a stream object);
    # robust_read_and_extract handles both.
    try:
        print("INFO: Stage 5.1 - Checking types of first 3 PDS items (via map on executors):")
        collected_info = (
            zips_rdd_with_paths
            .map(lambda path_pds_tuple: (path_pds_tuple[0], type(path_pds_tuple[1]).__name__))
            .take(3)
        )
        for path, type_name in collected_info:
            print(f"  File Path: {path}, Stream Object Type (as seen by Python map): {type_name}")
    except Exception as e:
        print(f"WARNING: Stage 5.1 - Error during .take(3) for PDS type inspection: {e}")
else:
    print("CRITICAL: No zip files are being read by sc.binaryFiles. Check 'raw_zip_path' and S3 permissions.")
    rows_rdd = sc.emptyRDD()

#### 5.2 - Extract Rows from the CSVs Inside Each Zip File (flatMap)

In [None]:
# Extract every non-blank CSV line (split on '|') from every zip via robust_read_and_extract.
# flatMap expects an iterable from the function (a generator or []).
#
# cache(): without it, count() and take() below, and every later count()/show() on RDDs or
# DataFrames derived from rows_rdd, would re-download and re-decompress all zips from S3.
# For RDDs cache() means MEMORY_ONLY: partitions that do not fit in memory are simply not
# cached and are recomputed when needed.
rows_rdd = zips_rdd_with_paths.flatMap(robust_read_and_extract).cache()

count_rows_rdd = rows_rdd.count()
print(f"INFO: Stage 5.2 - Count of rows_rdd (raw string lists yielded from all CSVs via flatMap): {count_rows_rdd}")

if count_rows_rdd == 0:
    print("CRITICAL WARNING: rows_rdd is empty. This means robust_read_and_extract / extract_rows did not yield any CSV rows.")
    print("  Check executor logs for 'INFO' or 'WARNING' messages from these functions.")
else:
    print("INFO: Stage 5.2 - Sample data from rows_rdd (first 3 lists of strings):")
    for r_idx, r_val in enumerate(rows_rdd.take(3)):
        print(f"  Sample row {r_idx} (first 200 chars): {str(r_val)[:200]}...")

#### 5.3 - Map to Row Objects and Filter Out Invalid (None) Rows

In [None]:
# rows_rdd and count_rows_rdd come from the previous cell. Check both, so a skipped cell
# reaches the "not defined" branch below instead of raising NameError on count_rows_rdd.
rows_rdd_ready = 'rows_rdd' in locals() and 'count_rows_rdd' in locals()

if rows_rdd_ready and count_rows_rdd > 0 and column_names and schema.fields:
    print("INFO: Stage 5.3 - Starting map to Row objects.")
    rows_rdd_mapped = rows_rdd.map(map_to_row_safe)

    # RDD.map() is strictly one-to-one, so the mapped count always equals count_rows_rdd.
    # Reusing it avoids launching another Spark job over the full lineage.
    total_mapped_count = count_rows_rdd
    print(f"INFO: Stage 5.3 - Count of rows_rdd_mapped (before filtering None): {total_mapped_count}")

    rows_rdd_filtered = rows_rdd_mapped.filter(lambda r_obj: r_obj is not None)
    final_rdd_for_df_count = rows_rdd_filtered.count()

    none_count = total_mapped_count - final_rdd_for_df_count
    print(f"INFO: Stage 5.3 - Number of rows that became None in map_to_row_safe (and were filtered): {none_count}")
    print(f"INFO: Stage 5.3 - Count of rows_rdd_filtered (valid Row objects for DataFrame): {final_rdd_for_df_count}")

    if final_rdd_for_df_count > 0:
        print("INFO: Stage 5.3 - Sample data from rows_rdd_filtered (first 3 Row objects):")
        for r_obj_idx, r_obj_val in enumerate(rows_rdd_filtered.take(3)):
            print(f"  Sample Row object {r_obj_idx} (first 200 chars): {str(r_obj_val)[:200]}...")

elif not column_names or not schema.fields:
    print("CRITICAL WARNING: Stage 5.3 - Skipping map to Row objects because column_names or schema is empty/invalid.")
    final_rdd_for_df_count = 0
    rows_rdd_filtered = sc.emptyRDD()
elif rows_rdd_ready and count_rows_rdd == 0:
    print("INFO: Stage 5.3 - Skipping map to Row objects as rows_rdd was empty.")
    final_rdd_for_df_count = 0
    rows_rdd_filtered = sc.emptyRDD()
else:
    print("CRITICAL ERROR: Stage 5.3 - rows_rdd not defined.")
    final_rdd_for_df_count = 0
    rows_rdd_filtered = sc.emptyRDD()

## 6. Create Raw DataFrame

In [None]:
# Build df_raw from the validated Row RDD. Every branch leaves df_raw defined (possibly empty
# with the glossary schema) so the following cells can always run.
if not (final_rdd_for_df_count > 0 and schema.fields):
    warning_msg = (
        "CRITICAL WARNING: Stage 6 - Cannot create df_raw because "
        + ("the filtered RDD is empty. " if not final_rdd_for_df_count > 0 else "")
        + ("the schema has no fields. " if not schema.fields else "")
    )
    print(warning_msg)
    df_raw = spark.createDataFrame([], schema if schema.fields else StructType([]))
else:
    print(f"INFO: Stage 6 - Attempting to create df_raw from {final_rdd_for_df_count} rows using schema with {len(schema.fields)} fields.")
    try:
        df_raw = spark.createDataFrame(rows_rdd_filtered, schema)
        print("INFO: df_raw successfully created.")
    except Exception as e:
        print(f"CRITICAL ERROR: Stage 6 - Failed to create DataFrame. Error: {e}")
        df_raw = spark.createDataFrame([], schema if schema.fields else StructType([]))

In [None]:
# Materialise df_raw once; df_raw_count gates every later stage of the notebook.
df_raw_count = df_raw.count()
print(f"INFO: Stage 6 - Count of df_raw: {df_raw_count}")

if not df_raw_count > 0:
    print("INFO: Stage 6 - df_raw is empty. Check previous logs for reasons (e.g., file read errors, all rows filtered by map_to_row_safe).")
else:
    print("INFO: Stage 6 - Schema of df_raw:")
    df_raw.printSchema()
    print("INFO: Stage 6 - Sample data from df_raw (top 5 rows):")
    df_raw.show(5, truncate=False)

## 7. Initialize DataFrame 'df' for further processing

In [None]:
# Alias, not a copy: later cells rebind `df` to new (transformed) DataFrames, so `df_raw`
# itself keeps the untyped, all-string data.
df = df_raw
print("df_raw assigned to df")

## 8. Type Casting and Feature Engineering

In [None]:
if df_raw_count > 0:
    # Apply all glossary casts in ONE select() instead of one withColumn() per column.
    # parse_glossary assigns a cast type to every glossary column (StringType by default), so a
    # withColumn() loop adds one projection per column. The PySpark docs warn that calling
    # withColumn() in a loop creates big plans that can hurt performance or even raise
    # StackOverflowException, and recommend a single select() instead.
    available_columns = df.columns
    cast_exprs = {}
    for c_name, target_dtype in cast_types.items():
        if c_name not in available_columns:
            print(f"WARNING: Column '{c_name}' intended for casting not found in DataFrame. Available columns: {available_columns}")
            continue
        if isinstance(target_dtype, TimestampType):
            fmt = parse_info.get(c_name)
            if fmt:
                # to_date() produces a DateType column (not TimestampType); the log says so.
                print(f"INFO: Casting column '{c_name}' to DateType (via to_date) using format '{fmt}'.")
                cast_exprs[c_name] = to_date(col(c_name), fmt)
            else:
                print(f"WARNING: No parse_info format for date column '{c_name}'. Cannot cast to_date without format. Column remains string.")
        else:
            print(f"INFO: Casting column '{c_name}' to {target_dtype}.")
            cast_exprs[c_name] = col(c_name).cast(target_dtype)

    # Single projection: cast columns are replaced under the same name, all others pass
    # through unchanged, and the original column order is kept.
    df = df.select([
        cast_exprs[c].alias(c) if c in cast_exprs else col(c)
        for c in available_columns
    ])

    # Flag for delinquency: 1 when the status is "XX" or its integer value is >= 3, else 0.
    # NOTE(review): "XX" is flagged as delinquent here; if it denotes an unknown status in the
    # source data, confirm that this is intended.
    if "Current Loan Delinquency Status" in df.columns:
        print("INFO: Creating 'IsDelinquent' column.")
        df = df.withColumn(
            "IsDelinquent",
            when(col("Current Loan Delinquency Status") == "XX", 1)
            .when(col("Current Loan Delinquency Status").cast("int") >= 3, 1)
            .otherwise(0)
        )
    else:
        print("WARNING: 'Current Loan Delinquency Status' column not found. Cannot create 'IsDelinquent'. Adding 'IsDelinquent' as 0.")
        df = df.withColumn("IsDelinquent", lit(0))  # Add IsDelinquent as 0 if source column is missing

    # Integer loan age for the <= 12 label window and the max-age filter downstream.
    if "Loan Age" in df.columns:
        print("INFO: Casting 'Loan Age' column to IntegerType.")
        df = df.withColumn("Loan Age", col("Loan Age").cast(IntegerType()))
    else:
        print("WARNING: 'Loan Age' column not found. Cannot cast.")
    print("INFO: Type casting and IsDelinquent creation stage complete.")
else:
    print("INFO: Skipping type casting and feature engineering as input DataFrame 'df' is empty.")

df.printSchema()
df.show(3, truncate=False)

## 9. Windowing and Feature/Label Assembly

#### 9.1 - Extract SF Origination Features (First Record per Loan)

In [None]:
if not df_raw_count > 0:
    print("INFO: Skipping feature/label assembly as input DataFrame 'df' is empty.")
elif "Loan Identifier" not in df.columns or "Monthly Reporting Period" not in df.columns:
    print("WARNING: 'Loan Identifier' or 'Monthly Reporting Period' missing. Cannot define window or 'first_features'.")
else:
    # Rank each loan's records by reporting period (earliest first); rank 1 is its first record.
    # NOTE(review): if 'Monthly Reporting Period' is still a string (glossary type not DATE),
    # MMyyyy text sorts month-first and "earliest" would be wrong — confirm it was cast to a date.
    w_first = Window.partitionBy("Loan Identifier").orderBy(col("Monthly Reporting Period").asc())
    print("INFO: Window 'w_first' defined.")

    # Origination-time attributes to take from each loan's first record.
    static_cols_original = [
        "Channel", "Seller Name", "Servicer Name", "Original Interest Rate",
        "Original UPB", "Original Loan Term", "Original Loan to Value Ratio (LTV)",
        "Original Combined Loan to Value Ratio (CLTV)", "Number of Borrowers",
        "Debt-To-Income (DTI)", "Borrower Credit Score at Origination",
        "Co-Borrower Credit Score at Origination", "First Time Home Buyer Indicator",
        "Loan Purpose", "Property Type", "Number of Units", "Occupancy Status",
        "Property State", "Metropolitan Statistical Area (MSA)", "Zip Code Short",
        "Mortgage Insurance Percentage", "Amortization Type",
        "Mortgage Insurance Type", "Special Eligibility Program",
        "High Balance Loan Indicator", "Origination Date",
    ]
    existing_static_cols = [c for c in static_cols_original if c in df.columns]
    missing_static_cols = [c for c in static_cols_original if c not in df.columns]
    if missing_static_cols:
        print(f"WARNING: These static columns for 'first_features' are not in df and will be excluded: {missing_static_cols}")

    if not existing_static_cols:
        print("WARNING: No existing columns found from the predefined list.")
        first_features = df.select("Loan Identifier").distinct()
        print(f"INFO: Fallback 'first_features' created with only Loan Identifier (distinct). Count: {first_features.count()}")
    else:
        first_features = (
            df.withColumn("rn", row_number().over(w_first))
              .filter(col("rn") == 1)
              .select("Loan Identifier", *existing_static_cols)
        )
        print(f"INFO: 'first_features' created. Count: {first_features.count()}")
        first_features.show(3, truncate=False)

#### 9.2 - Create the Default-in-12-Months Label

In [None]:
if not df_raw_count > 0:
    print("INFO: Skipping feature/label assembly as input DataFrame 'df' is empty.")
elif all(c in df.columns for c in ("Loan Age", "Loan Identifier", "IsDelinquent")):
    # 90 Day Delinquency (Default in first 12 Months):
    # label = 1 if any record with Loan Age <= 12 has IsDelinquent = 1, else 0.
    df_12 = df.filter(col("Loan Age") <= 12)
    label_df = df_12.groupBy("Loan Identifier").agg(
        spark_max("IsDelinquent").alias("Default_in_12M")
    )
    print(f"INFO: 'label_df' created. Count: {label_df.count()}")
    label_df.show(3, truncate=False)
else:
    print("WARNING: Columns needed for 'label_df' (Loan Age, Loan Identifier, IsDelinquent) are missing.")

#### 9.3 - Calculate Maximum Loan Age per Loan

In [None]:
if df_raw_count > 0:
    if "Loan Identifier" in df.columns and "Loan Age" in df.columns:
        # Longest observed history per loan (max Loan Age).
        age_df = (
            df.groupBy("Loan Identifier")
            .agg(spark_max("Loan Age").alias("LoanAge_max"))
        )
        # count() is a full Spark job: run it once and reuse the result.
        age_df_count = age_df.count()
        print(f"INFO: 'age_df' created. Count: {age_df_count}")
        age_df.show(3, truncate=False)
        if age_df_count > 0:
            age_df.select("LoanAge_max").describe().show()
    else:
        print("WARNING: Columns needed for 'age_df' (Loan Identifier, Loan Age) are missing.")
else:
    print("INFO: Skipping feature/label assembly as input DataFrame 'df' is empty.")

## 10. Final Join and Filter

In [None]:
print("--- Starting Final Join and Filter ---")
if df_raw_count > 0 and 'label_df' in locals() and 'first_features' in locals() and 'age_df' in locals():
    # label_df drives the join (left joins), so only loans that have a 12-month label are kept.
    intermediate_join = (
        label_df
        .join(first_features, "Loan Identifier", "left")
        .join(age_df, "Loan Identifier", "left")
    )
    print(f"INFO: 'intermediate_join' created. Count: {intermediate_join.count()}")
    intermediate_join.show(5, truncate=False)

    joined = intermediate_join

    # Loans need to have at least 12 months of history (LoanAge_max >= 12).
    if "LoanAge_max" in joined.columns:
        print("INFO: Describing 'LoanAge_max' in intermediate_join:")
        joined.select("LoanAge_max").describe().show()

        joined = joined.filter(col("LoanAge_max") >= 12)
        print(f"INFO: Count of 'joined' DataFrame after LoanAge_max >= 12 filter: {joined.count()}")
    else:
        print("WARNING: 'LoanAge_max' column not found in joined. Skipping LoanAge_max filter.")

    # Apply origination date filter: loans originated no earlier than Jan 2023. The threshold
    # uses the same "MMyyyy" pattern the glossary parser records for DATE columns.
    # NOTE(review): assumes 'Origination Date' was cast to a date in the casting stage; if it is
    # still a string, the comparison below will not behave as a date comparison.
    if "Origination Date" in joined.columns:
        origination_date_threshold_str = "012023"
        origination_threshold_date = to_date(lit(origination_date_threshold_str), "MMyyyy")
        joined = joined.filter(col("Origination Date") >= origination_threshold_date)

        # Each count() re-executes the whole lineage: compute the final count once and reuse it.
        final_joined_count = joined.count()
        print(f"INFO: Count of final 'joined' DataFrame (after Origination Date >= {origination_date_threshold_str} filter): {final_joined_count}")
    else:
        print("WARNING: 'Origination Date' column not found in 'joined' DataFrame. Cannot apply Origination Date filter.")
        final_joined_count = joined.count()

    print("INFO: Final join and filter stage complete.")
    if final_joined_count > 0:
        joined.show(10, truncate=False)
        try:
            joined.write.mode("overwrite").parquet(output_parquet)
            print(f"SUCCESS: 'joined' DataFrame was successfully written to Parquet directory: {output_parquet}")
        except Exception as e:
            print(f"CRITICAL ERROR: Failed to write 'joined' DataFrame to Parquet. Error: {e}")
    else:
        print("WARNING: Final 'joined' DataFrame is empty after all operations.")
        joined.show(10, truncate=False)

else:
    print("INFO: Skipping final join and filter as one or more input DataFrames (df, label_df, first_features, age_df) were empty or not created.")