In [3]:
# ----------------------
# Imports and Setup
# ----------------------

"""
Causal Inflation Effects Analysis
==================================
This module analyzes the causal effects of inflation shocks on future real returns
using controlled regression and robustness checks (manual implementation).

Integrated into Week 1 Project
Author: Enhanced from Jupyter Notebook
Date: 2026-01-27
"""

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict, List, Optional, Tuple
from pathlib import Path
from scipy import stats
import warnings
warnings.filterwarnings('ignore')

In [9]:
# ------------------------------------------------
# Creating an Analysing Casual Inflation Effects
# ------------------------------------------------

class SimpleOLSResults:
    """Simple OLS results container to replace statsmodels."""
    
    def __init__(self, X, y, coef, se, t_stats, p_values, r_squared):
        self.X = X
        self.y = y
        self.params = pd.Series(coef, index=X.columns)
        self.bse = pd.Series(se, index=X.columns)
        self.tvalues = pd.Series(t_stats, index=X.columns)
        self.pvalues = pd.Series(p_values, index=X.columns)
        self.rsquared = r_squared
        self.nobs = len(y)
        
    def conf_int(self, alpha=0.05):
        """Calculate confidence intervals."""
        from scipy.stats import t
        df = self.nobs - len(self.params)
        t_crit = t.ppf(1 - alpha/2, df)
        ci_lower = self.params - t_crit * self.bse
        ci_upper = self.params + t_crit * self.bse
        return pd.DataFrame({0: ci_lower, 1: ci_upper})
    
    def summary(self):
        """Create summary string."""
        summary = f"\nOLS Regression Results\n"
        summary += "=" * 78 + "\n"
        summary += f"R-squared: {self.rsquared:.4f}\n"
        summary += f"Observations: {self.nobs}\n\n"
        summary += f"{'Variable':<20} {'Coef':>10} {'Std Err':>10} {'t':>10} {'P>|t|':>10}\n"
        summary += "-" * 78 + "\n"
        for var in self.params.index:
            summary += f"{var:<20} {self.params[var]:>10.4f} {self.bse[var]:>10.4f} "
            summary += f"{self.tvalues[var]:>10.3f} {self.pvalues[var]:>10.4f}\n"
        summary += "=" * 78
        return summary


def simple_ols(X, y):
    """
    Simple OLS regression implementation.
    
    Parameters
    ----------
    X : pd.DataFrame
        Features (including constant)
    y : pd.Series
        Target variable
        
    Returns
    -------
    SimpleOLSResults
        Regression results
    """
    # Convert to numpy arrays with explicit float type
    X_arr = X.values.astype(float)
    y_arr = y.values.astype(float)
    
    # Calculate coefficients: (X'X)^-1 X'y
    XtX = X_arr.T @ X_arr
    Xty = X_arr.T @ y_arr
    coef = np.linalg.solve(XtX, Xty)
    
    # Predictions and residuals
    y_pred = X_arr @ coef
    residuals = y_arr - y_pred
    
    # Calculate statistics
    n = len(y)
    k = X_arr.shape[1]
    df = n - k
    
    # R-squared
    ss_res = np.sum(residuals ** 2)
    ss_tot = np.sum((y_arr - y_arr.mean()) ** 2)
    r_squared = 1 - (ss_res / ss_tot)
    
    # Standard errors
    mse = ss_res / df
    var_coef = mse * np.linalg.inv(XtX)
    se = np.sqrt(np.diag(var_coef))
    
    # T-statistics and p-values
    t_stats = coef / se
    from scipy.stats import t as t_dist
    p_values = 2 * (1 - t_dist.cdf(np.abs(t_stats), df))
    
    return SimpleOLSResults(X, y, coef, se, t_stats, p_values, r_squared)


