# Phase 3: Main Analysis

This notebook implements the full analysis pipeline:
1. Data preparation
2. Baseline DiD estimation
3. DDD by worker type
4. Event-study analysis
5. Three environments analysis
6. Robustness checks
7. Heterogeneity analysis
8. Diagnostics
9. Report generation


In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
import sys
import warnings
warnings.filterwarnings('ignore')

# Add src to path
BASE_DIR = Path.cwd()
SRC_DIR = BASE_DIR / "src"
sys.path.insert(0, str(SRC_DIR))

# Import our modules
from io import load_raw, write_parquet
from build import assemble_panel
from models import did, ddd_worker_type, event_study
from plots import plot_event
from export import write_table
from diagnostics import pretrend_test, cell_counts

# Set up directories
OUT_DIR = BASE_DIR / "out"
DERIVED_DIR = BASE_DIR / "data" / "derived"
TABLES_DIR = OUT_DIR / "tables"
FIGURES_DIR = OUT_DIR / "figures"
REPORT_DIR = OUT_DIR / "report"
DIAGNOSTICS_DIR = OUT_DIR / "diagnostics"

for d in [DERIVED_DIR, TABLES_DIR, FIGURES_DIR, REPORT_DIR, DIAGNOSTICS_DIR]:
    d.mkdir(parents=True, exist_ok=True)

# Set random seed
np.random.seed(42)

print("Setup complete!")
print(f"Output directories created")


## 3.1 Data Preparation


In [None]:
# Load raw data
print("=" * 80)
print("LOADING RAW DATA")
print("=" * 80)

df_raw, meta = load_raw()
print(f"\nRaw data: {len(df_raw):,} rows × {len(df_raw.columns)} columns")
print(f"\nYear range: {df_raw['year'].min()} - {df_raw['year'].max()}")
print(f"\nRegion distribution:")
print(df_raw['region_res'].value_counts())

# Check if we have Basilicata (region code 2)
has_basilicata = (df_raw['region_res'] == '2').sum() > 0
print(f"\nHas Basilicata data: {has_basilicata}")

if not has_basilicata:
    print("\nWARNING: No Basilicata data found. Will use within-Molise variation only.")
    print("Alternative: Use Puglia (14) or another control region if available.")


In [None]:
# Assemble analysis-ready panel
print("=" * 80)
print("ASSEMBLING ANALYSIS PANEL")
print("=" * 80)

panel = assemble_panel(df_raw, meta)
print(f"\nPanel: {len(panel):,} rows")
print(f"Years: {panel['year'].min()} - {panel['year'].max()}")
print(f"\nWorker types:")
print(panel['worker_type'].value_counts())
print(f"\nMolise vs Basilicata:")
print(panel['molise_res'].value_counts())
print(f"\nPre vs Post:")
print(panel['post'].value_counts())

# Save panel
write_parquet(panel, DERIVED_DIR / "analysis_ready.parquet")
print(f"\nPanel saved to {DERIVED_DIR / 'analysis_ready.parquet'}")


In [None]:
# Check data availability by cell
print("=" * 80)
print("DATA AVAILABILITY BY CELL")
print("=" * 80)

cell_counts_df = cell_counts(panel, ['molise_res', 'worker_type', 'year'])
print(cell_counts_df.pivot_table(index=['molise_res', 'worker_type'], columns='year', values='count', fill_value=0))

# Check for cells with < 50 observations
low_count_cells = cell_counts_df[cell_counts_df['count'] < 50]
if len(low_count_cells) > 0:
    print(f"\nWARNING: {len(low_count_cells)} cells have < 50 observations")
    print(low_count_cells.head(10))
else:
    print("\nAll cells have >= 50 observations")


## 3.2 Baseline DiD Estimation


In [None]:
# Define outcomes to estimate
outcomes = ['emp_prob', 'earnings_asinh', 'wage_asinh', 'contract_perm']

print("=" * 80)
print("BASELINE DiD ESTIMATION")
print("=" * 80)

baseline_results = {}

