# Sign Restriction Identification in Structural VARs

This notebook implements SVAR identification using **sign restrictions** on impulse response functions.

## Method: Rubio-Ramírez, Waggoner & Zha (2010)

### Key Idea:
Instead of imposing exact zero restrictions (like Blanchard-Quah), we identify shocks by imposing **inequality constraints** on IRFs:

- **Supply shock**: GDP growth ↑, Inflation ↓
- **Demand shock**: GDP growth ↑, Inflation ↑

### Algorithm:
1. Estimate reduced-form VAR → obtain variance-covariance matrix Ω
2. Cholesky decomposition: Ω = P·P'
3. For each draw:
   - Generate random orthogonal matrix Q via QR decomposition
   - Candidate identification: S = P·Q'
   - Compute IRFs and check if sign restrictions are satisfied
   - If yes, accept this identification
4. Report distribution of accepted IRFs (set-identification)

### References:
- Rubio-Ramírez, J.F., Waggoner, D.F. & Zha, T. (2010). "Structural Vector Autoregressions: Theory of Identification and Algorithms for Inference", *Review of Economic Studies*, 77(2), 665-696.
- Uhlig, H. (2005). "What are the effects of monetary policy on output?", *Journal of Monetary Economics*, 52(2), 381-419.

In [None]:
# Cell 1: Imports
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import fredapi

# Install fredapi if needed:
# !pip install fredapi

FRED_API_KEY = ''  # Replace with your API key from https://fredaccount.stlouisfed.org/apikeys

# Initialize FRED API
fred = fredapi.Fred(api_key=FRED_API_KEY)

In [None]:
# Cell 2: Core VAR Functions (reused from Blanchard-Quah notebook)

def estimate_var(data, nlags, maxlags=None):
    """
    Estimate a VAR(nlags) using OLS.
    
    Parameters:
    -----------
    data : array of shape (nvars, T)
        Variables in rows, time in columns
    nlags : int
        Number of lags in the VAR
    maxlags : int, optional
        Maximum number of lags to fix sample size
    
    Returns:
    --------
    dict with keys:
        'coef': list of coefficient matrices
        'intercept': constant term
        'resid': residuals
        'Omega': covariance matrix of residuals
        'nobs', 'nlags', 'nvars'
        'AIC', 'SIC', 'HQIC'
    """
    
    # Fix effective sample size
    effective_lag = maxlags if maxlags is not None else nlags
    
    Y = data
    T = Y.shape[1] - effective_lag
    nvars = Y.shape[0]
    
    # Construct LHS and RHS (Lütkepohl notation)
    Y_reg = Y[:, effective_lag:]
    
    X = np.ones((1, T))  # constant
    for lag in range(1, nlags + 1):
        X = np.vstack([X, Y[:, effective_lag-lag : effective_lag + T - lag]])
    X_reg = X
    
    # OLS estimation
    coef_all = Y_reg @ X_reg.T @ np.linalg.inv(X_reg @ X_reg.T)
    
    # Residuals and covariance
    Y_fitted = coef_all @ X_reg
    resid = Y_reg - Y_fitted
    Omega = (resid @ resid.T) / T
    
    # Information criteria
    log_det_Omega = np.log(np.linalg.det(Omega))
    log_likelihood = -(T/2) * (nvars*np.log(2*np.pi) + nvars + log_det_Omega)
    nv = nvars * (nvars*nlags + 1)
    
    AIC  = -2*log_likelihood + 2*nv
    HQIC = -2*log_likelihood + 2*nv*np.log(np.log(T))
    SIC  = -2*log_likelihood + nv*np.log(T)
    
    # Extract coefficients
    intercept = coef_all[:, 0]
    coef_matrices = []
    for lag in range(nlags):
        start_idx = 1 + lag * nvars
        end_idx = 1 + (lag + 1) * nvars
        coef_matrices.append(coef_all[:, start_idx:end_idx])
    
    return {
        'coef': coef_matrices,
        'intercept': intercept,
        'resid': resid,
        'Omega': Omega,
        'nobs': T,
        'nlags': nlags,
        'nvars': nvars,
        'log_likelihood': log_likelihood,
        'AIC': AIC,
        'SIC': SIC,
        'HQIC': HQIC
    }


