In [1]:
import os
import pandas as pd

def clean_and_merge(orig_df, perf_df):
    """Attach origination attributes to each performance record and tidy the result.

    Parameters
    ----------
    orig_df : pandas.DataFrame
        Origination records keyed by "Loan Sequence Number"; the key must be
        unique per row (enforced by ``validate="m:1"``).
    perf_df : pandas.DataFrame
        Performance records keyed by "Loan Sequence Number" (many rows per loan).

    Returns
    -------
    pandas.DataFrame
        Inner join of ``perf_df`` onto ``orig_df`` with snake_case column names,
        ``monthly_reporting_period`` / ``first_payment_date`` parsed as YYYYMM
        dates (unparseable values become NaT), UPB / rate columns cast to numeric
        (unparseable values become NaN), sorted by loan and reporting period and
        re-indexed from 0.

    Raises
    ------
    KeyError
        If either frame lacks the "Loan Sequence Number" column.
    pandas.errors.MergeError
        If ``orig_df`` contains duplicate loan sequence numbers.

    Neither input frame is modified.
    """
    key = "Loan Sequence Number"

    # Compare keys as strings on both sides (the two files may store them with
    # different dtypes). astype returns new frames, so the caller's frames are
    # left untouched.
    orig_keyed = orig_df.astype({key: str})
    perf_keyed = perf_df.astype({key: str})

    # Many performance rows per origination row; the inner join drops unmatched loans.
    merged = perf_keyed.merge(orig_keyed, on=key, how="inner", validate="m:1")

    # Snake-case the column names, e.g. "Current Actual UPB" -> "current_actual_upb".
    merged.columns = (
        merged.columns
        .str.lower()
        .str.replace(r"[ \-\/()]+", "_", regex=True)
        .str.strip("_")
    )

    # Both date fields are parsed with the YYYYMM layout already used for
    # first_payment_date (assumed to apply to monthly_reporting_period as well --
    # TODO confirm against the source data spec). Without an explicit format,
    # integer YYYYMM values would be interpreted as nanoseconds since the epoch.
    for col in ("monthly_reporting_period", "first_payment_date"):
        if col in merged.columns:
            merged[col] = pd.to_datetime(merged[col], format="%Y%m", errors="coerce")

    # Sort only on columns that exist: the reporting period is optional above.
    sort_keys = [
        col for col in ("loan_sequence_number", "monthly_reporting_period")
        if col in merged.columns
    ]
    merged = merged.sort_values(sort_keys).reset_index(drop=True)

    # Numeric casts; unparseable entries become NaN rather than raising.
    for col in ("current_actual_upb", "original_upb", "interest_rate"):
        if col in merged.columns:
            merged[col] = pd.to_numeric(merged[col], errors="coerce")

    return merged

    




In [None]:
# Per-year merge: join each year's performance file to its origination file and
# write one parquet per year. Years with either input missing are skipped.
INTERIM_DIR = "../data/interim"
MERGED_BY_YEAR_DIR = "../data/processed/merged_by_year"
START_YEAR, END_YEAR = 2010, 2024  # inclusive

os.makedirs(MERGED_BY_YEAR_DIR, exist_ok=True)

for year in range(START_YEAR, END_YEAR + 1):
    o_path = f"{INTERIM_DIR}/origination_{year}.parquet"
    p_path = f"{INTERIM_DIR}/performance_{year}.parquet"
    # Both inputs are required; report which one is missing and move on.
    if not os.path.exists(o_path):
        print(f"  Skipping origination_{year}.parquet (not found)")
        continue
    if not os.path.exists(p_path):
        print(f"  Skipping performance_{year}.parquet (not found)")
        continue

    print(f"Processing {year}...")
    orig = pd.read_parquet(o_path)
    perf = pd.read_parquet(p_path)

    merged = clean_and_merge(orig, perf)

    out_path = f"{MERGED_BY_YEAR_DIR}/merged_{year}.parquet"
    merged.to_parquet(out_path, index=False)
    print(f"  Saved merged_{year}.parquet ({len(merged):,} rows)")

    # Drop this year's frames before reading the next year's inputs; otherwise
    # the previous orig/perf/merged frames stay referenced while the next files
    # are loaded, raising peak memory.
    del orig, perf, merged