for outcome in outcomes:
    print(f"\n{'='*60}")
    print(f"Outcome: {outcome}")
    print(f"{'='*60}")
    
    try:
        result = did(panel, outcome=outcome)
        baseline_results[outcome] = result
        
        # Print summary
        print(f"\nTreatment effect (β): {result.params['treat']:.4f}")
        print(f"Standard error: {result.std_errors['treat']:.4f}")
        print(f"P-value: {result.pvalues['treat']:.4f}")
        print(f"95% CI: [{result.params['treat'] - 1.96*result.std_errors['treat']:.4f}, "
              f"{result.params['treat'] + 1.96*result.std_errors['treat']:.4f}]")
        print(f"N observations: {result.nobs:,}")
        
        # Export table
        write_table(result, TABLES_DIR / f"baseline_did_{outcome}", format='both')
        
    except Exception as e:
        print(f"Error estimating {outcome}: {e}")
        import traceback
        traceback.print_exc()


## 3.3 DDD by Worker Type


In [None]:
print("=" * 80)
print("DDD BY WORKER TYPE")
print("=" * 80)

ddd_results = {}

# Estimate for key outcomes
ddd_outcomes = ['emp_prob', 'earnings_asinh']

for outcome in ddd_outcomes:
    print(f"\n{'='*60}")
    print(f"Outcome: {outcome}")
    print(f"{'='*60}")
    
    try:
        result = ddd_worker_type(panel, outcome=outcome)
        ddd_results[outcome] = result
        
        # Extract key coefficients
        if 'treat_private' in result.params.index:
            theta = result.params['treat_private']
            theta_se = result.std_errors['treat_private']
            print(f"\nθ (Private vs Public): {theta:.4f} (SE: {theta_se:.4f}, p={result.pvalues['treat_private']:.4f})")
        
        if 'treat_self' in result.params.index:
            phi = result.params['treat_self']
            phi_se = result.std_errors['treat_self']
            print(f"φ (Self vs Public): {phi:.4f} (SE: {phi_se:.4f}, p={result.pvalues['treat_self']:.4f})")
        
        print(f"N observations: {result.nobs:,}")
        
        # Export table
        write_table(result, TABLES_DIR / f"ddd_{outcome}", format='both')
        
    except Exception as e:
        print(f"Error estimating {outcome}: {e}")
        import traceback
        traceback.print_exc()


## 3.4 Event Study


In [None]:
print("=" * 80)
print("EVENT-STUDY ANALYSIS")
print("=" * 80)

event_results = {}

# Estimate event studies
event_outcomes = ['emp_prob', 'earnings_asinh']

for outcome in event_outcomes:
    print(f"\n{'='*60}")
    print(f"Outcome: {outcome}")
    print(f"{'='*60}")
    
    try:
        # Pooled event study
        result_pooled = event_study(panel, outcome=outcome, by_type=False)
        event_results[f"{outcome}_pooled"] = result_pooled
        
        # Plot
        plot_event(result_pooled, 
                  title=f"Event Study: {outcome}",
                  outfile=FIGURES_DIR / f"fig_event_{outcome}.png",
                  by_type=False)
        
        # By worker type
        result_by_type = event_study(panel, outcome=outcome, by_type=True)
        event_results[f"{outcome}_by_type"] = result_by_type
        
        # Plot by type
        plot_event(result_by_type,
                  title=f"Event Study: {outcome} by Worker Type",
                  outfile=FIGURES_DIR / f"fig_event_{outcome}_bytype.png",
                  by_type=True)
        
        # Test parallel trends
        pretrend = pretrend_test(result_pooled)
        print(f"\nParallel trends test (pre-period):")
        print(f"  Test statistic: {pretrend.get('test_stat', 'N/A'):.4f}")
        print(f"  P-value: {pretrend.get('p_value', 'N/A'):.4f}")
        
        # Save results
        result_pooled.to_csv(TABLES_DIR / f"event_study_{outcome}.csv", index=False)
        result_by_type.to_csv(TABLES_DIR / f"event_study_{outcome}_bytype.csv", index=False)
        
    except Exception as e:
        print(f"Error estimating event study for {outcome}: {e}")
        import traceback
        traceback.print_exc()


## 3.5 Three Environments Analysis


