In [None]:
# Splitting up the code

In [None]:
# just equity and fixed income regressions below from grok

In [8]:
import pandas as pd
import numpy as np
import os
import random
import logging
from datetime import timedelta, datetime
from dateutil.relativedelta import relativedelta
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from sqlalchemy import create_engine, text
from tqdm import tqdm
import statsmodels.api as sm
import time
import sys
from contextlib import contextmanager
from functools import wraps
from collections import defaultdict

# Configuration
CONFIG = {
    "database": {
        "server": "JULIANS_LAPTOP\\SQLEXPRESS",
        "database": "CWA_Fund_Database",
        "driver": "ODBC Driver 18 for SQL Server"
    },
    "return_metric": "1 Month Return",
    "rolling_periods": [12, 24, 36, 48, 60],
    "dry_run": True,
    "sample_dry_run": True,
    "sample_size": 10,
    "chunk_size": 5000,
    "batch_insert_size": 5000,
    "max_workers_cpu": 16,
    "batch_size": 100,
    "log_level": "warning"  # Options: "debug", "info", "warning"
}

CONNECTION_STRING = (
    f"mssql+pyodbc://{CONFIG['database']['server']}/{CONFIG['database']['database']}"
    f"?driver={CONFIG['database']['driver']}&trusted_connection=yes&TrustServerCertificate=yes"
)
engine = create_engine(CONNECTION_STRING, pool_size=10, max_overflow=5)

RETURN_METRIC = CONFIG["return_metric"]
ROLLING_PERIODS = CONFIG["rolling_periods"]
DRY_RUN = CONFIG["dry_run"]
SAMPLE_DRY_RUN = CONFIG["sample_dry_run"]
SAMPLE_SIZE = CONFIG["sample_size"]
CHUNK_SIZE = CONFIG["chunk_size"]
BATCH_INSERT_SIZE = CONFIG["batch_insert_size"]
MAX_WORKERS_CPU = CONFIG["max_workers_cpu"]
BATCH_SIZE = CONFIG["batch_size"]

# Logging setup
SUMMARY_LOG = "factor_attribution_summary.log"
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("factor_attribution.log"),
        logging.StreamHandler(sys.stdout)
    ],
    force=True
)
logger = logging.getLogger()

def log_summary(message):
    with open(SUMMARY_LOG, 'a') as f:
        f.write(f"{datetime.now()}: {message}\n")

# Mappings
CATEGORY_TO_REGION = {
    "US Equity Large Cap Blend": ("USA", "US Equity Large Cap Blend"),
    "US Equity Large Cap Growth": ("USA", "US Equity Large Cap Growth"),
    "US Equity Large Cap Value": ("USA", "US Equity Large Cap Value"),
    "US Equity Mid Cap": ("USA", "US Equity Mid Cap"),
    "US Equity Small Cap": ("USA", "US Equity Small Cap"),
    "Global Equity Large Cap": ("Global", "Global Equity Large Cap"),
    "Global Equity Mid/Small Cap": ("Global", "Global Equity Mid/Small Cap"),
    "Global Emerging Markets Equity": ("Global", "Global Emerging Markets Equity"),
    "Europe Equity Large Cap": ("Intl", "Europe Equity Large Cap"),
    "Asia Equity": ("Intl", "Asia Equity"),
    "Japan Equity": ("Intl", "Japan Equity"),
    "Emerging Markets Fixed Income": ("Intl", "Emerging Markets Fixed Income"),
    "US Fixed Income": ("USA", "US Fixed Income"),
    "US Municipal Fixed Income": ("USA", "US Municipal Fixed Income"),
    "Global Fixed Income": ("Global", "Global Fixed Income"),
    "Flexible Allocation": ("Global", "Flexible Allocation"),
    "Aggressive Allocation": ("Global", "Aggressive Allocation"),
    "Moderate Allocation": ("Global", "Moderate Allocation"),
    "Cautious Allocation": ("Global", "Cautious Allocation"),
    "Options Trading": ("USA", "Options Trading"),
    "Multialternative": ("Global", "Multialternative"),
    "Market Neutral": ("Global", "Market Neutral"),
    "Long/Short Equity": ("Global", "Long/Short Equity"),
    "Alternative Miscellaneous": ("Global", "Alternative Miscellaneous"),
    "Energy Sector Equity": ("USA", "Energy Sector Equity"),
    "Equity Miscellaneous": ("USA", "Equity Miscellaneous"),
    "Financials Sector Equity": ("USA", "Financials Sector Equity"),
    "Healthcare Sector Equity": ("USA", "Healthcare Sector Equity"),
    "Consumer Goods & Services Sector Equity": ("USA", "Consumer Goods & Services Sector Equity"),
    "Communications Sector Equity": ("USA", "Communications Sector Equity"),
    "Industrials Sector Equity": ("USA", "Industrials Sector Equity"),
    "Other Sector Equity": ("USA", "Other Sector Equity"),
    "Real Estate Sector Equity": ("USA", "Real Estate Sector Equity"),
    "Precious Metals Sector Equity": ("USA", "Precious Metals Sector Equity"),
    "Technology Sector Equity": ("USA", "Technology Sector Equity"),
    "Utilities Sector Equity": ("USA", "Utilities Sector Equity"),
    "Natural Resources Sector Equity": ("USA", "Natural Resources Sector Equity"),
    "Infrastructure Sector Equity": ("USA", "Infrastructure Sector Equity"),
    "Trading Tools": ("USA", "Trading Tools"),
    "Asia ex-Japan Equity": ("Intl", "Asia ex-Japan Equity"),
    "Australia & New Zealand Equity": ("Intl", "Australia & New Zealand Equity"),
    "Canadian Equity Large Cap": ("Intl", "Canadian Equity Large Cap"),
    "Europe Equity Mid/Small Cap": ("Intl", "Europe Equity Mid/Small Cap"),
    "Greater China Equity": ("Intl", "Greater China Equity"),
    "India Equity": ("Intl", "India Equity"),
    "Mexico Equity": ("Intl", "Mexico Equity"),
    "Korea Equity": ("Intl", "Korea Equity"),
    "Latin America Equity": ("Intl", "Latin America Equity"),
    "UK Equity Large Cap": ("Intl", "UK Equity Large Cap"),
    "Thailand Equity": ("Intl", "Thailand Equity"),
    "Convertibles": ("USA", "Convertibles"),
    "Fixed Income Miscellaneous": ("USA", "Fixed Income Miscellaneous"),
    "Allocation Miscellaneous": ("Global", "Allocation Miscellaneous")
}

