In [1]:
import numpy as np
from scipy.stats import norm
from scipy.optimize import minimize


In [2]:
class DIDEstimator:
    def __init__(self, Y, D, T, X, W):
        self.Y = Y
        self.D = D
        self.T = T
        self.X = X
        self.W = W
        self.epsilon = 1e-10  # Small value to avoid numerical instability

    def probit(self, W, params):
        return norm.cdf(np.dot(W, params))
    
    def log_likelihood(self, params):
        p = np.clip(self.probit(self.W, params), self.epsilon, 1 - self.epsilon)
        return -np.sum(self.D * np.log(p) + (1 - self.D) * np.log(1 - p))
    
    def estimate_selection(self):
        initial_params = np.zeros(self.W.shape[1])
        result = minimize(self.log_likelihood, initial_params, method='BFGS')
        return result.x
    
    def inverse_mills_ratio(self, W, params):
        z = np.dot(W, params)
        return norm.pdf(z) / (norm.cdf(z) + self.epsilon)
    
    def estimate_did(self):
        # First stage: Estimate selection equation
        selection_params = self.estimate_selection()
        
        # Compute Inverse Mills Ratio
        imr = self.inverse_mills_ratio(self.W, selection_params)
        
        # Augment X with IMR
        X_augmented = np.column_stack((self.X, imr))
        
        # Second stage: Estimate DiD model
        DT = self.D * self.T
        X_full = np.column_stack((np.ones_like(self.D), self.D, self.T, DT, X_augmented))
        
        # OLS estimation
        beta = np.linalg.inv(X_full.T @ X_full) @ X_full.T @ self.Y
        
        # Compute standard errors (simplified, ignoring the first stage estimation)
        residuals = self.Y - X_full @ beta
        sigma2 = np.mean(residuals**2)
        var_beta = sigma2 * np.linalg.inv(X_full.T @ X_full)
        se_beta = np.sqrt(np.diag(var_beta))
        
        return beta, se_beta

In [3]:

# Function to simulate data
def simulate_data(n_samples, n_covariates, treatment_effect, selection_strength, heterogeneity_strength):
    np.random.seed(42)
    
    # Covariates
    X = np.random.randn(n_samples, n_covariates)
    W = np.column_stack((np.ones(n_samples), X))
    
    # Treatment assignment (selection equation)
    selection_params = np.random.randn(W.shape[1]) * selection_strength
    p_treatment = norm.cdf(np.dot(W, selection_params))
    D = (np.random.rand(n_samples) < p_treatment).astype(int)
    
    # Time periods
    T = np.random.binomial(1, 0.5, n_samples)
    
    # Outcome with heterogeneous treatment effects
    epsilon = np.random.randn(n_samples)
    heterogeneity = np.random.randn(n_covariates) * heterogeneity_strength
    Y = (1 + 0.5 * D + 0.5 * T + 
         treatment_effect * D * T + 
         np.dot(X, np.random.randn(n_covariates)) + 
         np.sum(X * heterogeneity * D[:, np.newaxis] * T[:, np.newaxis], axis=1) + 
         epsilon)
    
    return Y, D, T, X, W


In [4]:

# Simulate data
n_samples = 500
n_covariates = 3
true_treatment_effect = 2.0
selection_strength = 0.5
heterogeneity_strength = 2.0    

Y, D, T, X, W = simulate_data(n_samples, n_covariates, true_treatment_effect, selection_strength, heterogeneity_strength)


In [5]:

# Estimate DiD
estimator = DIDEstimator(Y, D, T, X, W)
beta, se_beta = estimator.estimate_did()


In [6]:

# Print results
param_names = ['Intercept', 'D', 'T', 'D*T (Treatment Effect)'] + [f'X{i+1}' for i in range(n_covariates)] + ['IMR']
for name, b, se in zip(param_names, beta, se_beta):
    print(f"{name}: {b:.4f} (SE: {se:.4f})")

print(f"\nTrue Treatment Effect: {true_treatment_effect}")
print(f"Estimated Treatment Effect: {beta[3]:.4f} (SE: {se_beta[3]:.4f})")

Intercept: 1.0693 (SE: 0.7716)
D: 0.2693 (SE: 0.1431)
T: 0.5146 (SE: 0.1574)
D*T (Treatment Effect): 1.8266 (SE: 0.1991)
X1: 2.4580 (SE: 0.2212)
X2: 0.4471 (SE: 0.2712)
X3: -0.9393 (SE: 0.0623)
IMR: 0.2228 (SE: 1.2809)

True Treatment Effect: 2.0
Estimated Treatment Effect: 1.8266 (SE: 0.1991)


In [7]:

def estimate_traditional_did(Y, D, T):
    """
    Estimate a traditional Difference-in-Differences model (no selection correction).
    
    Args:
    - Y: Outcome variable
    - D: Treatment group indicator (1 if treated, 0 otherwise)
    - T: Time period indicator (1 if post-treatment, 0 otherwise)
    
    Returns:
    - beta: Estimated coefficients
    - se_beta: Standard errors of the coefficients
    """
    DT = D * T
    X_full = np.column_stack((np.ones_like(D), D, T, DT))  # Intercept, D, T, D*T (Treatment effect)
    
    # OLS estimation
    beta = np.linalg.inv(X_full.T @ X_full) @ X_full.T @ Y
    
    # Compute standard errors
    residuals = Y - X_full @ beta
    sigma2 = np.mean(residuals**2)
    var_beta = sigma2 * np.linalg.inv(X_full.T @ X_full)
    se_beta = np.sqrt(np.diag(var_beta))
    
    return beta, se_beta


