<a href="https://colab.research.google.com/github/granum-tech/open_finance_library/blob/main/vintage_analysis/src/vintage_analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [16]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os
from google.colab import files

In [25]:
# Set variables
#
# period_type:      'monthly', 'quarterly', or 'yearly' vintages.
# calculation_type: 'sum' for the cumulative net_call_off sum, or 'percent' for
#                   that sum as a percentage of the amount financed in the
#                   period. These are the only two values validate_data accepts.
# output_file_path: Path to save the output Excel file. If None, a default name
#                   is generated, e.g. 'vintage_analysis_quarterly_percent.xlsx'.
period_type = 'yearly'
calculation_type = 'percent'
output_file_path = None


In [18]:
# Upload the input file (opens the Colab file picker)
uploaded = files.upload()

# Stop here with a clear message if nothing was uploaded; otherwise the
# variables below are never assigned and a later cell fails with a NameError.
if not uploaded:
    raise ValueError("No file was uploaded. Re-run this cell and select a CSV or XLSX file.")

# Rename the uploaded file to 'input' with the correct extension.
# If several files were selected, each is renamed in turn and the last one
# processed is the one left in new_filename / file_path.
for original_filename in uploaded.keys():
    # Get the file extension
    _, file_extension = os.path.splitext(original_filename)
    # Define the new filename
    new_filename = 'input' + file_extension
    # Rename the file
    os.rename(original_filename, new_filename)
    # Set the file_path to the new filename
    file_path = new_filename
    print(f"File uploaded and renamed to: {new_filename}")

Saving input_csv.csv to input_csv.csv
File uploaded and renamed to: input.csv


In [19]:
def load_data(file_path):
    """
    Load data from a CSV or Excel (XLS/XLSX) file into a DataFrame.

    The reader is chosen from the file extension, which is compared
    case-insensitively so that names such as 'LOANS.CSV' are accepted.

    Args:
        file_path (str or path-like): The path to the input file.

    Returns:
        pd.DataFrame: The loaded DataFrame.

    Raises:
        ValueError: If the extension is not .csv, .xls or .xlsx.
        Exception: Any error raised while reading is printed and re-raised.
    """
    try:
        # Only used to pick the reader; the original path is what gets opened.
        normalized_name = str(file_path).lower()
        if normalized_name.endswith('.csv'):
            df = pd.read_csv(file_path)
        elif normalized_name.endswith(('.xls', '.xlsx')):
            df = pd.read_excel(file_path)
        else:
            raise ValueError("Unsupported file format. Please provide a CSV or XLSX file.")
        return df
    except Exception as e:
        print(f"Error loading data: {e}")
        raise