In [None]:
# Three environments: Panel A (within Molise), Panel B (within Basilicata), Panel C (DDD)
print("=" * 80)
print("THREE ENVIRONMENTS ANALYSIS")
print("=" * 80)

# This analysis compares private vs public within each region
three_env_results = {}

# Focus on earnings_asinh
outcome = 'earnings_asinh'

try:
    # Panel A: Within Molise (private vs public)
    molise_data = panel[panel['molise_res'] == 1].copy()
    if len(molise_data) > 0:
        # Create private dummy
        molise_data['private_dummy'] = (molise_data['worker_type'] == 'private').astype(int)
        molise_data['post_private'] = molise_data['post'] * molise_data['private_dummy']
        
        # Simple DiD within Molise
        molise_data_idx = molise_data.set_index(['id_worker', 'year'])
        y_molise = molise_data_idx[outcome]
        X_molise = molise_data_idx[['post_private', 'post', 'private_dummy']].copy()
        X_molise = sm.add_constant(X_molise)
        
        from linearmodels import PanelOLS
        import statsmodels.api as sm
        mod_molise = PanelOLS(y_molise, X_molise, entity_effects=True, time_effects=True)
        result_molise = mod_molise.fit(cov_type='clustered', cluster_entity=True)
        
        panel_a_coef = result_molise.params.get('post_private', np.nan)
        three_env_results['Panel_A_Molise'] = panel_a_coef
        print(f"Panel A (Within Molise): {panel_a_coef:.4f}")
    
    # Panel B: Within Basilicata (if available)
    basilicata_data = panel[panel['molise_res'] == 0].copy()
    if len(basilicata_data) > 0:
        basilicata_data['private_dummy'] = (basilicata_data['worker_type'] == 'private').astype(int)
        basilicata_data['post_private'] = basilicata_data['post'] * basilicata_data['private_dummy']
        
        basilicata_data_idx = basilicata_data.set_index(['id_worker', 'year'])
        y_bas = basilicata_data_idx[outcome]
        X_bas = basilicata_data_idx[['post_private', 'post', 'private_dummy']].copy()
        X_bas = sm.add_constant(X_bas)
        
        mod_bas = PanelOLS(y_bas, X_bas, entity_effects=True, time_effects=True)
        result_bas = mod_bas.fit(cov_type='clustered', cluster_entity=True)
        
        panel_b_coef = result_bas.params.get('post_private', np.nan)
        three_env_results['Panel_B_Basilicata'] = panel_b_coef
        print(f"Panel B (Within Basilicata): {panel_b_coef:.4f}")
    else:
        print("Panel B: No Basilicata data available")
        three_env_results['Panel_B_Basilicata'] = np.nan
    
    # Panel C: DDD (already estimated above)
    if outcome in ddd_results:
        ddd_result = ddd_results[outcome]
        if 'treat_private' in ddd_result.params.index:
            panel_c_coef = ddd_result.params['treat_private']
            three_env_results['Panel_C_DDD'] = panel_c_coef
            print(f"Panel C (DDD): {panel_c_coef:.4f}")
    
    # Create summary table
    three_env_table = pd.DataFrame({
        'Panel': ['A: Within Molise', 'B: Within Basilicata', 'C: DDD'],
        'Coefficient': [
            three_env_results.get('Panel_A_Molise', np.nan),
            three_env_results.get('Panel_B_Basilicata', np.nan),
            three_env_results.get('Panel_C_DDD', np.nan)
        ]
    })
    
    three_env_table.to_csv(TABLES_DIR / "table_three_env.csv", index=False)
    print(f"\nThree environments table saved to {TABLES_DIR / 'table_three_env.csv'}")
    
except Exception as e:
    print(f"Error in three environments analysis: {e}")
    import traceback
    traceback.print_exc()


In [None]:
print("=" * 80)
print("ROBUSTNESS CHECKS")
print("=" * 80)

robustness_results = {}
baseline_coef = baseline_results.get('earnings_asinh', None)
baseline_beta = baseline_coef.params['treat'] if baseline_coef else None

