In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# =============================================================================
# 0. CONFIGURATION
# =============================================================================
# Input panel (CSV). Replace with your actual file name.
FILE_NAME = "data.csv"

# Temporal split cut-off (last fiscal year that belongs to the training set):
#   fyear <= TRAIN_CUTOFF_YEAR -> train
#   fyear >  TRAIN_CUTOFF_YEAR -> test
TRAIN_CUTOFF_YEAR = 2022

# =============================================================================
# 1. LOAD & TYPE CONVERSION
# =============================================================================
df = pd.read_csv(FILE_NAME, low_memory=False)

# 1.1 Datetime conversion: values that cannot be parsed become NaT (errors='coerce').
if 'datadate' in df.columns:
    df['datadate'] = pd.to_datetime(df['datadate'], errors='coerce')

# 1.2 Numeric conversion: values that cannot be parsed become NaN (errors='coerce').
# Columns listed here but absent from the file are simply skipped.
numeric_cols = ['prcc_c', 'prcc_f', 'gvkey', 'fyear', 'ismod',
                'ib', 'at', 'dltt', 'dlc', 'che', 're', 'seq',
                'xrd', 'dv', 'ni', 'act', 'lct', 'oancf', 'ivncf', 'fincf',
                'oibdp', 'xint', 'mkvalt', 'capx']

present_numeric_cols = [name for name in numeric_cols if name in df.columns]
for name in present_numeric_cols:
    df[name] = pd.to_numeric(df[name], errors='coerce')

# The panel cannot be built without the firm identifier and the fiscal year.
required_keys = ['gvkey', 'fyear']
missing_keys = [key for key in required_keys if key not in df.columns]
if missing_keys:
    raise KeyError(f"Missing required column(s): {missing_keys}. Check your CSV schema.")

# 1.3 Create firm_id (a copy of gvkey), order the panel by firm and fiscal year,
# then keep a single row per (firm_id, fyear): the last one in sorted order.
df['firm_id'] = df['gvkey']
df = (
    df.sort_values(['firm_id', 'fyear'])
      .drop_duplicates(subset=['firm_id', 'fyear'], keep='last')
      .reset_index(drop=True)
)

# =============================================================================
# 2. FEATURE ENGINEERING (RAW)
# =============================================================================
# Ratios are built from the raw columns first; winsorizing and scaling happen in
# later sections so that the ratios keep their economic meaning.
# `eps` is added to every denominator so an exact zero cannot divide by zero.

eps = 1e-6

# 2.1 Distress Label Construction (Target Variable: t+1)
# ------------------------------------------------------
# Definition: Distress = 1 if (Interest Coverage < 1) OR (Negative Equity).
# This flag describes the CURRENT year; it is shifted to t+1 further below.

# Interest coverage = oibdp / xint.
# NOTE(review): oibdp is presumably operating income *before* depreciation
# (closer to EBITDA than EBIT) -- confirm against the data dictionary.
# The column is all-NaN when either input is missing from the file.
has_coverage_inputs = {'oibdp', 'xint'}.issubset(df.columns)
df['interest_coverage'] = (
    df['oibdp'] / (df['xint'] + eps) if has_coverage_inputs else np.nan
)

# Binary distress indicator. A NaN coverage ratio compares as False, so rows
# with unknown coverage and non-negative (or unknown) equity are labelled 0.
low_coverage = df['interest_coverage'].lt(1.0)
if 'seq' in df.columns:
    negative_equity = df['seq'].lt(0)
else:
    negative_equity = pd.Series(False, index=df.index)

is_distressed = low_coverage.fillna(False) | negative_equity.fillna(False)
df['distress_dummy'] = is_distressed.astype(int)

# 2.2 Financial Ratios
# -------------------------------------------
# Every ratio column is always created; it stays all-NaN when one of its raw
# inputs is absent from the file. `eps` in the denominator avoids an exact
# division by zero.

# Profitability
df['roa'] = np.nan
df['cf_roa'] = np.nan
if all(c in df.columns for c in ['ib', 'at']):
    df['roa'] = df['ib'] / (df['at'] + eps)
if all(c in df.columns for c in ['oancf', 'at']):
    df['cf_roa'] = df['oancf'] / (df['at'] + eps)

# Leverage
df['total_debt'] = np.nan
df['leverage'] = np.nan
df['debt_to_equity'] = np.nan

# total_debt = dltt + dlc, using whichever of the two columns exist.
# Missing values inside an available column are treated as 0 debt.
# Building the list from the columns that are actually present avoids calling
# .fillna() on a scalar placeholder, which raised AttributeError when only one
# of the two columns was in the file.
debt_components = [
    pd.to_numeric(df[c], errors='coerce').fillna(0)
    for c in ('dltt', 'dlc')
    if c in df.columns
]
if debt_components:
    df['total_debt'] = sum(debt_components)

# total_debt always exists at this point (possibly all-NaN), so only the
# denominators need to be checked.
if 'at' in df.columns:
    df['leverage'] = df['total_debt'] / (df['at'] + eps)