def compute_impulse_response(var_results, nperiods=21, S=None):
    """
    Compute impulse response functions.
    
    Parameters:
    -----------
    var_results : dict
        Output from estimate_var()
    nperiods : int
        Number of periods for IRFs
    S : array, optional
        Identification matrix. If None, uses Cholesky decomposition
    
    Returns:
    --------
    irf : array of shape (nperiods, nvars, nvars)
        irf[t, i, j] = response of variable i at time t to shock j
    """
    
    if S is None:
        S = np.linalg.cholesky(var_results['Omega'])
    
    nlags = var_results['nlags']
    nvars = var_results['nvars']
    
    # Companion form
    companion_size = nvars * nlags
    B_companion = np.zeros((companion_size, companion_size))
    
    for i in range(nlags):
        start_col = i * nvars
        end_col = (i + 1) * nvars
        B_companion[:nvars, start_col:end_col] = var_results['coef'][i]
    
    if nlags > 1:
        B_companion[nvars:, :nvars*(nlags-1)] = np.eye(nvars*(nlags-1))
    
    # Q matrix
    Q = np.zeros((companion_size, nvars))
    Q[:nvars, :] = S
    
    # Compute IRFs
    irf = np.zeros((nperiods, nvars, nvars))
    B_power = np.eye(companion_size)
    
    for t in range(nperiods):
        Y_impact = B_power @ Q
        irf[t, :, :] = Y_impact[:nvars, :].T
        B_power = B_power @ B_companion
    
    return irf


def compute_information_criteria(data, maxlags=8):
    """Compute AIC, SIC, HQC for lag selection."""
    
    criteria = {'AIC': [], 'SIC': [], 'HQC': []}
    
    print("\nInformation Criteria")
    print("====================")
    print("Lag    AIC      SIC      HQC")
    print("=================================")
    
    for lag in range(1, maxlags + 1):
        var_result = estimate_var(data, lag, maxlags)
        
        aic = var_result['AIC']
        sic = var_result['SIC']
        hqc = var_result['HQIC']
        
        criteria['AIC'].append(aic)
        criteria['SIC'].append(sic)
        criteria['HQC'].append(hqc)
        
        print(f" {lag}  {aic:8.3f} {sic:8.3f} {hqc:8.3f}")
    
    # Find optimal lags
    opt_aic = np.argmin(criteria['AIC']) + 1
    opt_sic = np.argmin(criteria['SIC']) + 1
    opt_hqc = np.argmin(criteria['HQC']) + 1
    
    criteria['optimal_AIC'] = opt_aic
    criteria['optimal_SIC'] = opt_sic
    criteria['optimal_HQC'] = opt_hqc
    
    print("\nOptimal number of lags:")
    print("======================")
    print(f"AIC: {opt_aic}")
    print(f"SIC: {opt_sic}")
    print(f"HQC: {opt_hqc}")
    
    return criteria

In [None]:
# Cell 3: Data Loading Functions

def load_gdp_inflation_data(fred, start_date='1948-01-01', end_date='2019-10-01'):
    """
    Load GDP and CPI data from FRED and compute growth rates.
    
    Parameters:
    -----------
    fred : fredapi.Fred
        FRED API object
    start_date, end_date : str
        Date range for data
    
    Returns:
    --------
    df : DataFrame
        Contains GDP and CPI in levels
    """
    
    # Download quarterly GDP (real)
    gdp = fred.get_series('GDPC1').to_frame(name='gdp')
    
    # Download CPI (monthly) and convert to quarterly
    cpi = fred.get_series('CPIAUCSL').to_frame(name='cpi')
    cpi_q = cpi.resample('QS').mean()  # Average over quarter
    
    # Merge
    df = pd.merge(gdp, cpi_q, left_index=True, right_index=True, how='inner')
    
    # Filter dates
    df = df[(df.index >= start_date) & (df.index <= end_date)]
    
    return df