CATEGORY_TO_REGRESSIONS = {
    "Energy Sector Equity": "Equity_USA",
    "Equity Miscellaneous": "Equity_USA",
    "Financials Sector Equity": "Equity_USA",
    "Healthcare Sector Equity": "Equity_USA",
    "Consumer Goods & Services Sector Equity": "Equity_USA",
    "Communications Sector Equity": "Equity_USA",
    "Industrials Sector Equity": "Equity_USA",
    "Other Sector Equity": "Equity_USA",
    "Real Estate Sector Equity": "Equity_USA",
    "Precious Metals Sector Equity": "Equity_USA",
    "Technology Sector Equity": "Equity_USA",
    "Utilities Sector Equity": "Equity_USA",
    "US Equity Large Cap Blend": "Equity_USA",
    "US Equity Large Cap Growth": "Equity_USA",
    "US Equity Large Cap Value": "Equity_USA",
    "US Equity Mid Cap": "Equity_USA",
    "US Equity Small Cap": "Equity_USA",
    "Options Trading": "Equity_USA",
    "Natural Resources Sector Equity": "Equity_USA",
    "Infrastructure Sector Equity": "Equity_USA",
    "Asia ex-Japan Equity": "Equity_Intl",
    "Australia & New Zealand Equity": "Equity_Intl",
    "Canadian Equity Large Cap": "Equity_Intl",
    "Europe Equity Large Cap": "Equity_Intl",
    "Europe Equity Mid/Small Cap": "Equity_Intl",
    "Greater China Equity": "Equity_Intl",
    "India Equity": "Equity_Intl",
    "Mexico Equity": "Equity_Intl",
    "Japan Equity": "Equity_Intl",
    "Korea Equity": "Equity_Intl",
    "Latin America Equity": "Equity_Intl",
    "UK Equity Large Cap": "Equity_Intl",
    "Thailand Equity": "Equity_Intl",
    "Global Emerging Markets Equity": "Equity_Global",
    "Global Equity Large Cap": "Equity_Global",
    "Global Equity Mid/Small Cap": "Equity_Global",
    "Global Fixed Income": "Fixed_Income",
    "Convertibles": "Fixed_Income",
    "Emerging Markets Fixed Income": "Fixed_Income",
    "Fixed Income Miscellaneous": "Fixed_Income",
    "US Fixed Income": "Fixed_Income",
    "US Municipal Fixed Income": "Fixed_Income",
    "Aggressive Allocation": "Allocation",
    "Allocation Miscellaneous": "Allocation",
    "Cautious Allocation": "Allocation",
    "Flexible Allocation": "Allocation",
    "Moderate Allocation": "Allocation",
    "Alternative Miscellaneous": "Alternative",
    "Long/Short Equity": "Alternative",
    "Market Neutral": "Alternative",
    "Multialternative": "Alternative"
}