In [27]:
def validate_data(df, calculation_type, period_type):
    """
    Validate the run parameters and perform basic data quality checks.

    The input DataFrame is left untouched: all conversions and fixes are
    applied to a copy, which is returned. Date and numeric columns are
    converted with coercion, so values that cannot be parsed become NaT/NaN;
    the number of values lost this way is reported per column. Data quality
    problems are printed as warnings only, they do not raise.

    Args:
        df (pd.DataFrame): DataFrame containing loan servicing data.
        calculation_type (str): Calculation type provided by the user
            ('sum' or 'percent', case-insensitive).
        period_type (str): Period type provided by the user
            ('monthly', 'quarterly' or 'yearly', case-insensitive).

    Returns:
        df (pd.DataFrame): Validated copy with parsed dates and numbers and
            missing net_call_off values replaced by 0.0.

    Raises:
        ValueError: If calculation_type or period_type is not an accepted
            value, or if a required column is missing.
    """

    # Validate calculation_type (non-string values are rejected the same way)
    valid_calculation_types = ['sum', 'percent']
    if not isinstance(calculation_type, str) or calculation_type.lower() not in valid_calculation_types:
        raise ValueError(f"Invalid calculation_type: '{calculation_type}'. Expected one of {valid_calculation_types}")

    # Validate period_type
    valid_period_types = ['monthly', 'quarterly', 'yearly']
    if not isinstance(period_type, str) or period_type.lower() not in valid_period_types:
        raise ValueError(f"Invalid period_type: '{period_type}'. Expected one of {valid_period_types}")

    df_copy = df.copy()

    # Check for required columns
    required_columns = ['loan_id', 'boarding_date', 'charge_off_date',
                       'net_call_off', 'original_amount_financed']
    missing_columns = set(required_columns) - set(df_copy.columns)
    if missing_columns:
        raise ValueError(f"Missing required columns: {missing_columns}")

    issues_found = False

    # Convert date and numeric columns. errors='coerce' turns unparseable
    # values into NaT/NaN instead of raising, so the values lost are counted
    # explicitly; otherwise they would be dropped without any warning.
    conversions = (
        (['boarding_date', 'charge_off_date'],
         lambda values: pd.to_datetime(values, errors='coerce'), 'dates'),
        (['net_call_off', 'original_amount_financed'],
         lambda values: pd.to_numeric(values, errors='coerce'), 'numbers'),
    )
    for columns, convert, target in conversions:
        for col in columns:
            had_value = df_copy[col].notna()
            try:
                converted = convert(df_copy[col])
            except Exception as e:
                # Best effort: keep the original column if conversion itself fails
                issues_found = True
                print(f"Warning: Some {col} values could not be converted to {target} ({e})")
                continue
            n_invalid = int((had_value & converted.isna()).sum())
            if n_invalid:
                issues_found = True
                print(f"Warning: {n_invalid} {col} value(s) could not be converted to {target}")
            df_copy[col] = converted

    # Check only for critical missing values (boarding_date and original_amount_financed)
    critical_columns = ['boarding_date', 'original_amount_financed']
    critical_missing = df_copy[critical_columns].isnull()
    if critical_missing.any().any():
        issues_found = True
        print("\nMissing required values:")
        missing_rows = df_copy[critical_missing.any(axis=1)]
        print("loan_id | missing values")
        print("-" * 30)
        for _, row in missing_rows.iterrows():
            missing_cols = [col for col in critical_columns if pd.isnull(row[col])]
            print(f"{row['loan_id']} | {', '.join(missing_cols)}")

    # Check for charge-off dates before boarding dates
    # (rows where either date is missing compare as False and are skipped)
    invalid_dates = df_copy[df_copy['charge_off_date'] < df_copy['boarding_date']]
    if not invalid_dates.empty:
        issues_found = True
        print("\nLoans with charge-off date before boarding date:")
        print("loan_id | boarding_date | charge_off_date")
        print("-" * 50)
        for _, row in invalid_dates.iterrows():
            print(f"{row['loan_id']} | {row['boarding_date'].date()} | {row['charge_off_date'].date()}")

    # Check for duplicate loan_ids (every occurrence is listed)
    duplicates = df_copy[df_copy['loan_id'].duplicated(keep=False)]
    if not duplicates.empty:
        issues_found = True
        print("\nDuplicate loan_id entries:")
        print("loan_id | boarding_date")
        print("-" * 30)
        for _, row in duplicates.iterrows():
            print(f"{row['loan_id']} | {row['boarding_date'].date()}")

    # Check for negative values
    for col in ['original_amount_financed', 'net_call_off']:
        negative_values = df_copy[df_copy[col] < 0]
        if not negative_values.empty:
            issues_found = True
            print(f"\nNegative values found in {col}:")
            print(f"loan_id | {col}")
            print("-" * 30)
            for _, row in negative_values.iterrows():
                print(f"{row['loan_id']} | {row[col]}")

    # Basic data fixes
    # Replace NaN or None to NaT to be handled as a blank time in pandas
    df_copy['charge_off_date'] = df_copy['charge_off_date'].fillna(pd.NaT)
    # Replace NaN or None with 0.0
    df_copy['net_call_off'] = df_copy['net_call_off'].fillna(0.0)

    if not issues_found:
        print("No major data quality issues found")

    return df_copy