def prepare_gdp_inflation_data(df):
    """
    Compute GDP growth and inflation from levels.
    
    Returns:
    --------
    data : array of shape (2, T)
        Row 0: GDP growth (log diff, annualized %)
        Row 1: Inflation (log diff, annualized %)
    """
    
    # Log differences (quarterly rates)
    gdp_growth = np.log(df['gdp']).diff().dropna()
    inflation = np.log(df['cpi']).diff().dropna()
    
    # Convert to annualized percentage points
    gdp_growth = 400 * gdp_growth.values  # 4 quarters * 100%
    inflation = 400 * inflation.values
    
    # Align (inflation calculated after GDP growth due to diff)
    T = min(len(gdp_growth), len(inflation))
    
    data = np.array([gdp_growth[:T], inflation[:T]])
    
    return data

In [None]:
# Cell 4: Sign Restriction Identification Functions

def sign_restriction_identification(var_results, sign_restrictions, horizon=2, 
                                     max_draws=10000, nperiods=21, verbose=True):
    """
    Identify structural shocks using sign restrictions.
    
    Algorithm: Rubio-Ramírez, Waggoner & Zha (2010)
    
    Parameters:
    -----------
    var_results : dict
        Output from estimate_var()
    sign_restrictions : list of dicts
        Each dict has keys 'var_idx', 'shock_idx', 'sign', 'horizon'
        Example: [{'var_idx': 0, 'shock_idx': 0, 'sign': '+'},
                  {'var_idx': 1, 'shock_idx': 0, 'sign': '-'}]
        means shock 0 increases var 0 and decreases var 1
    horizon : int
        Number of periods to enforce sign restrictions
    max_draws : int
        Maximum number of candidate rotations to try
    nperiods : int
        Total periods for IRFs to store
    verbose : bool
        Print progress
    
    Returns:
    --------
    irf_accepted : array of shape (n_accepted, nperiods, nvars, nvars)
        All IRFs that satisfy sign restrictions
    acceptance_rate : float
        Proportion of draws accepted
    """
    
    nvars = var_results['nvars']
    
    # Cholesky decomposition of Omega
    P = np.linalg.cholesky(var_results['Omega'])
    
    irf_accepted = []
    n_accepted = 0
    
    if verbose:
        print(f"\nSearching for valid identifications (max {max_draws} draws)...")
    
    for draw in range(max_draws):
        
        if verbose and draw % 1000 == 0 and draw > 0:
            print(f"Draw {draw}/{max_draws} | Accepted: {n_accepted} ({100*n_accepted/draw:.1f}%)")
        
        # Step 1: Generate random orthogonal matrix via QR decomposition
        L = np.random.randn(nvars, nvars)
        Q, R = np.linalg.qr(L)
        
        # Step 2: Normalize Q (ensure diagonal of R is positive)
        for i in range(nvars):
            if R[i, i] < 0:
                Q[:, i] = -Q[:, i]
        
        # Step 3: Candidate identification matrix
        S_candidate = P @ Q.T
        
        # Step 4: Compute IRFs for this candidate
        irf_candidate = compute_impulse_response(var_results, nperiods, S_candidate)
        
        # Step 5: Check sign restrictions
        if check_sign_restrictions(irf_candidate, sign_restrictions, horizon):
            irf_accepted.append(irf_candidate)
            n_accepted += 1
    
    if verbose:
        acceptance_rate = n_accepted / max_draws
        print(f"\n{'='*50}")
        print(f"Total draws: {max_draws}")
        print(f"Accepted: {n_accepted}")
        print(f"Acceptance rate: {100*acceptance_rate:.2f}%")
        print(f"{'='*50}\n")
    
    if n_accepted == 0:
        raise ValueError("No valid identifications found! Check your sign restrictions.")
    
    return np.array(irf_accepted), acceptance_rate