# 1. Exclude border municipalities (if flagged)
if 'is_border_municipality' in panel.columns:
    panel_no_border = panel[~panel['is_border_municipality']].copy()
    try:
        result = did(panel_no_border, outcome='earnings_asinh')
        robustness_results['no_border'] = {
            'coef': result.params['treat'],
            'se': result.std_errors['treat'],
            'delta': result.params['treat'] - baseline_beta if baseline_beta else np.nan
        }
        print(f"No border municipalities: β = {result.params['treat']:.4f}")
    except:
        pass

# 2. Balanced panel only
panel_balanced = panel[panel['is_balanced_97_07']].copy()
try:
    result = did(panel_balanced, outcome='earnings_asinh')
    robustness_results['balanced'] = {
        'coef': result.params['treat'],
        'se': result.std_errors['treat'],
        'delta': result.params['treat'] - baseline_beta if baseline_beta else np.nan
    }
    print(f"Balanced panel: β = {result.params['treat']:.4f}")
except Exception as e:
    print(f"Balanced panel failed: {e}")

# 3. Alternative post window: 2003-2005
panel_short_post = panel[(panel['year'] < 2002) | (panel['year'].between(2003, 2005))].copy()
panel_short_post['post'] = (panel_short_post['year'] >= 2003).astype(int)
panel_short_post['treat'] = panel_short_post['molise_res'] * panel_short_post['post']
try:
    result = did(panel_short_post, outcome='earnings_asinh')
    robustness_results['post_2003_2005'] = {
        'coef': result.params['treat'],
        'se': result.std_errors['treat'],
        'delta': result.params['treat'] - baseline_beta if baseline_beta else np.nan
    }
    print(f"Post window 2003-2005: β = {result.params['treat']:.4f}")
except Exception as e:
    print(f"Short post window failed: {e}")

# 4. Include 2002 with shock indicator
panel_with_2002 = panel.copy()
panel_with_2002 = df_raw[(df_raw['year'].between(1997, 2007)) & 
                         (df_raw['region_res'].isin(['12', '2']))].copy()
# Re-assemble with 2002
from build import construct_worker_type, make_outcomes, make_flags
panel_with_2002 = panel_with_2002[panel_with_2002['type'].isin([1, 2, 3])].copy()
panel_with_2002['worker_type'] = construct_worker_type(panel_with_2002)
panel_with_2002['molise_res'] = (panel_with_2002['region_res'] == '12').astype(int)
panel_with_2002['post'] = (panel_with_2002['year'] >= 2003).astype(int)
panel_with_2002['shock2002'] = (panel_with_2002['year'] == 2002).astype(int)
panel_with_2002['treat'] = panel_with_2002['molise_res'] * panel_with_2002['post']
outcomes_2002 = make_outcomes(panel_with_2002)
for col in outcomes_2002.columns:
    panel_with_2002[col] = outcomes_2002[col]

try:
    # Estimate with shock2002 indicator
    data_2002 = panel_with_2002.set_index(['id_worker', 'year'])
    y_2002 = data_2002['earnings_asinh']
    X_2002 = data_2002[['treat', 'shock2002']].copy()
    X_2002 = sm.add_constant(X_2002)
    mod_2002 = PanelOLS(y_2002, X_2002, entity_effects=True, time_effects=True)
    result_2002 = mod_2002.fit(cov_type='clustered', cluster_entity=True)
    robustness_results['with_2002'] = {
        'coef': result_2002.params['treat'],
        'se': result_2002.std_errors['treat'],
        'delta': result_2002.params['treat'] - baseline_beta if baseline_beta else np.nan
    }
    print(f"With 2002 indicator: β = {result_2002.params['treat']:.4f}")
except Exception as e:
    print(f"With 2002 failed: {e}")

# Create robustness summary
robustness_summary = pd.DataFrame(robustness_results).T
robustness_summary.index.name = 'Specification'
robustness_summary.to_csv(TABLES_DIR / "robustness_index.csv")
print(f"\nRobustness summary saved to {TABLES_DIR / 'robustness_index.csv'}")


In [None]:
print("=" * 80)
print("HETEROGENEITY ANALYSIS")
print("=" * 80)