REGRESSION_SETS = {
    "Equity_USA": [
        ("Equity_USA_1", ['MKT', 'HML_Devil', 'QMJ', 'SMB', 'UMD', 'BAB'], "USA"),
        ("Equity_USA_5", ['MKT', 'BAB', 'TSM-FI', 'TSM-FX'], "USA"),
        ("Equity_USA_6", ['MKT', 'SMB', 'BAB'], "USA"),
        ("Equity_USA_7", ['MKT', 'HML_Devil', 'QMJ', 'UMD', 'SMB', 'BAB', 'TSM-FI', 'TSM-FX'], "USA")
    ],
    "Equity_Intl": [
        ("Equity_Intl_1", ['MKT', 'HML_Devil', 'QMJ', 'SMB', 'UMD', 'TSM-EQ', 'BAB'], "Intl")
    ],
    "Equity_Global": [
        ("Equity_Global_1", ['MKT', 'HML_Devil', 'QMJ', 'SMB', 'UMD', 'TSM-EQ', 'BAB'], "Global")
    ],
    "Fixed_Income": [
        ("FI_1", ['TERM_Int', 'TERM_Long', 'CREDIT', 'CREDIT_HY', 'TSM-FI', 'TSM-FX'], "Global")
    ],
    "Allocation": [
        ("Allocation_1", ['MKT', 'HML_Devil', 'QMJ', 'SMB', 'UMD', 'BAB', 'TSM-EQ', 'TSM-FI'], "Global")
    ],
    "Alternative": [
        ("Alternative_1", ['MKT', 'HML_Devil', 'QMJ', 'SMB', 'UMD', 'BAB', 'TSM-EQ', 'TSM-FI', 'TSM-FX'], "Global")
    ]
}

# Global factor cache
FACTOR_CACHE = {}

# Helper Functions
def category_to_region(category):
    return CATEGORY_TO_REGION.get(category, ("USA", "Unknown"))

@contextmanager
def database_transaction():
    with engine.connect() as connection:
        transaction = connection.begin()
        try:
            yield connection
            transaction.commit()
        except Exception as e:
            transaction.rollback()
            logger.error(f"Transaction failed: {e}")
            raise