def check_sign_restrictions(irf, restrictions, horizon):
    """
    Check if IRF satisfies all sign restrictions.
    
    Parameters:
    -----------
    irf : array of shape (nperiods, nvars, nvars)
        Impulse responses
    restrictions : list of dicts
        Sign restrictions to check
    horizon : int
        Number of periods to check
    
    Returns:
    --------
    bool : True if all restrictions satisfied
    """
    
    for restr in restrictions:
        var_idx = restr['var_idx']
        shock_idx = restr['shock_idx']
        sign = restr['sign']
        
        # Check restriction for each period up to horizon
        for h in range(horizon):
            value = irf[h, shock_idx, var_idx]  # Note: irf[t, shock, var]
            
            if sign == '+' and value <= 0:
                return False
            elif sign == '-' and value >= 0:
                return False
    
    return True

In [None]:
# Cell 5: Visualization Functions

def plot_sign_restriction_irfs(irf_accepted, var_names=['GDP Growth', 'Inflation'],
                                shock_names=['Supply Shock', 'Demand Shock'],
                                percentiles=[5, 50, 95]):
    """
    Plot all accepted IRFs with percentile bands.
    
    Parameters:
    -----------
    irf_accepted : array of shape (n_accepted, nperiods, nvars, nvars)
        All accepted IRFs
    var_names : list of str
        Names of variables
    shock_names : list of str
        Names of shocks
    percentiles : list of float
        Percentiles to plot (default: 5th, 50th, 95th)
    """
    
    n_accepted, nperiods, nvars, nshocks = irf_accepted.shape
    periods = np.arange(nperiods)
    
    fig, axes = plt.subplots(nvars, nshocks, figsize=(12, 8))
    
    if nvars == 1:
        axes = axes.reshape(1, -1)
    
    for i, shock_name in enumerate(shock_names):
        for j, var_name in enumerate(var_names):
            
            ax = axes[j, i]
            
            # Plot all accepted IRFs (light gray)
            for k in range(n_accepted):
                ax.plot(periods, irf_accepted[k, :, i, j],
                       color='gray', alpha=0.05, linewidth=0.5)
            
            # Compute and plot percentiles
            irf_percentiles = np.percentile(irf_accepted[:, :, i, j], percentiles, axis=0)
            
            # 5th and 95th percentiles (dashed)
            ax.plot(periods, irf_percentiles[0], 'b--', linewidth=1.5, 
                   label=f'{percentiles[0]}th percentile')
            ax.plot(periods, irf_percentiles[2], 'b--', linewidth=1.5,
                   label=f'{percentiles[2]}th percentile')
            
            # Median (solid red)
            ax.plot(periods, irf_percentiles[1], 'r-', linewidth=2.5,
                   label='Median', zorder=10)
            
            # Formatting
            ax.set_title(f'{var_name} → {shock_name}', fontsize=11)
            ax.grid(True, alpha=0.3)
            ax.axhline(0, color='k', linestyle='-', linewidth=0.8, alpha=0.5)
            ax.set_xlabel('Quarters')
            ax.set_ylabel('Percentage points')
            
            # Legend only in top-left panel
            if i == 0 and j == 0:
                ax.legend(fontsize=8, loc='best')
    
    plt.suptitle(f'Impulse Responses with Sign Restrictions ({n_accepted} accepted models)',
                fontsize=13, fontweight='bold')
    plt.tight_layout()
    plt.show()


def plot_data(df, data):
    """Plot GDP growth and inflation time series."""
    
    fig, ax = plt.subplots(figsize=(12, 5))
    
    periods = pd.date_range(start=df.index[1], periods=data.shape[1], freq='QS')
    
    ax.plot(periods, data[0], 'b-', linewidth=1.5, label='GDP Growth (annual %)')
    ax.plot(periods, data[1], 'r-', linewidth=1.5, label='Inflation (annual %)')
    
    ax.set_title('GDP Growth and Inflation', fontsize=14, fontweight='bold')
    ax.set_xlabel('Date')
    ax.set_ylabel('Percent')
    ax.legend(loc='best')
    ax.grid(True, alpha=0.3)
    ax.axhline(0, color='k', linestyle='-', linewidth=0.5)
    
    plt.tight_layout()
    plt.show()