heterogeneity_results = {}
outcome = 'earnings_asinh'

# 1. By gender × worker type
print("\nBy Gender × Worker Type:")
for gender_val in [0, 1]:
    gender_label = 'Male' if gender_val == 0 else 'Female'
    for worker_type in ['public', 'private', 'self']:
        subset = panel[(panel['gender'] == gender_val) & (panel['worker_type'] == worker_type)].copy()
        if len(subset) > 100:  # Minimum sample size
            try:
                result = did(subset, outcome=outcome)
                coef = result.params['treat']
                heterogeneity_results[f'gender_{gender_label}_type_{worker_type}'] = coef
                print(f"  {gender_label} × {worker_type}: {coef:.4f}")
            except:
                pass

# 2. By age groups
print("\nBy Age Groups:")
if 'age' in panel.columns:
    panel['age_group'] = pd.cut(panel['age'], bins=[0, 30, 50, 100], labels=['<30', '30-49', '50+'])
    for age_group in ['<30', '30-49', '50+']:
        subset = panel[panel['age_group'] == age_group].copy()
        if len(subset) > 100:
            try:
                result = did(subset, outcome=outcome)
                coef = result.params['treat']
                heterogeneity_results[f'age_{age_group}'] = coef
                print(f"  {age_group}: {coef:.4f}")
            except:
                pass

# 3. By sector (construction vs other)
print("\nBy Sector:")
if 'sector_12cat' in panel.columns:
    # Construction = 2
    panel['is_construction'] = (panel['sector_12cat'] == 2).astype(int)
    for sector_label, sector_val in [('Construction', 1), ('Other', 0)]:
        subset = panel[panel['is_construction'] == sector_val].copy()
        if len(subset) > 100:
            try:
                result = did(subset, outcome=outcome)
                coef = result.params['treat']
                heterogeneity_results[f'sector_{sector_label}'] = coef
                print(f"  {sector_label}: {coef:.4f}")
            except:
                pass

# Save heterogeneity results
heterogeneity_df = pd.DataFrame({
    'Group': list(heterogeneity_results.keys()),
    'Coefficient': list(heterogeneity_results.values())
})
heterogeneity_df.to_csv(TABLES_DIR / "heterogeneity_grids" / "all_dimensions.csv", index=False)
print(f"\nHeterogeneity results saved")


## 3.8 Diagnostics


In [None]:
print("=" * 80)
print("DIAGNOSTICS")
print("=" * 80)

# 1. Parallel trends test (already done in event study, but summarize)
print("\nParallel Trends Tests:")
for outcome in ['emp_prob', 'earnings_asinh']:
    if f"{outcome}_pooled" in event_results:
        pretrend = pretrend_test(event_results[f"{outcome}_pooled"])
        print(f"  {outcome}: p-value = {pretrend.get('p_value', 'N/A'):.4f}")

# 2. Cell counts
print("\nCell Counts (Region × Type × Year):")
cell_counts_diag = cell_counts(panel, ['molise_res', 'worker_type', 'year'])
print(cell_counts_diag.head(20))
cell_counts_diag.to_csv(DIAGNOSTICS_DIR / "cell_counts.csv", index=False)

# 3. Missingness audit
print("\nMissingness Audit:")
missing_audit = pd.DataFrame({
    'Variable': panel.columns,
    'Missing_Count': [panel[col].isna().sum() for col in panel.columns],
    'Missing_Pct': [panel[col].isna().sum() / len(panel) * 100 for col in panel.columns]
})
missing_audit = missing_audit[missing_audit['Missing_Count'] > 0].sort_values('Missing_Count', ascending=False)
print(missing_audit.head(10))
missing_audit.to_csv(DIAGNOSTICS_DIR / "missingness_audit.csv", index=False)

# 4. Winsorization info
print("\nWinsorization:")
if 'earnings_asinh' in panel.columns:
    # Check how many were winsorized (would need to track this in build.py)
    print("  Earnings winsorized at p1-p99 (tracked in variable construction)")

print(f"\nDiagnostics saved to {DIAGNOSTICS_DIR}")


## 3.9 Report Generation