In [8]:

# Estimate traditional DiD
beta_trad, se_beta_trad = estimate_traditional_did(Y, D, T)

# Print traditional DiD results
param_names_trad = ['Intercept', 'D', 'T', 'D*T (Treatment Effect)']
print("\nTraditional DiD Estimates:")
for name, b, se in zip(param_names_trad, beta_trad, se_beta_trad):
    print(f"{name}: {b:.4f} (SE: {se:.4f})")

print(f"\nTraditional Estimated Treatment Effect: {beta_trad[3]:.4f} (SE: {se_beta_trad[3]:.4f})")



Traditional DiD Estimates:
Intercept: 2.0541 (SE: 0.2714)
D: -0.9854 (SE: 0.3483)
T: 0.6279 (SE: 0.3956)
D*T (Treatment Effect): 2.1199 (SE: 0.4997)

Traditional Estimated Treatment Effect: 2.1199 (SE: 0.4997)


In [9]:
class DIDWithSelectionAndHeterogeneity:
    def __init__(self, Y, D, T, X, W):
        self.Y = Y
        self.D = D
        self.T = T
        self.X = X
        self.W = W
        self.epsilon = 1e-10  # Small value to avoid numerical instability

    def probit(self, W, params):
        return norm.cdf(np.dot(W, params))
    
    def log_likelihood(self, params):
        p = np.clip(self.probit(self.W, params), self.epsilon, 1 - self.epsilon)
        return -np.sum(self.D * np.log(p) + (1 - self.D) * np.log(1 - p))
    
    def estimate_selection(self):
        initial_params = np.zeros(self.W.shape[1])
        result = minimize(self.log_likelihood, initial_params, method='BFGS')
        return result.x
    
    def inverse_mills_ratio(self, W, params):
        z = np.dot(W, params)
        return norm.pdf(z) / (norm.cdf(z) + self.epsilon)
    
    def estimate_did_with_selection_and_heterogeneity(self):
        # First stage: Estimate selection equation
        selection_params = self.estimate_selection()
        
        # Compute Inverse Mills Ratio
        imr = self.inverse_mills_ratio(self.W, selection_params)
        
        # Augment X with IMR
        X_augmented = np.column_stack((self.X, imr))
        
        # Second stage: Estimate DiD model with heterogeneity
        DT = self.D * self.T
        X_interactions = self.X * DT[:, np.newaxis]  # Interaction terms for heterogeneity
        X_full = np.column_stack((np.ones_like(self.D), self.D, self.T, DT, X_augmented, X_interactions))
        
        # OLS estimation
        beta = np.linalg.inv(X_full.T @ X_full) @ X_full.T @ self.Y
        
        # Compute standard errors (using robust standard errors)
        residuals = self.Y - X_full @ beta
        residuals = residuals.reshape(-1, 1)  # Ensure residuals is a column vector
        meat = X_full.T @ (residuals * X_full)
        bread = np.linalg.inv(X_full.T @ X_full)
        var_beta = bread @ meat @ bread
        se_beta = np.sqrt(np.diag(var_beta))
        
        return beta, se_beta

    def calculate_ate(self, beta):
        # Calculate ATE considering heterogeneity
        n_covariates = self.X.shape[1]
        base_effect = beta[3]  # Coefficient for D*T
        heterogeneity_effects = beta[-(n_covariates):]  # Coefficients for X*D*T interactions
        ate = base_effect + np.mean(self.X, axis=0) @ heterogeneity_effects
        return ate

In [10]:
# Estimate DiD with selection and heterogeneity
estimator = DIDWithSelectionAndHeterogeneity(Y, D, T, X, W)
beta, se_beta = estimator.estimate_did_with_selection_and_heterogeneity()

# Calculate ATE
ate = estimator.calculate_ate(beta)

# Print results
param_names = ['Intercept', 'D', 'T', 'D*T (Base Treatment Effect)'] + \
                [f'X{i+1}' for i in range(n_covariates)] + ['IMR'] + \
                [f'X{i+1}*D*T (Heterogeneity)' for i in range(n_covariates)]

print("Parameter Estimates:")
for name, b, se in zip(param_names, beta, se_beta):
    print(f"{name}: {b:.4f} (SE: {se:.4f})")

print(f"\nTrue Base Treatment Effect: {true_treatment_effect}")
print(f"Estimated Base Treatment Effect: {beta[3]:.4f} (SE: {se_beta[3]:.4f})")
print(f"Estimated Average Treatment Effect (ATE): {ate:.4f}")

Parameter Estimates:
Intercept: 1.1285 (SE: 0.1389)
D: 0.2672 (SE: nan)
T: 0.5089 (SE: 0.0238)
D*T (Base Treatment Effect): 1.9459 (SE: 0.0142)
X1: 2.7156 (SE: 0.0622)
X2: 0.3130 (SE: 0.0546)
X3: -1.1207 (SE: 0.0210)
IMR: 0.1114 (SE: 0.2583)
X1*D*T (Heterogeneity): -0.8003 (SE: 0.0289)
X2*D*T (Heterogeneity): 0.5471 (SE: nan)
X3*D*T (Heterogeneity): 0.4798 (SE: nan)

True Base Treatment Effect: 2.0
Estimated Base Treatment Effect: 1.9459 (SE: 0.0142)
Estimated Average Treatment Effect (ATE): 1.8680


  se_beta = np.sqrt(np.diag(var_beta))