class InflationShockAnalyzer:
    def __init__(self, df, figures_dir=None):
        self.df = df.copy()
        self.results = {}

        if figures_dir is None:
            self.figures_dir = Path.cwd() / "figures"
        else:
            self.figures_dir = Path(figures_dir)

        self.figures_dir.mkdir(parents=True, exist_ok=True)

        
    def define_inflation_shock(
        self, 
        inflation_col: str = 'Inflation_Rate',
        window: int = 24,
        threshold_std: float = 1.5
    ) -> pd.Series:
        """
        Define inflation shocks based on rolling statistics.
        
        Parameters
        ----------
        inflation_col : str, default='Inflation_Rate'
            Column name for inflation data
        window : int, default=24
            Rolling window size in months
        threshold_std : float, default=1.5
            Number of standard deviations above mean to define shock
            
        Returns
        -------
        pd.Series
            Binary indicator of inflation shock periods
        """
        rolling_mean = self.df[inflation_col].rolling(window).mean()
        rolling_std = self.df[inflation_col].rolling(window).std()
        
        self.df["Inflation_Shock"] = (
            self.df[inflation_col] > rolling_mean + threshold_std * rolling_std
        ).astype(int)
        
        # Also calculate shock magnitude
        self.df["Shock_Magnitude"] = np.where(
            self.df["Inflation_Shock"] == 1,
            self.df[inflation_col] - rolling_mean,
            0
        )
        
        return self.df["Inflation_Shock"]
    
    def calculate_future_returns(
        self, 
        return_col: str = 'Real_Return',
        horizons: Optional[List[int]] = None
    ) -> pd.DataFrame:
        """
        Calculate forward-looking real returns over specified horizons.
        
        Parameters
        ----------
        return_col : str, default='Real_Return'
            Column name for returns
        horizons : List[int], optional
            Forward-looking periods in months. If None, uses [6, 12]
            
        Returns
        -------
        pd.DataFrame
            DataFrame with future return columns added
        """
        if horizons is None:
            horizons = [6, 12]
        
        for horizon in horizons:
            col_name = f"Future_Return_{horizon}m"
            self.df[col_name] = (
                self.df[return_col]
                .rolling(horizon)
                .sum()
                .shift(-horizon)
            )
        
        return self.df
    
    def naive_comparison(
        self,
        future_return_col: str = 'Future_Return_12m',
        plot: bool = True
    ) -> Dict[str, float]:
        """
        Perform naive comparison of returns during shock vs non-shock periods.
        
        Parameters
        ----------
        future_return_col : str, default='Future_Return_12m'
            Column name for future returns
        plot : bool, default=True
            Whether to create visualization
            
        Returns
        -------
        Dict[str, float]
            Mean returns for each shock state
        """
        comparison_data = self.df[["Inflation_Shock", future_return_col]].dropna()
        
        comparison = (
            comparison_data
            .groupby("Inflation_Shock")[future_return_col]
            .agg(['mean', 'std', 'count'])
            .to_dict('index')
        )
        
        self.results['naive_comparison'] = comparison
        
        if plot:
            fig, axes = plt.subplots(1, 2, figsize=(14, 6))
            
            # Box plot
            comparison_data.boxplot(
                column=future_return_col,
                by='Inflation_Shock',
                ax=axes[0]
            )
            axes[0].set_xlabel('Inflation Shock Period', fontsize=12, fontweight='bold')
            axes[0].set_ylabel('Future 12-Month Returns', fontsize=12, fontweight='bold')
            axes[0].set_title('Return Distribution by Shock Status', fontsize=13, fontweight='bold')
            axes[0].set_xticklabels(['Normal', 'Shock'])
            plt.sca(axes[0])
            plt.xticks([1, 2], ['Normal', 'Shock'])
            
            # Bar plot with error bars
            means = [comparison[0]['mean'], comparison[1]['mean']]
            stds = [comparison[0]['std'], comparison[1]['std']]
            
            bars = axes[1].bar(['Normal', 'Shock'], means, 
                              color=['#3498db', '#e74c3c'], alpha=0.7, edgecolor='black')
            axes[1].errorbar(['Normal', 'Shock'], means, yerr=stds, 
                           fmt='none', color='black', capsize=10, capthick=2)
            
            # Add value labels
            for bar, mean in zip(bars, means):
                height = bar.get_height()
                axes[1].text(bar.get_x() + bar.get_width()/2., height,
                           f'{mean:.4f}',
                           ha='center', va='bottom', fontsize=11, fontweight='bold')
            
            axes[1].set_ylabel('Mean Future 12-Month Return', fontsize=12, fontweight='bold')
            axes[1].set_title('Average Returns: Shock vs Normal', fontsize=13, fontweight='bold')
            axes[1].axhline(y=0, color='black', linestyle='--', alpha=0.5)
            axes[1].grid(True, alpha=0.3, axis='y')
            
            plt.suptitle('Naive Comparison: Inflation Shocks and Future Returns', 
                        fontsize=14, fontweight='bold', y=1.02)
            plt.tight_layout()
            
            save_path = self.figures_dir / 'causal_naive_comparison.png'
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            print(f"✓ Saved: {save_path}")
            plt.close()
        
        return comparison
    
    def controlled_regression(
        self,
        future_return_col: str = 'Future_Return_12m',
        controls: Optional[List[str]] = None,
        plot: bool = True
    ) -> SimpleOLSResults:
        """
        Run OLS regression controlling for confounding variables.
        
        This is a manual implementation without DoWhy/EconML.
        
        Parameters
        ----------
        future_return_col : str, default='Future_Return_12m'
            Column name for dependent variable
        controls : List[str], optional
            List of control variable names. If None, uses available controls.
        plot : bool, default=True
            Whether to create visualization
            
        Returns
        -------
        SimpleOLSResults
            Fitted OLS model results
        """
        # Use available columns as controls
        if controls is None:
            controls = []
            if 'Nominal_Return' in self.df.columns:
                controls.append('Nominal_Return')
            if 'Inflation_Regime' in self.df.columns:
                # Create dummy variables for regime
                regime_dummies = pd.get_dummies(self.df['Inflation_Regime'], prefix='Regime')
                self.df = pd.concat([self.df, regime_dummies], axis=1)
                # Add regime dummies (excluding one as reference)
                regime_cols = [col for col in regime_dummies.columns if col != regime_dummies.columns[0]]
                controls.extend(regime_cols)
        
        # Prepare features
        features = ["Inflation_Shock"] + controls
        available_features = [f for f in features if f in self.df.columns]
        
        X = self.df[available_features].copy()
        # Add constant manually
        X.insert(0, 'const', 1.0)
        
        # Prepare target
        y = self.df[future_return_col]
        
        # Drop rows with any NaN
        combined = pd.concat([X, y], axis=1).dropna()
        X_clean = combined[X.columns]
        y_clean = combined[future_return_col]
        
        # Fit model using manual OLS
        self.model = simple_ols(X_clean, y_clean)
        self.results['controlled_regression'] = self.model
        
        if plot:
            # Create coefficient plot
            fig, ax = plt.subplots(figsize=(10, 6))
            
            # Get coefficients (excluding intercept)
            coef_df = pd.DataFrame({
                'Coefficient': self.model.params[1:],
                'CI_Lower': self.model.conf_int()[0][1:],
                'CI_Upper': self.model.conf_int()[1][1:]
            })
            
            # Plot
            y_pos = np.arange(len(coef_df))
            colors = ['#e74c3c' if x == 'Inflation_Shock' else '#3498db' 
                     for x in coef_df.index]
            
            ax.barh(y_pos, coef_df['Coefficient'], color=colors, alpha=0.7, 
                   edgecolor='black', linewidth=1.5)
            ax.errorbar(coef_df['Coefficient'], y_pos, 
                       xerr=[coef_df['Coefficient'] - coef_df['CI_Lower'],
                             coef_df['CI_Upper'] - coef_df['Coefficient']],
                       fmt='none', color='black', capsize=5, capthick=2)
            
            ax.set_yticks(y_pos)
            ax.set_yticklabels(coef_df.index)
            ax.axvline(x=0, color='black', linestyle='--', linewidth=1)
            ax.set_xlabel('Coefficient Estimate', fontsize=12, fontweight='bold')
            ax.set_title(f'Regression Coefficients: Impact on {future_return_col}\n'
                        f'R² = {self.model.rsquared:.4f}', 
                        fontsize=13, fontweight='bold')
            ax.grid(True, alpha=0.3, axis='x')
            
            plt.tight_layout()
            
            save_path = self.figures_dir / 'causal_controlled_regression.png'
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            print(f"✓ Saved: {save_path}")
            plt.close()
        
        return self.model
    
    def regime_effects(
        self,
        future_return_col: str = 'Future_Return_12m',
        regime_column: str = "Inflation_Regime",
        plot: bool = True
    ) -> Dict[str, Dict[str, float]]:
        """
        Calculate average effects by inflation regime.
        
        Parameters
        ----------
        future_return_col : str, default='Future_Return_12m'
            Column name for future returns
        regime_column : str, default="Inflation_Regime"
            Name of the column containing regime classifications
        plot : bool, default=True
            Whether to create visualization
            
        Returns
        -------
        Dict[str, Dict[str, float]]
            Statistics for each regime
        """
        if regime_column not in self.df.columns:
            print(f"Warning: Column '{regime_column}' not found in DataFrame")
            return {}
        
        regime_data = self.df[[regime_column, future_return_col, 'Inflation_Shock']].dropna()
        
        regime_effects = {}
        for regime in regime_data[regime_column].unique():
            subset = regime_data[regime_data[regime_column] == regime]
            
            # Overall stats
            mean_return = subset[future_return_col].mean()
            
            # Shock vs non-shock within regime
            shock_mean = subset[subset['Inflation_Shock'] == 1][future_return_col].mean()
            normal_mean = subset[subset['Inflation_Shock'] == 0][future_return_col].mean()
            
            regime_effects[regime] = {
                'mean_return': mean_return,
                'shock_mean': shock_mean,
                'normal_mean': normal_mean,
                'shock_effect': shock_mean - normal_mean if not pd.isna(shock_mean) else np.nan,
                'count': len(subset)
            }
        
        self.results['regime_effects'] = regime_effects
        
        if plot and len(regime_effects) > 0:
            fig, ax = plt.subplots(figsize=(12, 6))
            
            regimes = list(regime_effects.keys())
            x = np.arange(len(regimes))
            width = 0.35
            
            shock_means = [regime_effects[r]['shock_mean'] for r in regimes]
            normal_means = [regime_effects[r]['normal_mean'] for r in regimes]
            
            bars1 = ax.bar(x - width/2, normal_means, width, label='Normal Periods',
                          color='#3498db', alpha=0.8, edgecolor='black')
            bars2 = ax.bar(x + width/2, shock_means, width, label='Shock Periods',
                          color='#e74c3c', alpha=0.8, edgecolor='black')
            
            # Add value labels
            for bars in [bars1, bars2]:
                for bar in bars:
                    height = bar.get_height()
                    if not np.isnan(height):
                        ax.text(bar.get_x() + bar.get_width()/2., height,
                               f'{height:.3f}',
                               ha='center', va='bottom', fontsize=9)
            
            ax.set_xlabel('Inflation Regime', fontsize=12, fontweight='bold')
            ax.set_ylabel('Mean Future 12-Month Return', fontsize=12, fontweight='bold')
            ax.set_title('Regime-Specific Shock Effects', fontsize=13, fontweight='bold')
            ax.set_xticks(x)
            ax.set_xticklabels(regimes)
            ax.legend(fontsize=11)
            ax.axhline(y=0, color='black', linestyle='--', alpha=0.5)
            ax.grid(True, alpha=0.3, axis='y')
            
            plt.tight_layout()
            
            save_path = self.figures_dir / 'causal_regime_effects.png'
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            print(f"✓ Saved: {save_path}")
            plt.close()
        
        return regime_effects
    
    def robustness_check(
        self,
        shift_periods: int = 12
    ) -> pd.Series:
        """
        Create a falsification test by shifting shock indicator.
        This is a manual robustness check (placebo test).
        
        Parameters
        ----------
        shift_periods : int, default=12
            Number of periods to shift the shock indicator
            
        Returns
        -------
        pd.Series
            Shifted shock indicator for robustness testing
        """
        self.df["Fake_Shock"] = self.df["Inflation_Shock"].shift(-shift_periods)
        return self.df["Fake_Shock"]
    
    def run_full_analysis(
        self,
        inflation_col: str = 'Inflation_Rate',
        return_col: str = 'Real_Return',
        window: int = 24,
        horizons: Optional[List[int]] = None,
        controls: Optional[List[str]] = None,
        verbose: bool = True
    ) -> Dict:
        """
        Run complete causal analysis pipeline.
        
        Parameters
        ----------
        inflation_col : str, default='Inflation_Rate'
            Column name for inflation
        return_col : str, default='Real_Return'
            Column name for returns
        window : int, default=24
            Rolling window for shock definition
        horizons : List[int], optional
            Forward-looking return horizons
        controls : List[str], optional
            Control variables for regression
        verbose : bool, default=True
            Whether to print results
            
        Returns
        -------
        Dict
            Dictionary containing all analysis results
        """
        if horizons is None:
            horizons = [6, 12]
        
        if verbose:
            print("=" * 70)
            print("CAUSAL INFLATION EFFECTS ANALYSIS")
            print("=" * 70)
        
        # Define shocks
        self.define_inflation_shock(inflation_col, window=window)
        shock_count = self.df["Inflation_Shock"].sum()
        
        if verbose:
            print(f"\nInflation Shocks Identified: {shock_count} periods")
            print(f"  ({shock_count / len(self.df) * 100:.1f}% of observations)")
        
        # Calculate future returns
        self.calculate_future_returns(return_col, horizons=horizons)
        
        # Naive comparison
        if verbose:
            print("\n1. Naive Comparison (Unconditional Means)")
            print("-" * 70)
        
        naive = self.naive_comparison(plot=True)
        if verbose:
            for shock_state, stats in naive.items():
                label = "Shock Period" if shock_state == 1 else "Normal Period"
                print(f"  {label}:")
                print(f"    Mean: {stats['mean']:.6f}")
                print(f"    Std:  {stats['std']:.6f}")
                print(f"    N:    {stats['count']}")
        
        # Controlled regression
        if verbose:
            print("\n2. Controlled Regression Results")
            print("-" * 70)
        
        model = self.controlled_regression(controls=controls, plot=True)
        if verbose:
            print(f"\n{model.summary()}")
        
        # Regime effects
        if "Inflation_Regime" in self.df.columns:
            if verbose:
                print("\n3. Regime-Specific Effects")
                print("-" * 70)
            regime_results = self.regime_effects(plot=True)
            if verbose:
                for regime, stats in regime_results.items():
                    print(f"\n  {regime}:")
                    print(f"    Overall mean: {stats['mean_return']:.6f}")
                    if not np.isnan(stats['shock_mean']):
                        print(f"    Shock mean:   {stats['shock_mean']:.6f}")
                        print(f"    Normal mean:  {stats['normal_mean']:.6f}")
                        print(f"    Shock effect: {stats['shock_effect']:.6f}")
        
        # Robustness check
        self.robustness_check()
        if verbose:
            print("\n4. Robustness Check")
            print("-" * 70)
            print("  ✓ Fake shock indicator created for falsification testing")
            print("    (Shifted actual shocks by 12 months for placebo test)")
        
        if verbose:
            print("\n" + "=" * 70)
            print("KEY FINDINGS")
            print("=" * 70)
            
            # Interpret results
            diff = naive[1]['mean'] - naive[0]['mean']
            print(f"\n• Unconditional shock effect: {diff:.6f}")
            
            if self.model is not None:
                shock_coef = self.model.params.get('Inflation_Shock', np.nan)
                shock_pval = self.model.pvalues.get('Inflation_Shock', np.nan)
                if not np.isnan(shock_coef):
                    print(f"• Controlled shock effect: {shock_coef:.6f}")
                    if shock_pval < 0.05:
                        print(f"  (Statistically significant, p={shock_pval:.4f})")
                    else:
                        print(f"  (Not statistically significant, p={shock_pval:.4f})")
        
        return self.results
    
    def get_coefficient_summary(self) -> pd.DataFrame:
        """
        Extract key coefficients and statistics from the regression model.
        
        Returns
        -------
        pd.DataFrame
            Summary of coefficients, standard errors, and p-values
        """
        if self.model is None:
            raise ValueError("Model has not been fitted. Run controlled_regression first.")
        
        summary_df = pd.DataFrame({
            'Coefficient': self.model.params,
            'Std_Error': self.model.bse,
            'T_Statistic': self.model.tvalues,
            'P_Value': self.model.pvalues,
            'CI_Lower': self.model.conf_int()[0],
            'CI_Upper': self.model.conf_int()[1]
        })
        
        return summary_df