Processing 1999...
  Saved merged_1999.parquet (2,507,096 rows)
Processing 2000...
  Saved merged_2000.parquet (1,439,897 rows)
Processing 2001...
  Saved merged_2001.parquet (1,960,552 rows)
Processing 2002...
  Saved merged_2002.parquet (2,488,606 rows)
Processing 2003...
  Saved merged_2003.parquet (4,040,135 rows)
Processing 2004...
  Saved merged_2004.parquet (3,959,723 rows)
Processing 2005...
  Saved merged_2005.parquet (3,856,796 rows)
Processing 2006...
  Saved merged_2006.parquet (3,184,782 rows)
Processing 2007...
  Saved merged_2007.parquet (2,989,909 rows)
Processing 2008...
  Saved merged_2008.parquet (2,437,525 rows)
Processing 2009...
  Saved merged_2009.parquet (3,122,523 rows)
Processing 2010...
  Saved merged_2010.parquet (3,390,711 rows)
Processing 2011...
  Saved merged_2011.parquet (3,548,320 rows)
Processing 2012...
  Saved merged_2012.parquet (4,352,734 rows)
Processing 2013...
  Saved merged_2013.parquet (4,027,063 rows)
Processing 2014...
  Saved merged_2014.p

In [3]:
import os, glob, re
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.compute as pc  # for safe=False casting fallback

# Combined output (one file for the whole 2010–2024 window) and the directory of
# per-year inputs written by the per-year merge cell.
final_path = "../data/processed/merged_loan_performance_2010_2024.parquet"
yearly_dir = "../data/processed/merged_by_year"

# ---------- 1) Collect only 2010–2024 files ----------
# Glob pattern matching every per-year parquet file inside yearly_dir.
pattern = os.path.join(yearly_dir, "merged_*.parquet")

def year_from_path(p: str):
    """Extract the first four consecutive digits from the file name of ``p``.

    Only the basename is searched, so digits in directory names are ignored.
    Returns the digits as an ``int``, or ``None`` if the name has no 4-digit run.
    """
    name = os.path.basename(p)
    found = re.search(r"(\d{4})", name)
    if found is None:
        return None
    return int(found.group(1))

all_files = sorted(glob.glob(pattern))
# A name with no year (None) maps to 0, which lies outside the window, so one
# year_from_path call per file covers both the None check and the range check.
files = [p for p in all_files if 2010 <= (year_from_path(p) or 0) <= 2024]

if not files:
    raise RuntimeError(f"No 2010–2024 files found in {yearly_dir}. Saw: {[os.path.basename(x) for x in all_files]}")

# List the selected inputs, one per line, in sorted (year) order.
print("Will merge these files (2010–2024):")
for p in files:
    print(" -", os.path.basename(p))

# ---------- 2) Build master schema (union of columns across all files) ----------
def sanitize_field(fld: pa.Field) -> pa.Field:
    """Return a copy of ``fld`` with a NullType (if any) replaced by string.

    Applied only after types from every file have been combined, so a column
    ends up as string only if it is null-typed in *every* file.
    """
    t = fld.type
    if pa.types.is_null(t):
        t = pa.string()
    return pa.field(fld.name, t)


def _combine_types(current: pa.DataType, new: pa.DataType) -> pa.DataType:
    """Merge two types observed for the same column in different files.

    A null type yields to any concrete type. Integer/float mixes widen to int64
    (all integer) or float64, so step 3b does not need a truncating cast. Any
    other mismatch keeps the first type seen (the previous behaviour), and
    step 3b casts the other files to it.
    """
    if current.equals(new) or pa.types.is_null(new):
        return current
    if pa.types.is_null(current):
        return new

    def is_number(t: pa.DataType) -> bool:
        return pa.types.is_integer(t) or pa.types.is_floating(t)

    if is_number(current) and is_number(new):
        if pa.types.is_integer(current) and pa.types.is_integer(new):
            return pa.int64()
        return pa.float64()
    return current