In [None]:
print("=" * 80)
print("GENERATING REPORT")
print("=" * 80)

# Copy all figures to report directory
import shutil
figures_report_dir = REPORT_DIR / "_figures"
figures_report_dir.mkdir(parents=True, exist_ok=True)

for fig_file in FIGURES_DIR.glob("*.png"):
    shutil.copy(fig_file, figures_report_dir / fig_file.name)
    print(f"Copied {fig_file.name}")

# Copy all tables to report directory
tables_report_dir = REPORT_DIR / "_tables"
tables_report_dir.mkdir(parents=True, exist_ok=True)

for table_file in TABLES_DIR.glob("*.csv"):
    shutil.copy(table_file, tables_report_dir / table_file.name)
    print(f"Copied {table_file.name}")

# Create README
readme_content = f"""# Molise 2002 Earthquake Labor-Market Analysis

## Model Formulas

### Baseline DiD
Y_{{i r t}} = α + β(molise_res × post) + γ_i + λ_t + ε_{{i r t}}

- FE: individual (γ_i) and year (λ_t)
- Clustered SE: person_id
- Outcomes: emp_prob, earnings_asinh, wage_asinh, contract_perm

### DDD by Worker Type
Y_{{i s r t}} = α + θ(molise_res × post × private) + φ(molise_res × post × self) + 
                all two-way interactions + γ_i + λ_t + μ_s + ε_{{i s r t}}

- FE: individual, year, worker_type
- θ: Private vs Public
- φ: Self vs Public

### Event Study
Y_{{i r t}} = α + Σ_{{k≠-1}} β_k [1{{event_time = k}} × molise_res] + γ_i + λ_t + ε_{{i r t}}

- Reference period: k = -1 (year 2001)
- Event time range: k ∈ [-5, +5], excluding 0 (2002)

## Variable Construction

- **worker_type**: Constructed from 'type' field (1=Private, 2=Public, 3=Self, 4=Non-employed)
- **molise_res**: 1 if region_res == 12 (Molise), 0 if region_res == 2 (Basilicata), fixed by pre-period
- **post**: 1 if year >= 2003
- **treat**: molise_res × post
- **event_time**: year - 2002
- **earnings_asinh**: asinh(earnings_annualized), winsorized at p1-p99
- **emp_prob**: 1 if employed (has wage or working_weeks > 0)

## Dataset Lineage

- Raw data: Data/Molise.dta
- Processed panel: data/derived/analysis_ready.parquet
- Analysis period: 1997-2001 (pre) and 2003-2007 (post), excluding 2002
- Regions: Molise (12) and Basilicata (2) if available

## Clustering

Standard errors clustered by person_id (entity-level clustering).

## Sample Definitions

- **Balanced panel**: Present in all years 1997-2007 (excluding 2002)
- **Stayers**: No inter-municipality moves (if municipality data available)
- **Border municipalities**: Flagged if on region border (requires additional data)

Generated: {pd.Timestamp.now()}
"""

with open(REPORT_DIR / "README.md", "w") as f:
    f.write(readme_content)
print(f"\nREADME saved to {REPORT_DIR / 'README.md'}")

# Create session info
import platform
import sys
session_info = f"""Session Information
==================

Python version: {sys.version}
Platform: {platform.platform()}
Date: {pd.Timestamp.now()}

Package versions:
- pandas: {pd.__version__}
- numpy: {np.__version__}
- linearmodels: {__import__('linearmodels').__version__ if hasattr(__import__('linearmodels'), '__version__') else 'N/A'}
- matplotlib: {plt.matplotlib.__version__}
"""

with open(REPORT_DIR / "session_info.txt", "w") as f:
    f.write(session_info)
print(f"Session info saved to {REPORT_DIR / 'session_info.txt'}")

print("\n" + "=" * 80)
print("ANALYSIS COMPLETE!")
print("=" * 80)
print(f"\nAll outputs saved to:")
print(f"  Tables: {TABLES_DIR}")
print(f"  Figures: {FIGURES_DIR}")
print(f"  Report: {REPORT_DIR}")
print(f"  Diagnostics: {DIAGNOSTICS_DIR}")