if 'seq' in df.columns:
    df['debt_to_equity'] = df['total_debt'] / (df['seq'] + eps)

# Liquidity
df['current_ratio'] = np.nan
df['cash_ratio'] = np.nan
if all(c in df.columns for c in ['act', 'lct']):
    df['current_ratio'] = df['act'] / (df['lct'] + eps)
if all(c in df.columns for c in ['che', 'lct']):
    df['cash_ratio'] = df['che'] / (df['lct'] + eps)

# Solvency / Altman-style
df['re_to_assets'] = np.nan
df['mkt_to_book'] = np.nan
if all(c in df.columns for c in ['re', 'at']):
    df['re_to_assets'] = df['re'] / (df['at'] + eps)
if all(c in df.columns for c in ['mkvalt', 'seq']):
    df['mkt_to_book'] = df['mkvalt'] / (df['seq'] + eps)

# Investment
df['capex_ratio'] = np.nan
if all(c in df.columns for c in ['capx', 'at']):
    df['capex_ratio'] = df['capx'] / (df['at'] + eps)

# 2.3 Lagging (Predicting t+1 using t)
# ------------------------------------
# Relies on df being sorted by ['firm_id', 'fyear'] with one row per firm-year
# (done in section 1.3), so shift(-1) within a firm yields that firm's next
# available observation.
firm_groups = df.groupby('firm_id')
next_distress = firm_groups['distress_dummy'].shift(-1)
next_fyear = firm_groups['fyear'].shift(-1)

# The next available row is only a valid t+1 label when it really is the
# following fiscal year. If a firm has a gap in its history (e.g. 2018 then
# 2020), the label is left missing instead of silently using year t+2 or later.
is_next_year = next_fyear.eq(df['fyear'] + 1)
df['target_next_year_distress'] = next_distress.where(is_next_year)

# Drop rows without a valid target: each firm's last year, and years that are
# immediately followed by a gap.
df = df.dropna(subset=['target_next_year_distress']).reset_index(drop=True)

In [None]:
# Quick sanity check: summary statistics of the engineered panel, displayed as
# the cell's output (this runs before the train/test split below).
df.describe()

In [None]:
# =============================================================================
# 3. SPLIT TRAIN AND TEST (Temporal)
# =============================================================================
# Rows are assigned by fiscal year relative to TRAIN_CUTOFF_YEAR. Both
# comparisons are False for a missing fyear, so such rows land in neither set.
print(f"Splitting data at year {TRAIN_CUTOFF_YEAR}...")

fiscal_year = df['fyear']
train_mask = fiscal_year.le(TRAIN_CUTOFF_YEAR)
test_mask = fiscal_year.gt(TRAIN_CUTOFF_YEAR)

# Independent copies, so later in-place column edits never touch df.
train = df.loc[train_mask].copy()
test = df.loc[test_mask].copy()

print(f"Train samples: {len(train)}")
print(f"Test samples:  {len(test)}")

In [None]:

# =============================================================================
# 4. WINSORIZATION (Fit on Train, Apply to Test)
# =============================================================================
# Clip each ratio to the 1st / 99th percentile of the TRAINING data; the same
# bounds are then applied to the test data so no test information leaks in.
winsor_cols = ['roa', 'cf_roa', 'leverage', 'debt_to_equity', 'current_ratio',
               'cash_ratio', 're_to_assets', 'mkt_to_book', 'capex_ratio',
               'interest_coverage']

# Keep only the columns available in both splits.
shared_columns = set(train.columns) & set(test.columns)
winsor_cols = [c for c in winsor_cols if c in shared_columns]

# col -> (lower bound, upper bound) fitted on train; (NaN, NaN) if nothing to fit.
winsor_stats = {}

for col in winsor_cols:
    train[col] = pd.to_numeric(train[col], errors='coerce')
    test[col] = pd.to_numeric(test[col], errors='coerce')

    if not train[col].notna().any():
        # No observed training values: no bounds can be fitted, leave both
        # splits unclipped for this column.
        winsor_stats[col] = (np.nan, np.nan)
    else:
        lower, upper = (train[col].quantile(q) for q in (0.01, 0.99))
        winsor_stats[col] = (lower, upper)

        train[col] = train[col].clip(lower=lower, upper=upper)
        test[col] = test[col].clip(lower=lower, upper=upper)

print("Winsorization complete (Bounds derived from Train set).")

In [None]:

# =============================================================================
# 5. STANDARDIZATION (Fit on Train, Apply to Test)
# =============================================================================
# z = (x - mean_train) / std_train, with the training moments reused for test.

# Size variables get a log(1 + x) transform first; negative values are clipped
# to 0 before taking the log.
for frame in (train, test):
    for col in ('at', 'mkvalt'):
        if col not in frame.columns:
            continue
        frame[col] = pd.to_numeric(frame[col], errors='coerce')
        frame[f'log_{col}'] = np.log1p(frame[col].clip(lower=0))

log_features = [c for c in ['log_at', 'log_mkvalt'] if c in train.columns and c in test.columns]
final_features = winsor_cols + log_features