In [21]:
def calculate_vintage_matrix(df, period_type='quarterly', calculation_type='percent', output_file_path=None):
    """
    Calculate the vintage matrix for cumulative net call-offs with flexible time period options.

    Loans are grouped into cohorts by the period of their boarding_date. For
    each cohort, net_call_off is accumulated over the number of periods
    between boarding and charge-off. Loans without a charge_off_date are
    placed at the offset of the current period. The result has one column per
    cohort and one row per period offset, preceded by a row labelled
    'original_amount_financed' holding the cohort's total amount financed.

    Within each cohort's observable window, empty cells before the first value
    become 0 and later gaps carry the last cumulative value forward. Cells
    beyond the window (the lower-right triangle) are left empty.

    The input DataFrame is not modified. The matrix is also written to an
    Excel file.

    Args:
        df (pd.DataFrame): DataFrame containing loan servicing data
        period_type (str): Time period for analysis - 'monthly', 'quarterly', or 'yearly'
        calculation_type (str): 'sum' for cumulative net call-off sum, 'percent' for percentage of cumulative amount financed for period
        output_file_path (str): Path to save the output Excel file. If None, will generate default name

    Returns:
        vintage_matrix (pd.DataFrame): The calculated vintage matrix

    Raises:
        ValueError: If period_type or calculation_type is not recognised, or
            if no loan has a usable boarding_date.
    """
    # Validate and set period parameters
    period_settings = {
        'monthly': {'freq': 'M', 'name': 'month'},
        'quarterly': {'freq': 'Q', 'name': 'quarter'},
        'yearly': {'freq': 'Y', 'name': 'year'}
    }

    period_key = period_type.lower()
    if period_key not in period_settings:
        raise ValueError("period_type must be 'monthly', 'quarterly', or 'yearly'")

    # Normalise case here as well, so 'Percent' is not silently treated as 'sum'
    calc_key = calculation_type.lower()
    if calc_key not in ('sum', 'percent'):
        raise ValueError("calculation_type must be 'sum' or 'percent'")

    period_freq = period_settings[period_key]['freq']
    period_name = period_settings[period_key]['name']

    # Set default output file path if none provided
    if output_file_path is None:
        output_file_path = f'vintage_analysis_{period_key}_{calc_key}.xlsx'

    # Work on a copy so the caller's DataFrame does not gain helper columns
    data = df.copy()

    # Ensure datetime format for dates
    data['boarding_date'] = pd.to_datetime(data['boarding_date'])
    data['charge_off_date'] = pd.to_datetime(data['charge_off_date'])

    # A loan without a boarding date cannot be assigned to a cohort
    missing_boarding = data['boarding_date'].isna()
    if missing_boarding.any():
        print(f"Warning: ignoring {int(missing_boarding.sum())} loan(s) without a boarding_date")
        data = data[~missing_boarding].copy()
    if data.empty:
        raise ValueError("No loans with a valid boarding_date to analyse")

    # Add the period column based on boarding_date
    period_col = f'year{period_name}'
    boarding_periods = data['boarding_date'].dt.to_period(period_freq)
    data[period_col] = boarding_periods.astype(str)

    # Calculate the number of periods since vintage started.
    # The current period is computed once so every open loan uses the same value.
    current_period = pd.Timestamp.now().to_period(period_freq)

    def calculate_period_offset(boarded_on, charged_off_on):
        if pd.isna(charged_off_on):
            end_period = current_period
        else:
            end_period = charged_off_on.to_period(period_freq)
        return (end_period - boarded_on.to_period(period_freq)).n

    vintage_col = f'vintage_{period_name}'
    data[vintage_col] = [
        calculate_period_offset(boarded_on, charged_off_on)
        for boarded_on, charged_off_on in zip(data['boarding_date'], data['charge_off_date'])
    ]

    # Replace any missing net_call_off values with 0
    data['net_call_off'] = data['net_call_off'].fillna(0)

    # Calculate the total original amount financed for each vintage
    origination_sum = data.groupby(period_col)['original_amount_financed'].sum()

    # Group by period and vintage to calculate cumulative net_call_off
    cumulative_data = (
        data.groupby([vintage_col, period_col])['net_call_off']
        .sum()
        .groupby(level=1)
        .cumsum()
        .reset_index()
    )

    if calc_key == 'percent':
        cumulative_data['origination_sum'] = cumulative_data[period_col].map(origination_sum)
        cumulative_data['net_call_off'] = (
            cumulative_data['net_call_off'] / cumulative_data['origination_sum']
        ) * 100

    # Pivot to create the matrix
    vintage_matrix = cumulative_data.pivot(
        index=vintage_col, columns=period_col, values='net_call_off'
    )

    # Make the offsets contiguous: the pivot only has rows for offsets that
    # occur in the data, which would leave holes in the matrix.
    first_offset = min(int(vintage_matrix.index.min()), 0)
    last_offset = int(vintage_matrix.index.max())
    vintage_matrix = vintage_matrix.reindex(
        pd.RangeIndex(first_offset, last_offset + 1, name=vintage_col)
    )

    # Distance, in periods, of every cohort from the oldest one. Using real
    # period distances keeps the diagonal cutoff correct when a period has no
    # originations and is therefore missing from the columns.
    oldest_cohort = boarding_periods.min()
    cohort_distance = {
        str(cohort): (cohort - oldest_cohort).n for cohort in boarding_periods.unique()
    }

    # Process each column to handle empty cells and value forwarding
    for col in vintage_matrix.columns:
        # Diagonal cutoff: a cohort that started k periods after the oldest one
        # has k fewer observable periods.
        cutoff = last_offset - cohort_distance[col]
        if cutoff < first_offset:
            continue

        # Carry the last cumulative value forward; cells before the first
        # value become 0. Written back with .loc so the assignment reaches
        # vintage_matrix itself rather than a temporary copy.
        window = vintage_matrix.loc[:cutoff, col]
        vintage_matrix.loc[:cutoff, col] = window.ffill().fillna(0)

    # Create a separate one-row DataFrame for originations. Its row label is
    # the Series name, 'original_amount_financed', which plot_vintage_matrix
    # looks for when dropping this row.
    originations_df = origination_sum.to_frame().T

    # Combine originations with vintage matrix
    vintage_matrix = pd.concat([originations_df, vintage_matrix])

    # Save the final vintage matrix to an Excel file
    vintage_matrix.to_excel(output_file_path)
    print(f"{period_type.capitalize()} vintage matrix saved to {output_file_path}")

    return vintage_matrix