def load_project_data(data_path=None):
    """
    Load the combined analysis data from the Week 1 project.
    """
    if data_path is None:
        project_root = Path.cwd()
        while project_root.name != "Inflation vs Market Returns Analysis":
            project_root = project_root.parent

        data_path = (
            project_root
            / "data"
            / "processed"
            / "combined_analysis.csv"
        )

    df = pd.read_csv(
        data_path,
        parse_dates=["Date"],
        index_col="Date"
    )

    return df


def main():
    """
    Run causal inflation effects analysis on Week 1 project data.
    """
    print("\nLoading Week 1 Project Data...")
    print("-" * 70)
    
    # Load data
    df = load_project_data()
    print(f"✓ Loaded {len(df)} observations")
    print(f"  Date range: {df.index.min()} to {df.index.max()}")
    
    # Initialize analyzer
    analyzer = InflationShockAnalyzer(df)
    
    # Run full analysis
    results = analyzer.run_full_analysis(verbose=True)
    
    # Get detailed coefficient summary
    print("\n5. Detailed Coefficient Summary")
    print("-" * 70)
    coef_summary = analyzer.get_coefficient_summary()
    print(coef_summary.to_string())
    
    print("\n" + "=" * 70)
    print("✓ ANALYSIS COMPLETE")
    print("=" * 70)
    print(f"\nFigures saved to: {analyzer.figures_dir}")
    
    return analyzer, results


if __name__ == "__main__":
    analyzer, results = main()


Loading Week 1 Project Data...
----------------------------------------------------------------------
✓ Loaded 130 observations
  Date range: 2012-05-31 00:00:00 to 2023-02-28 00:00:00
CAUSAL INFLATION EFFECTS ANALYSIS

Inflation Shocks Identified: 25 periods
  (19.2% of observations)

1. Naive Comparison (Unconditional Means)
----------------------------------------------------------------------
✓ Saved: C:\Users\rfull\Building Data Together Weeklies\Finance February\Inflation vs Market Returns Analysis\figures\causal_naive_comparison.png
  Normal Period:
    Mean: -0.068499
    Std:  0.127741
    N:    97
  Shock Period:
    Mean: -0.570210
    Std:  0.455363
    N:    21

2. Controlled Regression Results
----------------------------------------------------------------------
✓ Saved: C:\Users\rfull\Building Data Together Weeklies\Finance February\Inflation vs Market Returns Analysis\figures\causal_controlled_regression.png


OLS Regression Results
R-squared: 0.8094
Observations: 118