In [None]:
# Cell 6: Main Analysis

# Set date range
start_date = '1955-01-01'
end_date = '2019-10-01'

print("Loading data from FRED...")
df = load_gdp_inflation_data(fred, start_date, end_date)

print("Preparing data (computing growth rates)...")
data = prepare_gdp_inflation_data(df)

print(f"\nData shape: {data.shape}")
print(f"Variables: GDP growth, Inflation")
print(f"Observations: {data.shape[1]}")
print(f"Period: {start_date} to {end_date}")

# Plot data
plot_data(df, data)

In [None]:
# Cell 7: Estimate VAR

# Check lag length
criteria = compute_information_criteria(data, maxlags=8)

# Estimate VAR with selected lag length
nlags = 3  # Or use criteria['optimal_SIC']
print(f"\nEstimating VAR({nlags})...")

var_results = estimate_var(data, nlags)

print(f"\nVAR estimation complete:")
print(f"  Number of observations: {var_results['nobs']}")
print(f"  Number of variables: {var_results['nvars']}")
print(f"  Number of lags: {var_results['nlags']}")
print(f"  Log-likelihood: {var_results['log_likelihood']:.2f}")
print(f"\nCovariance matrix of residuals (Omega):")
print(var_results['Omega'])

In [None]:
# Cell 8: Apply Sign Restrictions

# Define sign restrictions
# Supply shock (shock 0): GDP ↑, Inflation ↓
# Demand shock (shock 1): GDP ↑, Inflation ↑

sign_restrictions = [
    # Supply shock (shock 0)
    {'var_idx': 0, 'shock_idx': 0, 'sign': '+'},  # GDP growth > 0
    {'var_idx': 1, 'shock_idx': 0, 'sign': '-'},  # Inflation < 0
    
    # Demand shock restrictions can be added here if desired
    # {'var_idx': 0, 'shock_idx': 1, 'sign': '+'},  # GDP growth > 0
    # {'var_idx': 1, 'shock_idx': 1, 'sign': '+'},  # Inflation > 0
]

# Set parameters
horizon = 2        # Enforce restrictions for first 2 quarters
max_draws = 10000  # Try up to 10,000 random rotations
nperiods = 21      # Compute IRFs for 21 quarters

print("\n" + "="*60)
print("SIGN RESTRICTION IDENTIFICATION")
print("="*60)
print(f"\nSign restrictions (enforced for first {horizon} quarters):")
print("  Supply shock: GDP ↑, Inflation ↓")
print("  (Demand shock unrestricted)")

# Run identification
irf_accepted, acceptance_rate = sign_restriction_identification(
    var_results, 
    sign_restrictions, 
    horizon=horizon,
    max_draws=max_draws,
    nperiods=nperiods,
    verbose=True
)

In [None]:
# Cell 9: Visualize Results

print("Plotting impulse response functions...\n")

plot_sign_restriction_irfs(
    irf_accepted,
    var_names=['GDP Growth', 'Inflation'],
    shock_names=['Supply Shock', 'Demand Shock'],
    percentiles=[5, 50, 95]
)

# Summary statistics
print("\nSummary of Results:")
print("="*50)
print(f"Number of accepted models: {irf_accepted.shape[0]}")
print(f"Acceptance rate: {100*acceptance_rate:.2f}%")
print(f"\nInterpretation:")
print("  - Gray lines: All models consistent with sign restrictions")
print("  - Dashed blue: 90% confidence band (5th-95th percentiles)")
print("  - Red line: Median response across all accepted models")
print("\nNote: This is SET identification - multiple structural models")
print("      are consistent with the data and sign restrictions.")