def timer(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        log_level = kwargs.get('log_level', CONFIG["log_level"])
        if log_level == "debug":
            logger.debug(f"Starting {func.__name__}")
        result = func(*args, **kwargs)
        if log_level in ["debug", "info"]:
            logger.info(f"{func.__name__} took {time.time() - start_time:.2f} seconds")
        return result
    return wrapper

def standardize_dates(df, freq='ME'):
    """Standardize DataFrame index to month-end dates."""
    df.index = pd.to_datetime(df.index) + pd.offsets.MonthEnd(0)
    df.index = df.index.drop_duplicates()
    return df.asfreq(freq)

def align_regression_dates(returns, factor_data, start_date, end_date, expected_months, log_level="warning"):
    """Align fund returns and factor data for a regression window."""
    window_dates = returns.index.intersection(factor_data.index)
    window_dates = window_dates[(window_dates >= start_date) & (window_dates <= end_date)]
    
    if len(window_dates) < expected_months:
        if log_level == "debug":
            logger.debug(f"Insufficient data for window {start_date} to {end_date}: {len(window_dates)}/{expected_months} months")
        return None, None, f"Insufficient data: {len(window_dates)} months"
    
    window_returns = returns.reindex(window_dates)
    window_factors = factor_data.reindex(window_dates)
    
    if window_returns.isna().any() or window_factors.isna().any().any():
        if log_level == "debug":
            logger.debug(f"Missing data in window {start_date} to {end_date}")
        return None, None, "Missing data in window"
    
    if log_level == "debug":
        logger.debug(f"Aligned window {start_date} to {end_date}: {len(window_dates)} months")
    
    return window_returns, window_factors, None

# Data Loading
@timer
def load_factors(engine, factors, region, log_level="warning"):
    """Load equity factors from factor_returns table."""
    REGION_MAP = {"International": "Intl"}
    region = REGION_MAP.get(region, region)
    
    cache_key = (tuple(factors), region)
    if cache_key in FACTOR_CACHE:
        if log_level == "debug":
            logger.debug(f"Using cached factors for {cache_key}")
        return FACTOR_CACHE[cache_key]
    
    factor_str = ",".join([f"'{f}'" for f in factors])
    query = text("""
        SELECT factor, date, value
        FROM factor_returns
        WHERE factor IN ({}) AND UPPER(region) = UPPER(:region)
    """.format(factor_str))
    
    with database_transaction() as conn:
        with ThreadPoolExecutor(max_workers=MAX_WORKERS_CPU) as executor:
            future = executor.submit(pd.read_sql_query, query, conn, params={"region": region}, parse_dates=["date"])
            df = future.result()
    
    if df.empty:
        if log_level in ["debug", "info"]:
            logger.warning(f"No data loaded from factor_returns for factors={factors}, region={region}")
        if region != "USA":
            if log_level == "debug":
                logger.info(f"Falling back to USA for factors={factors}")
            query = text("""
                SELECT factor, date, value
                FROM factor_returns
                WHERE factor IN ({}) AND region = 'USA'
            """.format(factor_str))
            with ThreadPoolExecutor(max_workers=MAX_WORKERS_CPU) as executor:
                future = executor.submit(pd.read_sql_query, query, conn, params={})
                df = future.result()
    
    if df.empty:
        logger.error(f"No data loaded for factors={factors}, region={region} or USA")
        return pd.DataFrame()
    
    if log_level == "debug":
        logger.debug(f"Loaded {len(df)} rows for factors={factors}, region={region}")
    
    df = df.pivot_table(index="date", columns="factor", values="value")
    df = standardize_dates(df)
    df = df[df.index >= '2015-01-01']  # Start date for factor data
    
    if not df.apply(lambda x: pd.api.types.is_numeric_dtype(x)).all():
        logger.error(f"Non-numeric data in factors: {df.columns[~df.apply(lambda x: pd.api.types.is_numeric_dtype(x))].tolist()}")
        return pd.DataFrame()
    
    if log_level == "debug":
        logger.debug(f"Factor data range: {df.index.min()} to {df.index.max()}, {len(df)} months")
    
    FACTOR_CACHE[cache_key] = df
    return df

@timer
def load_fixed_income_factors(factor_list, log_level="warning"):
    """Load fixed income factors from Fixed_Income_Factor_Returns table."""
    factor_str = ",".join([f"'{f}'" for f in factor_list])
    query = text("""
        SELECT Date, Factor_Name, ReturnValue
        FROM Fixed_Income_Factor_Returns
        WHERE Factor_Name IN ({})
    """.format(factor_str))
    
    with database_transaction() as conn:
        with ThreadPoolExecutor(max_workers=MAX_WORKERS_CPU) as executor:
            future = executor.submit(pd.read_sql_query, query, conn, parse_dates=["Date"])
            df = future.result()
    
    if df.empty:
        logger.warning(f"No fixed income factors for {factor_list}")
        return pd.DataFrame()
    
    df = df.pivot(index="Date", columns="Factor_Name", values="ReturnValue")
    df = standardize_dates(df)
    df = df[df.index >= '2015-01-01']  # Start date for factor data
    
    if not df.apply(lambda x: pd.api.types.is_numeric_dtype(x)).all():
        logger.error(f"Non-numeric data in fixed income factors: {df.columns[~df.apply(lambda x: pd.api.types.is_numeric_dtype(x))].tolist()}")
        return pd.DataFrame()
    
    if log_level == "debug":
        logger.debug(f"Fixed income factor data range: {df.index.min()} to {df.index.max()}, {len(df)} months")
    
    return df

@timer
def load_fund_metadata(log_level="warning"):
    """Load fund metadata from database."""
    query = text("""
        SELECT 
            f.SymbolCUSIP, 
            f.Region, 
            f.YC_Global_Category_ID, 
            c.Global_Category_Name,
            f.YC_Category_ID,
            y.Category_Name,
            f.CWA_Broad_Category_ID,
            b.CWA_Broad_Category_Name
        FROM Funds_to_Screen f
        JOIN YC_Global_Category_List c ON f.YC_Global_Category_ID = c.ID
        JOIN YC_Category_List y ON f.YC_Category_ID = y.ID
        LEFT JOIN CWA_Broad_Category_List b ON f.CWA_Broad_Category_ID = b.ID
    """)
    
    with database_transaction() as conn:
        with ThreadPoolExecutor(max_workers=MAX_WORKERS_CPU) as executor:
            future = executor.submit(pd.read_sql_query, query, conn)
            df = future.result()
    
    if log_level in ["debug", "info"]:
        logger.info(f"Loaded metadata for {len(df)} funds")
    
    df[["Region", "FactorProfile"]] = df["Global_Category_Name"].map(category_to_region).apply(pd.Series)
    if df["CWA_Broad_Category_Name"].isnull().all():
        logger.warning("CWA_Broad_Category_Name missing; some regressions may be skipped")
    return df.dropna(subset=["Region", "FactorProfile"])

@timer
def load_fund_returns(fund_ids=None, log_level="warning"):
    """Load fund returns from database."""
    query = text("""
        SELECT SymbolCUSIP, Date, ReturnValue
        FROM Fund_Returns_Timeseries
        WHERE Metric = :metric AND ReturnValue IS NOT NULL AND Date IS NOT NULL
    """)
    params = {"metric": RETURN_METRIC}
    
    if fund_ids:
        fund_ids = [str(fid) for fid in fund_ids if isinstance(fid, str) and fid.strip()]
        if not fund_ids:
            logger.warning("No valid SymbolCUSIP provided")
            return pd.DataFrame()
        query = text(str(query) + f" AND SymbolCUSIP IN ({','.join([f':fid{i}' for i in range(len(fund_ids))])})")
        params.update({f"fid{i}": fid for i, fid in enumerate(fund_ids)})
    
    chunks = []
    with database_transaction() as conn:
        with ThreadPoolExecutor(max_workers=MAX_WORKERS_CPU) as executor:
            for chunk in executor.submit(pd.read_sql_query, query, conn, params=params, parse_dates=["Date"], chunksize=CHUNK_SIZE).result():
                if log_level == "debug":
                    logger.debug(f"Loaded chunk of {len(chunk)} rows")
                chunks.append(chunk)
    
    df = pd.concat(chunks) if chunks else pd.DataFrame()
    if df.empty:
        logger.warning(f"No returns data loaded for SymbolCUSIP: {fund_ids}")
        return pd.DataFrame()
    
    if log_level in ["debug", "info"]:
        logger.info(f"Loaded returns for {len(df['SymbolCUSIP'].unique())} funds")
    
    df = df.pivot(index="Date", columns="SymbolCUSIP", values="ReturnValue")
    df = standardize_dates(df)
    
    for col in df.columns:
        df[col] = pd.to_numeric(df[col], errors='coerce')
        total_months = len(df)
        missing_months = df[col].isna().sum()
        if log_level in ["debug", "info"]:
            logger.info(f"{col}: {missing_months} of {total_months} months missing")
    
    if log_level == "debug":
        logger.debug(f"Fund returns range: {df.index.min()} to {df.index.max()}, {len(df)} months")
    
    return df

# Regression Processing
@timer
def run_rolling_regression(symbol, returns, factor_data_dict, regression_name, window_months, log_level="warning"):
    """Run rolling OLS regressions stepping 1 month."""
    records = []
    skip_reasons = defaultdict(int)
    
    returns = returns.sort_index().dropna()
    if returns.std() < 1e-6:
        skip_reasons["low_variance"] += 1
        if log_level == "debug":
            logger.debug(f"Skipping {symbol} ({regression_name}, window={window_months}m): low variance")
        return records, skip_reasons
    
    # Get available dates for each factor dataset
    eq_data = factor_data_dict.get("equity")
    fi_data = factor_data_dict.get("fixed_income")
    
    available_dates = returns.index
    if eq_data is not None:
        available_dates = available_dates.intersection(eq_data.index)
    if fi_data is not None:
        available_dates = available_dates.intersection(fi_data.index)
    
    if len(available_dates) < window_months:
        skip_reasons["insufficient_data"] += 1
        if log_level == "debug":
            logger.debug(f"Skipping {symbol} ({regression_name}, window={window_months}m): {len(available_dates)} months available")
        return records, skip_reasons
    
    for end_date in available_dates[window_months-1::1]:  # Step 1 month
        start_date = end_date - pd.offsets.MonthEnd(window_months)
        if start_date < available_dates[0]:
            continue
        
        # Align returns and factor data for the window
        window_returns, _, error = align_regression_dates(
            returns, returns, start_date, end_date, window_months, log_level
        )
        if error:
            skip_reasons[error] += 1
            continue
        
        factor_parts = []
        if eq_data is not None:
            window_eq, eq_factors, error = align_regression_dates(
                returns, eq_data, start_date, end_date, window_months, log_level
            )
            if error:
                skip_reasons[error] += 1
                continue
            factor_parts.append(eq_factors)
        
        if fi_data is not None:
            window_fi, fi_factors, error = align_regression_dates(
                returns, fi_data, start_date, end_date, window_months, log_level
            )
            if error:
                skip_reasons[error] += 1
                continue
            factor_parts.append(fi_factors)
        
        if not factor_parts:
            skip_reasons["no_factor_data"] += 1
            if log_level == "debug":
                logger.debug(f"No factor data for {symbol} ({regression_name}, window={window_months}m)")
            continue
        
        try:
            window_factors = pd.concat(factor_parts, axis=1)
            if window_factors.empty or window_factors.shape[1] == 0:
                skip_reasons["empty_factor_data"] += 1
                if log_level == "debug":
                    logger.debug(f"Empty factor data after concat for {symbol} ({regression_name}, window={window_months}m)")
                continue
            
            if log_level == "debug":
                logger.debug(f"Window factors shape for {symbol} ({regression_name}, window={window_months}m): {window_factors.shape}")
            
            X = sm.add_constant(window_factors)
            model = sm.OLS(window_returns, X, missing='drop').fit()
            dropped_pct = (window_months - model.nobs) / window_months
            if dropped_pct > 0.2 and log_level == "debug":
                logger.debug(f"High data loss in regression for {symbol} ({regression_name}, end_date={end_date}): {dropped_pct:.2%}")
            
            coefficients = model.params.to_dict()
            tvalues = model.tvalues.to_dict()
            pvalues = model.pvalues.to_dict()
            r_squared = model.rsquared
            
            record = {
                "SymbolCUSIP": symbol,
                "RegressionName": regression_name,
                "Window": f"{window_months}m",
                "EndDate": end_date,
                "R_Squared": r_squared
            }
            for factor in coefficients:
                record[f"{factor}_beta"] = coefficients[factor]
                record[f"{factor}_tvalue"] = tvalues.get(factor, None)
                record[f"{factor}_pvalue"] = pvalues.get(factor, None)
            records.append(record)
        except Exception as e:
            skip_reasons["regression_error"] += 1
            if log_level in ["debug", "info"]:
                logger.warning(f"Regression failed for {symbol} ({regression_name}, window={window_months}m, end_date={end_date}): {str(e)}")
    
    if not records and log_level in ["debug", "info"]:
        logger.info(f"No regressions completed for {symbol} ({regression_name}, window={window_months}m): {dict(skip_reasons)}")
    
    return records, skip_reasons

# Fund Processing
logger = logging.getLogger(__name__)

@timer
def load_factors_for_regression(fund_data, factors, region):
    try:
        # Load factor data
        factor_data = load_factors(factors, region)
        fund_returns = fund_data['returns']
        
        # Check lengths and date ranges
        fund_len = len(fund_returns)
        factor_len = len(factor_data)
        fund_start, fund_end = fund_returns.index.min(), fund_returns.index.max()
        factor_start, factor_end = factor_data.index.min(), factor_data.index.max()
        
        if fund_len != factor_len:
            logger.error(
                f"Length mismatch for fund {fund_data['ticker']}: "
                f"Fund returns ({fund_len}, {fund_start} to {fund_end}) "
                f"vs Factor data ({factor_len}, {factor_start} to {factor_end})"
            )
            raise ValueError("Length mismatch between fund returns and factor data")
        
        # Align data on common dates
        aligned_fund, aligned_factors = fund_returns.align(factor_data, join='inner')
        if len(aligned_fund) == 0:
            logger.error(f"No common dates for fund {fund_data['ticker']} between fund and factor data")
            raise ValueError("No overlapping dates after alignment")
        
        logger.info(
            f"Aligned data for {fund_data['ticker']}: "
            f"{len(aligned_fund)} rows from {aligned_fund.index.min()} to {aligned_fund.index.max()}"
        )
        return aligned_fund, aligned_factors
    
    except Exception as e:
        logger.error(f"Error in load_factors_for_regression for {fund_data['ticker']}: {str(e)}")
        raise

@timer
def run_regressions(symbol, returns, regression_category, regression_sets, log_level="warning"):
    """Run regressions for a fund."""
    records = []
    skip_reasons = defaultdict(int)
    
    for reg_name, factors, region in regression_sets[regression_category]:
        factor_data = load_factors_for_regression(factors, region, reg_name, symbol, log_level=log_level)
        if factor_data is None:
            skip_reasons[f"no_factor_data_{reg_name}"] += 1
            continue
        
        for window in ROLLING_PERIODS:
            reg_records, reg_skip_reasons = run_rolling_regression(
                symbol, returns[symbol], factor_data, reg_name, window, log_level=log_level
            )
            records.extend(reg_records)
            for reason, count in reg_skip_reasons.items():
                skip_reasons[f"{reason}_{reg_name}_{window}m"] += count
    
    return records, skip_reasons

@timer
def process_fund(fund_data, log_level="warning"):
    """Process a single fund."""
    symbol = fund_data["SymbolCUSIP"]
    category = fund_data["Global_Category_Name"]
    returns = pd.Series(fund_data["returns"]).dropna()
    
    if returns.empty:
        if log_level in ["debug", "info"]:
            logger.warning(f"No valid returns for {symbol}")
        return [], {"no_returns": 1}
    
    regression_category = CATEGORY_TO_REGRESSIONS.get(category, "Allocation")
    if regression_category == "Allocation" and category not in CATEGORY_TO_REGRESSIONS:
        if log_level in ["debug", "info"]:
            logger.warning(f"Unmapped category {category} for {symbol}; using Allocation")
    
    records, skip_reasons = run_regressions(
        symbol, fund_data["returns_df"], regression_category, REGRESSION_SETS, log_level=log_level
    )
    if log_level in ["debug", "info"]:
        logger.info(f"Generated {len(records)} regression records for {symbol}, skips: {dict(skip_reasons)}")
    return records, skip_reasons

@timer
def process_region(region, fund_data_list, log_level="warning"):
    """Process funds in a region."""
    records = []
    errors = 0
    skip_reasons = defaultdict(int)
    
    # Enable debug logging for Global region to diagnose length mismatch
    region_log_level = "debug" if region == "Global" else log_level
    
    if log_level in ["debug", "info"]:
        logger.info(f"Processing {len(fund_data_list)} funds in {region}")
    
    if SAMPLE_DRY_RUN:
        for fund_data in tqdm(fund_data_list, total=len(fund_data_list), desc=f"Processing {region}", file=sys.stdout):
            try:
                fund_records, fund_skips = process_fund(fund_data, log_level=region_log_level)
                records.extend(fund_records)
                for reason, count in fund_skips.items():
                    skip_reasons[reason] += count
            except Exception as e:
                logger.error(f"Error processing {fund_data['SymbolCUSIP']}: {str(e)}")
                errors += 1
    else:
        for i in range(0, len(fund_data_list), BATCH_SIZE):
            batch = fund_data_list[i:i + BATCH_SIZE]
            with ProcessPoolExecutor(max_workers=MAX_WORKERS_CPU) as executor:
                future_to_fund = {
                    executor.submit(process_fund, fund_data, log_level=region_log_level): fund_data["SymbolCUSIP"]
                    for fund_data in batch
                }
                for future in tqdm(future_to_fund, total=len(batch), desc=f"Processing {region} batch", file=sys.stdout):
                    try:
                        fund_records, fund_skips = future.result()
                        records.extend(fund_records)
                        for reason, count in fund_skips.items():
                            skip_reasons[reason] += count
                    except Exception as e:
                        logger.error(f"Error processing {future_to_fund[future]}: {str(e)}")
                        errors += 1
    
    logger.info(f"Region {region} generated {len(records)} records with {errors} errors, skips: {dict(skip_reasons)}")
    log_summary(f"Region {region}: {len(fund_data_list)} funds, {len(records)} records, {errors} errors")
    
    if not DRY_RUN and records:
        insert_batch(records, log_level=log_level)
    
    return records, errors, skip_reasons

# Database Output
@timer
def insert_batch(records, log_level="warning"):
    """Insert regression records to database."""
    if DRY_RUN:
        if log_level in ["debug", "info"]:
            logger.info(f"Dry run: Would insert {len(records)} records")
        return
    
    try:
        df = pd.DataFrame(records)
        batch_size = BATCH_INSERT_SIZE
        for i in range(0, len(df), batch_size):
            batch_df = df.iloc[i:i + batch_size]
            with database_transaction() as connection:
                batch_df.to_sql("AQRR_Factor_Attribution", connection, if_exists="append", index=False, method="multi")
            if log_level in ["debug", "info"]:
                logger.info(f"Inserted {len(batch_df)} records to database")
    except Exception as e:
        logger.error(f"Error inserting batch: {e}")
        raise

# Main Pipeline
@timer
def main(log_level="warning"):
    """Run the factor attribution pipeline."""
    logger.info("Starting main pipeline")
    log_summary("Pipeline started")
    
    try:
        fund_meta = load_fund_metadata(log_level=log_level)
    except Exception as e:
        logger.error(f"Failed to load metadata: {e}")
        log_summary(f"Error: Failed to load metadata: {e}")
        return {"error": str(e)}
    
    regions = sorted(set(fund_meta["Region"]) - {'Unknown'})
    if log_level in ["debug", "info"]:
        logger.info(f"Total funds: {len(fund_meta)}, Regions: {regions}")
    log_summary(f"Total funds: {len(fund_meta)}, Regions: {regions}")
    
    fund_ids = fund_meta["SymbolCUSIP"].tolist()
    if SAMPLE_DRY_RUN:
        fund_ids = random.sample(fund_ids, min(SAMPLE_SIZE, len(fund_ids)))
        if log_level in ["debug", "info"]:
            logger.info(f"Sampled {len(fund_ids)} funds")
        log_summary(f"Sampled {len(fund_ids)} funds")
    
    summary = {"total_funds": len(fund_ids), "regions": {}, "errors": 0, "skip_reasons": defaultdict(int)}
    for region in regions:
        region_fund_ids = fund_meta[fund_meta["Region"] == region]["SymbolCUSIP"].tolist()
        if not region_fund_ids:
            logger.warning(f"No SymbolCUSIP found for region {region}")
            summary["regions"][region] = {"funds_processed": 0, "records": 0, "errors": 0}
            continue
        if SAMPLE_DRY_RUN:
            region_fund_ids = random.sample(region_fund_ids, min(SAMPLE_SIZE, len(region_fund_ids)))
            if log_level in ["debug", "info"]:
                logger.info(f"Sampled {len(region_fund_ids)} SymbolCUSIP for {region}")
        
        try:
            returns = load_fund_returns(region_fund_ids, log_level=log_level)
        except Exception as e:
            logger.error(f"Failed to load returns for {region}: {e}")
            log_summary(f"Error: Failed to load returns for {region}: {e}")
            summary["regions"][region] = {"funds_processed": 0, "records": 0, "errors": 1}
            summary["errors"] += 1
            continue
        
        region_funds = [
            {
                "SymbolCUSIP": row["SymbolCUSIP"],
                "Global_Category_Name": row["Global_Category_Name"],
                "CWA_Broad_Category_Name": row.get("CWA_Broad_Category_Name", None),
                "returns": returns[row["SymbolCUSIP"]].to_dict() if row["SymbolCUSIP"] in returns.columns else {},
                "returns_df": returns
            }
            for _, row in fund_meta.iterrows() if row["SymbolCUSIP"] in returns.columns
        ]
        if not region_funds:
            logger.warning(f"No valid returns data for {region}")
            summary["regions"][region] = {"funds_processed": 0, "records": 0, "errors": 0}
            continue
        
        records, errors, region_skip_reasons = process_region(region, region_funds, log_level=log_level)
        summary["regions"][region] = {"funds_processed": len(region_funds), "records": len(records), "errors": errors}
        summary["errors"] += errors
        for reason, count in region_skip_reasons.items():
            summary["skip_reasons"][reason] += count
    
    logger.info(f"Pipeline summary: {summary}")
    log_summary(f"Pipeline completed: {summary}")
    return summary

if __name__ == "__main__":
    try:
        main(log_level=CONFIG["log_level"])
    except Exception as e:
        logger.error(f"Main execution failed: {e}")
        raise

2025-04-21 16:42:42,243 - INFO - Starting main pipeline
Processing Global:   0%|          | 0/10 [00:00<?, ?it/s]2025-04-21 16:42:44,136 - DEBUG - Starting process_fund
2025-04-21 16:42:44,137 - DEBUG - Starting run_regressions
2025-04-21 16:42:44,138 - DEBUG - Starting load_factors_for_regression
2025-04-21 16:42:44,138 - ERROR - Error processing EMDM: load_factors_for_regression() got an unexpected keyword argument 'log_level'
2025-04-21 16:42:44,139 - DEBUG - Starting process_fund
2025-04-21 16:42:44,140 - DEBUG - Starting run_regressions
2025-04-21 16:42:44,141 - DEBUG - Starting load_factors_for_regression
2025-04-21 16:42:44,142 - ERROR - Error processing GSFP: load_factors_for_regression() got an unexpected keyword argument 'log_level'
2025-04-21 16:42:44,142 - DEBUG - Starting process_fund
2025-04-21 16:42:44,144 - DEBUG - Starting run_regressions
2025-04-21 16:42:44,144 - DEBUG - Starting load_factors_for_regression
2025-04-21 16:42:44,145 - ERROR - Error processing PCGG: load