base_schema = pq.read_schema(files[0])

# Column name -> combined type. Dict insertion order keeps the base-schema
# columns first, followed by columns that only appear in later files, in
# first-seen order (same column order as before).
column_types = {fld.name: fld.type for fld in base_schema}
for path in files[1:]:
    for fld in pq.read_schema(path):
        if fld.name in column_types:
            column_types[fld.name] = _combine_types(column_types[fld.name], fld.type)
        else:
            column_types[fld.name] = fld.type

# NullType -> string only for columns that were null in every file.
fields = [sanitize_field(pa.field(name, typ)) for name, typ in column_types.items()]
master_schema = pa.schema(fields)

print("\nMaster schema columns:", len(master_schema.names))

# ---------- 3) Open writer and append year-by-year ----------
os.makedirs(os.path.dirname(final_path), exist_ok=True)

# Write to a side file and move it into place only after every year has been
# appended, so a failure part-way through never leaves a truncated file that
# looks complete at final_path.
partial_path = final_path + ".partial"

try:
    # The context manager closes the writer (writing the Parquet footer and
    # releasing the file handle) even if an exception escapes the loop.
    with pq.ParquetWriter(partial_path, master_schema, compression="snappy") as writer:
        for path in files:
            print("Appending", os.path.basename(path))
            tbl = pq.read_table(path)

            # 3a) Ensure every master column exists; if missing, add a null column of the right type
            for fld in master_schema:
                if fld.name not in tbl.schema.names:
                    tbl = tbl.append_column(fld.name, pa.nulls(tbl.num_rows, type=fld.type))

            # 3b) Reorder and cast columns to master types
            cols = []
            for fld in master_schema:
                col = tbl.column(fld.name)
                if not col.type.equals(fld.type):
                    try:
                        col = col.cast(fld.type)  # safe cast: raises rather than losing data
                    except pa.ArrowInvalid as exc:
                        # Last resort: an unsafe cast permits truncation/overflow
                        # (e.g. float -> int drops fractions), so report it instead
                        # of losing data silently.
                        print(
                            f"  WARNING: unsafe cast of column {fld.name!r} "
                            f"({col.type} -> {fld.type}) in {os.path.basename(path)}: {exc}"
                        )
                        col = pc.cast(col, target_type=fld.type, safe=False)
                cols.append(col)

            # Build with the writer's schema so write_table sees an exact match.
            tbl2 = pa.Table.from_arrays(cols, schema=master_schema)

            # 3c) Write this chunk (row_group_size helps keep memory predictable)
            writer.write_table(tbl2, row_group_size=1_000_000)
except BaseException:
    # Discard the incomplete output and re-raise the original error.
    if os.path.exists(partial_path):
        os.remove(partial_path)
    raise

os.replace(partial_path, final_path)
print("\nFinal merged file saved to", final_path)


Will merge these files (2010–2024):
 - merged_2010.parquet
 - merged_2011.parquet
 - merged_2012.parquet
 - merged_2013.parquet
 - merged_2014.parquet
 - merged_2015.parquet
 - merged_2016.parquet
 - merged_2017.parquet
 - merged_2018.parquet
 - merged_2019.parquet
 - merged_2020.parquet
 - merged_2021.parquet
 - merged_2022.parquet
 - merged_2023.parquet
 - merged_2024.parquet

Master schema columns: 63
Appending merged_2010.parquet
Appending merged_2011.parquet
Appending merged_2012.parquet
Appending merged_2013.parquet
Appending merged_2014.parquet
Appending merged_2015.parquet
Appending merged_2016.parquet
Appending merged_2017.parquet
Appending merged_2018.parquet
Appending merged_2019.parquet
Appending merged_2020.parquet
Appending merged_2021.parquet
Appending merged_2022.parquet
Appending merged_2023.parquet
Appending merged_2024.parquet

Final merged file saved to ../data/processed/merged_loan_performance_2010_2024.parquet