In [22]:
def plot_vintage_matrix(vintage_matrix, period_type='quarterly', calculation_type='percent',
                        output_file_path=None, percent_axis_cap=10):
    """
    Plot the vintage matrix as a line graph showing cumulative charge-offs over time.

    One line is drawn per column of the matrix, against the numeric row
    labels. The figure is saved as a PNG and then closed; nothing is returned.
    The matrix passed in is not modified.

    Args:
        vintage_matrix (pd.DataFrame): The vintage matrix DataFrame
        period_type (str): Time period for analysis - 'monthly', 'quarterly', or 'yearly'
        calculation_type (str): 'sum' for cumulative amount, 'percent' for percentage
            (case-insensitive)
        output_file_path (str): Path to save the plot. If None, generates default name.
            Any extension is replaced by '.png'.
        percent_axis_cap (float or None): Upper bound for the y-axis in percent
            mode. Defaults to 10; values above the cap are cut off by the axis.
            Pass None to scale the axis to the data.
    """
    is_percent = calculation_type.lower() == 'percent'

    # Remove originations row and prepare data. Copy in the other branch so
    # that re-labelling the index below never touches the caller's matrix.
    if 'original_amount_financed' in vintage_matrix.index:
        plot_data = vintage_matrix.drop('original_amount_financed')
    else:
        plot_data = vintage_matrix.copy()

    # Convert index to numeric and sort
    plot_data.index = pd.to_numeric(plot_data.index, errors='coerce')
    plot_data = plot_data.sort_index().dropna(how='all')

    # Create the plot
    fig, ax = plt.subplots(figsize=(12, 8))

    # Plot each vintage; matplotlib assigns a different colour per line
    for column in plot_data.columns:
        valid_data = plot_data[column].dropna()
        if not valid_data.empty:
            ax.plot(valid_data.index, valid_data.values,
                   marker='o', markersize=4,
                   label=column,
                   linewidth=2)

    # Set title and labels
    title_period = period_type.capitalize()
    title_calc = 'Net Charge-Off Percentage' if is_percent else 'Cumulative Net Charge-Off Amount'
    ax.set_title(f'{title_period} Vintage Analysis\n{title_calc}', pad=20, fontsize=12)
    ax.set_xlabel(f'Periods Since Origination ({period_type})', fontsize=10)
    y_label = 'Charge-Off (% of Original Balance)' if is_percent else 'Cumulative Net Charge-Off Amount'
    ax.set_ylabel(y_label, fontsize=10)

    # Format axes
    ax.grid(True, linestyle='--', alpha=0.7)
    ax.spines['top'].set_visible(False)
    ax.spines['right'].set_visible(False)

    # Set y-axis limits and format. With no positive value to scale from
    # (empty, all-NaN or all-zero data) only the lower bound is fixed, because
    # a NaN limit makes set_ylim raise.
    max_value = plot_data.max().max()
    has_scale = pd.notna(max_value) and max_value > 0
    if has_scale:
        y_max = max_value * 1.2
        if is_percent and percent_axis_cap is not None:
            y_max = min(y_max, percent_axis_cap)
        ax.set_ylim(0, y_max)
    else:
        ax.set_ylim(bottom=0)

    if is_percent:
        ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: f'{x:.2f}%'))
    else:
        ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: f'${x:,.0f}'))

    # Add legend
    ax.legend(title='Origination Period',
             bbox_to_anchor=(1.05, 1),
             loc='upper left')

    # Adjust layout
    fig.tight_layout()

    # Save the plot
    if output_file_path is None:
        output_file_base_name = f'vintage_analysis_{period_type.lower()}_{calculation_type.lower()}'
    else:
        output_file_base_name = os.path.splitext(output_file_path)[0]

    output_image_path = f'{output_file_base_name}.png'
    fig.savefig(output_image_path, bbox_inches='tight', dpi=300)
    # Close this figure specifically, not whichever one happens to be current
    plt.close(fig)

    print(f"Vintage analysis plot saved to {output_image_path}")

In [None]:
# Read the uploaded file, then run the parameter and data-quality checks
df = load_data(new_filename)
df = validate_data(df, calculation_type, period_type)

# The matrix calculation and the plot take the same run settings,
# so they are bundled once and passed to both.
run_settings = {
    'period_type': period_type,
    'calculation_type': calculation_type,
    'output_file_path': output_file_path,
}

# Build the vintage matrix (also saved to Excel), then chart it (saved as PNG)
vintage_matrix = calculate_vintage_matrix(df, **run_settings)
plot_vintage_matrix(vintage_matrix, **run_settings)