# col -> (training mean, training std)
scaling_stats = {}

for col in final_features:
    for frame in (train, test):
        frame[col] = pd.to_numeric(frame[col], errors='coerce')

    mu, sigma = train[col].mean(), train[col].std()
    scaling_stats[col] = (mu, sigma)

    # The small constant keeps a zero-variance column from dividing by zero.
    for frame in (train, test):
        frame[f'z_{col}'] = (frame[col] - mu) / (sigma + 1e-8)

print("Standardization complete (Stats derived from Train set).")


In [None]:
############################################################
# 3. Feature Selection Prep (Chapter 2: Feature Selection)
# ----------------------------------------------------------
# Filter methods:
#   - Correlation analysis for numeric features vs target
#   - VIF for multicollinearity diagnostics
############################################################

# Candidate features: the winsorized ratios and log size variables (not their
# z_ versions). Pearson correlation and VIF do not change under the linear
# z-score rescaling, so the diagnostics below apply to the z_ columns as well.
feature_vars = [
    'roa', 'cf_roa', 'leverage', 'debt_to_equity', 'current_ratio',
    'cash_ratio', 're_to_assets', 'mkt_to_book', 'capex_ratio',
    'interest_coverage', 'log_at', 'log_mkvalt'
]

# Retain only those that exist in train
feature_vars = [v for v in feature_vars if v in train.columns]

# Use train data for feature selection to avoid look-ahead bias
target_col = 'target_next_year_distress'

# 3.1 Correlation with target variable
# ------------------------------------
print("----- Correlation of Features with Target Variable -----")
corr_matrix = train[[target_col] + feature_vars].corr()

for var in feature_vars:
    r = corr_matrix.loc[target_col, var]
    print(f"{var:<20} r = {r: .4f}")

# 3.2 VIF (Variance Inflation Factor)
# ------------------------------------
# VIF needs complete rows. A feature with no observed training values (the
# ratio columns are all-NaN placeholders when their raw inputs are missing from
# the file) would make dropna() discard every row, so such features are left
# out of the VIF matrix and reported instead.
vif_vars = [v for v in feature_vars if train[v].notna().any()]
skipped_vif_vars = [v for v in feature_vars if v not in vif_vars]

X_df = train[vif_vars].dropna()
X = X_df.values
var_names = list(vif_vars)

print("\n----- VIF for selected features -----")
if skipped_vif_vars:
    print(f"Excluded from VIF (no non-missing training values): {skipped_vif_vars}")
if X_df.empty:
    print("No complete training rows for the VIF features; VIF values are not meaningful.")

def compute_vif(X, j):
    """Variance inflation factor of column ``j`` of the design matrix ``X``.

    Column ``j`` is regressed by ordinary least squares on an intercept plus
    all remaining columns; the VIF is ``1 / (1 - R^2)`` of that regression.

    Parameters
    ----------
    X : array-like of shape (n_observations, n_features)
        Feature matrix without missing values.
    j : int
        Index of the column whose VIF is computed.

    Returns
    -------
    float
        ``nan`` when ``X`` has no rows (the VIF is undefined without data),
        ``inf`` when column ``j`` is an exact linear combination of the other
        columns, ``1.0`` when column ``j`` has zero variance, and
        ``1 / (1 - R^2)`` otherwise.
    """
    X = np.asarray(X, dtype=float)
    n_obs = X.shape[0]
    if n_obs == 0:
        # Without observations there is nothing to regress; reporting 1.0
        # here would wrongly suggest "no multicollinearity".
        return np.nan

    y = X[:, j]
    X_other = np.delete(X, j, axis=1)
    # Prepend the intercept column.
    X_other_const = np.column_stack([np.ones(n_obs), X_other])

    beta, _, _, _ = np.linalg.lstsq(X_other_const, y, rcond=None)
    y_pred = X_other_const @ beta

    ss_res = np.sum((y - y_pred) ** 2)
    ss_tot = np.sum((y - y.mean()) ** 2)
    r2 = 1 - ss_res / ss_tot if ss_tot > 0 else 0.0
    return np.inf if r2 >= 1 else 1.0 / (1.0 - r2)

# One VIF per column of X; var_names[j] labels column j.
for j in range(len(var_names)):
    name = var_names[j]
    vif_val = compute_vif(X, j)
    print(f"{name:<20} VIF = {vif_val: .4f}")

# 3.3 Variability checks (IQR & Std. Dev.)
# ----------------------------------------
# Spread of each candidate feature on the training data, ignoring missing
# values; a feature with no observed values is reported as NA.
print("\n----- Variability Checks (IQR & Std. Dev.) -----")
for var in feature_vars:
    series = train[var].dropna()
    if not series.empty:
        iqr = series.quantile(0.75) - series.quantile(0.25)
        std = series.std(ddof=1)
        print(f"{var:<20} IQR: {iqr:.6f}, Std. Dev.: {std:.6f}")
    else:
        print(f"{var:<20} IQR: NA, Std. Dev.: NA")





Generating boxplots for winsorized features...
