*This Python script automates the extraction, cleaning, and consolidation of the Freddie Mac Single-Family Loan-Level Dataset, a widely used resource in mortgage-backed securities analysis. It reads the quarterly origination and performance files from 2014 through 2024 in chunks to keep memory use manageable. After deduplicating and merging the two sources, it produces a consolidated loan-level dataset (one row per loan, combining origination attributes with the most recent performance record) that serves as an input for forecasting loan performance and measuring portfolio risk.*

### **Origination Data**

#### Within a Quarter:
The origination file for each quarter should contain unique Loan Sequence Number values within that quarter, as it typically records new loans originated during that period. 
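
The uniqueness assumption can be verified before relying on it. The following is a minimal sketch on a toy DataFrame; the helper name `duplicated_loan_ids`, the loan IDs, and the `Credit Score` column are illustrative assumptions, not part of the script.

```python
import pandas as pd

def duplicated_loan_ids(df, id_col="Loan Sequence Number"):
    """Return the sorted loan IDs that occur more than once in df."""
    dupes = df.loc[df[id_col].duplicated(keep=False), id_col]
    return sorted(dupes.unique())

# Toy frame (made-up values) standing in for one quarter's origination file
toy_orig = pd.DataFrame({
    "Loan Sequence Number": ["F14Q10000001", "F14Q10000002", "F14Q10000002"],
    "Credit Score": [720, 680, 680],
})

dupes_found = duplicated_loan_ids(toy_orig)
print(dupes_found)
```

An empty list means the quarter's file satisfies the one-row-per-loan expectation.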

#### Across Quarters:
Origination data across quarters (e.g., Q1, Q2, Q3, Q4) is not necessarily unique. A loan originated in Q1 might appear again in Q2, Q3, or Q4 if:
 - The loan’s details were updated (e.g., due to corrections or refinancing).
 - The dataset includes historical origination data re-reported in later quarters for context.

### **Performance Data**

#### Within a Quarter:
The performance file contains multiple entries for the same Loan Sequence Number within a quarter, reflecting monthly performance updates. For example, a loan might have records for Monthly Reporting Period values like 20240101 (January), 20240201 (February), and 20240301 (March) in Q1 2024. 

#### Across Quarters:
Performance data spans multiple quarters, so a loan active in Q1, Q2, Q3, and Q4 will have a latest performance record in each quarter’s file (e.g., the last month of Q1, Q2, Q3, and Q4). This results in multiple records per loan ID across quarters unless deduplicated globally. For instance, a loan might have its latest Q1 record in March 2024, its latest Q2 record in June 2024, and so on, leading to up to 4 records (one per quarter) if no further consolidation occurs.
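
Global deduplication can be illustrated on two toy quarters. This is a sketch under assumptions: the helper `latest_record_per_loan` and the loan IDs are hypothetical, and the reporting periods are kept as strings in the YYYYMMDD form used in the example above, which sort correctly in lexicographic order.

```python
import pandas as pd

ID = "Loan Sequence Number"
PERIOD = "Monthly Reporting Period"

def latest_record_per_loan(frames):
    """Stack several performance frames and keep each loan's latest record."""
    combined = pd.concat(frames, ignore_index=True)
    combined = combined.sort_values([ID, PERIOD])
    return combined.drop_duplicates(subset=ID, keep="last").reset_index(drop=True)

# Toy data: loan A reports in both quarters, loan B only in the first
q1 = pd.DataFrame({ID: ["A", "A", "B"],
                   PERIOD: ["20240101", "20240301", "20240201"]})
q2 = pd.DataFrame({ID: ["A", "A"],
                   PERIOD: ["20240401", "20240601"]})

latest = latest_record_per_loan([q1, q2])
print(latest)
```

Loan A ends up with its June record and loan B with its February record, so each loan contributes exactly one row.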

### **Relationship Between Origination and Performance Data**

#### Loan IDs in Performance vs. Origination: 
The performance data for a given quarter can include more Loan Sequence Number values than the origination data for the same quarter. This happens because:

Performance data tracks all active loans, including those originated in previous quarters. For example, performance_2024Q2 includes loans originated in Q1 and Q2, while origination_2024Q2 only includes new loans from Q2.

The inner merge (`pd.merge(..., how='inner')`) keeps only loans that have both origination and performance data. Because the performance data has a broader scope, some loan IDs will not match if origination data is missing or incomplete for earlier quarters.
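
To see how many loans an inner merge would drop, one option is to run an outer merge with `indicator=True` first and count the match categories. The sketch below uses toy frames; the loan IDs and the non-key column names are illustrative assumptions.

```python
import pandas as pd

ID = "Loan Sequence Number"

# Toy frames (made-up values): B is in both, A only in origination, C only in performance
orig = pd.DataFrame({ID: ["A", "B"], "Credit Score": [700, 650]})
perf = pd.DataFrame({ID: ["B", "C"], "Current Loan Delinquency Status": ["0", "1"]})

# The _merge column reports where each loan ID was found
audit = orig.merge(perf, on=ID, how="outer", indicator=True)
match_counts = audit["_merge"].value_counts().to_dict()
print(match_counts)

# Rows tagged 'both' are exactly what an inner merge would return
inner = audit[audit["_merge"] == "both"].drop(columns="_merge")
print(inner)
```

The `left_only` and `right_only` counts show how many loans lack performance or origination data, respectively.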

### **Data Extraction Code Approach**

- Process data quarterly across all years (2014-2024).
- For origination data, keep the first instance of each Loan Sequence Number globally across all years and quarters.
- For performance data, keep the last instance of each Loan Sequence Number globally across all years and quarters.
- Use an inner join to merge origination and performance data.
- Read each file in 100,000-row chunks and hold only one row per loan in memory, so the full dataset never has to be loaded at once.
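
The steps above can be sketched on in-memory toy chunks. The helper names `keep_first` and `keep_last`, the loan IDs, and the non-key columns are assumptions made for illustration; the real script reads its chunks from the quarterly files.

```python
import pandas as pd

ID = "Loan Sequence Number"

def keep_first(store, chunk):
    """Record a loan's row only the first time its ID is seen."""
    rows = chunk.drop_duplicates(ID, keep="first").set_index(ID).to_dict("index")
    for loan_id, row in rows.items():
        store.setdefault(loan_id, row)

def keep_last(store, chunk):
    """Overwrite a loan's row every time its ID is seen again."""
    rows = chunk.drop_duplicates(ID, keep="last").set_index(ID).to_dict("index")
    store.update(rows)

# Toy chunks (made-up values) standing in for pieces of the quarterly files
orig_chunks = [
    pd.DataFrame({ID: ["A", "B"], "Credit Score": [700, 650]}),
    pd.DataFrame({ID: ["B", "C"], "Credit Score": [655, 720]}),
]
perf_chunks = [
    pd.DataFrame({ID: ["A", "B"], "Current Actual UPB": [100.0, 200.0]}),
    pd.DataFrame({ID: ["B", "D"], "Current Actual UPB": [195.0, 300.0]}),
]

first_seen, last_seen = {}, {}
for chunk in orig_chunks:
    keep_first(first_seen, chunk)
for chunk in perf_chunks:
    keep_last(last_seen, chunk)

# Inner join on the loan ID index: only loans present in both stores survive
toy_merged = pd.DataFrame.from_dict(first_seen, orient="index").join(
    pd.DataFrame.from_dict(last_seen, orient="index"), how="inner"
)
print(toy_merged)
```

Loan B keeps its first origination row (score 650) and its last performance row (UPB 195.0), while loans C and D drop out because each appears on only one side.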

In [None]:
import pandas as pd
import numpy as np
import os
import re
from pathlib import Path

In [None]:
# Directory containing the data folders and Excel layout file
data_dir = '/Users/dr/Documents/GitHub/MBS_RiskManagement/data/'
layout_file = '/Users/dr/Documents/GitHub/MBS_RiskManagement/READ_ME/SF LLD File Layout Release 44.xlsx'

In [None]:
# Function to parse sheet content from Excel
def parse_sheet_from_excel(sheet_name):
    df = pd.read_excel(layout_file, sheet_name=sheet_name, header=None)
    columns = []
    dtypes = {}
    start_row = df.index[df[0].str.contains('FIELD POSITION', na=False)].tolist()
    if start_row:
        start_row = start_row[0] + 1
    else:
        start_row = 0
    
    for index, row in df.iloc[start_row:].iterrows():
        if pd.isna(row[0]) or not isinstance(row[1], str):
            break
        attribute_name = row[1].strip()
        data_type = row[2].strip() if pd.notna(row[2]) else 'object'
        columns.append(attribute_name)
        if 'Alpha' in data_type or '- PYYQnXXXXXXX' in data_type:
            # 'Alpha' also matches 'Alpha Numeric'; loan IDs stay as strings
            dtypes[attribute_name] = 'object'
        elif 'Numeric' in data_type and ' - ' not in data_type:
            # Plain 'Numeric' maps to a nullable integer
            dtypes[attribute_name] = 'Int64'
        elif 'Numeric - ' in data_type:
            # 'Numeric' followed by a ' - ' qualifier maps to float
            dtypes[attribute_name] = 'float64'
        elif 'Date' in data_type:
            dtypes[attribute_name] = 'datetime64[ns]'
        else:
            dtypes[attribute_name] = 'object'
    print(f"{sheet_name} columns: {columns}")
    print(f"{sheet_name} dtypes: {dtypes}")
    return columns, dtypes

In [None]:
# Parse Origination and Performance sheets from Excel
origination_columns, origination_dtypes = parse_sheet_from_excel('Origination Data File')
performance_columns, performance_dtypes = parse_sheet_from_excel('Monthly Performance Data File')

In [None]:
# Create the extracted_data folder 
extracted_data_dir = os.path.join('/Users/dr/Documents/GitHub/MBS_RiskManagement/', 'extracted_data') 
os.makedirs(extracted_data_dir, exist_ok=True)

In [None]:
from datetime import datetime

def parse_yyyymm(val):
    """Convert YYYYMM or YYYYMMDD strings into datetime."""
    if pd.isna(val):
        return pd.NaT
    val = str(val).strip()
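    try:
        if len(val) == 6:  # YYYYMM
            return datetime.strptime(val, "%Y%m")
        if len(val) == 8:  # YYYYMMDD
            return datetime.strptime(val, "%Y%m%d")
    except ValueError:
        # Malformed 6- or 8-character values become NaT instead of aborting the run
        return pd.NaT
    return pd.to_datetime(val, errors="coerce")  # fallback for other lengths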
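
Calling a Python function on every value with `.apply` can be slow on files of this size. A vectorized alternative is sketched below; the helper name `parse_dates_vectorized` is hypothetical, and the sketch assumes that values with any length other than 6 or 8 should become `NaT`, which is stricter than the fallback in `parse_yyyymm`.

```python
import pandas as pd

def parse_dates_vectorized(series):
    """Parse a column of YYYYMM / YYYYMMDD strings in two vectorized passes."""
    # Treat missing values as empty strings so the length checks stay plain booleans
    s = series.where(series.notna(), "").astype(str).str.strip()
    is6 = s.str.len() == 6
    is8 = s.str.len() == 8
    # Values outside each mask are NaN and therefore parse to NaT
    parsed6 = pd.to_datetime(s.where(is6), format="%Y%m", errors="coerce")
    parsed8 = pd.to_datetime(s.where(is8), format="%Y%m%d", errors="coerce")
    return parsed6.fillna(parsed8)

raw = pd.Series(["202401", "20240315", None, "bad"])
parsed = parse_dates_vectorized(raw)
print(parsed)
```

Each pass parses only the values of one length, so a single explicit format applies to the whole pass.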


In [None]:
# Years and quarters to process 
years = range(2014, 2025) # 2014 to 2024 inclusive 
quarters = ['Q1', 'Q2', 'Q3', 'Q4']

In [None]:
# Function to check available disk space
import shutil

def check_disk_space(path, required_mb=100):
    # shutil.disk_usage works on Windows as well as Unix, unlike os.statvfs
    free_mb = shutil.disk_usage(path).free / (1024 * 1024)
    if free_mb < required_mb:
        print(f"Warning: Only {free_mb:.1f} MB free on {path}. Need at least {required_mb} MB. Free up space or adjust output.")
        return False
    return True

In [None]:
# Global dictionaries to track first origination and last performance 
origination_first = {} 
performance_last = {}

In [None]:
# Process each year and quarter
for year in years:
    for quarter in quarters:
        folder_path = os.path.join(data_dir, f'historical_data_{year}{quarter}')
        
        if os.path.exists(folder_path) and os.path.isdir(folder_path):
            print(f"Processing folder: {folder_path}")
            
            # Check disk space before processing.
            # Note: break only exits the quarter loop; the year loop moves on and re-checks.
            if not check_disk_space(extracted_data_dir, required_mb=500):
                break
            
            # ------------------------
            # Process origination data
            # ------------------------
            origination_file_pattern = f'historical_data_{year}{quarter}.txt'
            origination_file = os.path.join(folder_path, origination_file_pattern)

            if os.path.exists(origination_file):
                chunk_iterator = pd.read_csv(
                    origination_file,
                    sep='|',
                    header=None,
                    names=origination_columns,
                    encoding='utf-8',
                    dtype={col: 'object' for col in origination_columns},  # read everything as string first
                    chunksize=100000,
                    low_memory=False
                )

                for chunk in chunk_iterator:
                    # Parse origination date columns
                    for col in ['First Payment Date', 'Maturity Date']:
                        if col in chunk.columns:
                            chunk[col] = chunk[col].apply(parse_yyyymm)

                    # Apply other dtypes
                    for col, dtype in {k: v for k, v in origination_dtypes.items()
                                       if k not in ['First Payment Date', 'Maturity Date']}.items():
                        if col in chunk.columns:
                            chunk[col] = chunk[col].astype(dtype, errors='ignore')

                    # Keep FIRST occurrence per Loan ID (global)
                    chunk = chunk.sort_values(['Loan Sequence Number', 'First Payment Date'])
                    first_rows = chunk.groupby('Loan Sequence Number').head(1)
                    for loan_id, row in first_rows.set_index('Loan Sequence Number').to_dict('index').items():
                        if loan_id not in origination_first:
                            origination_first[loan_id] = row
            else:
                print(f"Origination file not found: {origination_file_pattern}")

            
            # ------------------------
            # Process performance data
            # ------------------------
            performance_file_pattern = f'historical_data_time_{year}{quarter}.txt'
            performance_file = os.path.join(folder_path, performance_file_pattern)

            if os.path.exists(performance_file):
                chunk_iterator = pd.read_csv(
                    performance_file,
                    sep='|',
                    header=None,
                    names=performance_columns,
                    encoding='utf-8',
                    dtype={col: 'object' for col in performance_columns},  # read everything as string first
                    chunksize=100000,
                    low_memory=False
                )
        
                for chunk in chunk_iterator:
                    # Parse performance date columns
                    for col in ['Monthly Reporting Period', 'Defect Settlement Date', 
                                'Zero Balance Effective Date', 'Due Date of Last Paid Installment (DDLPI)']:
                        if col in chunk.columns:
                            chunk[col] = chunk[col].apply(parse_yyyymm)

                    # Apply other dtypes
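                    # Filter on the comprehension variable k; filtering on col reused the
                    # stale value from the date loop above and left the dict empty
                    for col, dtype in {k: v for k, v in performance_dtypes.items()
                                       if k not in ['Monthly Reporting Period', 'Defect Settlement Date',
                                                    'Zero Balance Effective Date', 'Due Date of Last Paid Installment (DDLPI)']}.items():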
                        if col in chunk.columns:
                            chunk[col] = chunk[col].astype(dtype, errors='ignore')

                    # Keep LAST occurrence per Loan ID (global)
                    chunk = chunk.sort_values(['Loan Sequence Number', 'Monthly Reporting Period'])
                    last_rows = chunk.groupby('Loan Sequence Number').tail(1)
                    performance_last.update(last_rows.set_index('Loan Sequence Number').to_dict('index'))
            else:
                print(f"Performance file not found: {performance_file_pattern}")


In [None]:
# Convert global dictionaries to DataFrames
df_origination = pd.DataFrame.from_dict(origination_first, orient='index')
df_performance = pd.DataFrame.from_dict(performance_last, orient='index')

In [None]:
# Inner join on Loan Sequence Number
merged = df_origination.merge(df_performance, 
                              left_index=True, 
                              right_index=True, 
                              how='inner', 
                              suffixes=('_orig', '_perf'))


In [None]:

# Save final merged dataset
output_file = os.path.join(extracted_data_dir, "merged_loans_2014_2024.csv")
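# The index holds the loan IDs but is unnamed, so label it explicitly in the CSV header
merged.to_csv(output_file, index=True, index_label='Loan Sequence Number')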

print(f"Final merged dataset written to: {output_file}")
print(f"Total loans in merged set: {len(merged)}")