In [1]:
import numpy as np
import pandas as pd
import cvxpy as cp
from typing import Tuple, Dict

### Utils

In [2]:
def panel_matrices(
    df: pd.DataFrame, unit_id: str, time_id: str, treat: str, outcome: str
) -> Dict:
    """
    Reshape panel data from long to wide format.

    Python translation of R's panelMatrices function.
    """
    # Pivot to wide format for treatment matrix
    W = df.pivot(index=unit_id, columns=time_id, values=treat).fillna(0).values

    # Pivot to wide format for outcome matrix
    Y = df.pivot(index=unit_id, columns=time_id, values=outcome).values

    # Move treated units to bottom of matrices (like R code)
    treat_ids = np.where(W.sum(axis=1) > 1)[0]
    N0 = W.shape[0] - len(treat_ids)
    T0 = np.where(W.sum(axis=0) > 0)[0][0]  # First treatment period - 1

    # Reorder: controls first, then treated
    control_ids = np.setdiff1d(range(W.shape[0]), treat_ids)
    unit_order = np.concatenate([control_ids, treat_ids])

    W_reordered = W[unit_order, :]
    Y_reordered = Y[unit_order, :]

    return {"W": W_reordered, "Y": Y_reordered, "N0": N0, "T0": T0}

In [3]:
def sigma_estimator(Y: np.ndarray, N0: int, T0: int) -> float:
    """
    Estimate noise level following the R code:
    σ = sd(apply(Y[1:N0, 1:T0], 1, diff))
    """
    Y_control_pre = Y[:N0, :T0]
    diffs = np.array([np.std(np.diff(Y_control_pre[i, :])) for i in range(N0)])
    return np.std(diffs)


def simplex_least_squares_cvxpy(
    A: np.ndarray, b: np.ndarray, zeta: float = 0.0, intercept: bool = False
) -> np.ndarray:
    """
    Solve simplex-constrained least squares using cvxpy.

    Direct translation of the R simplexLeastSquares function:
    - minimize ||Aw - b||^2 + ζ^2 * n * ||w||^2     if intercept=False
    - minimize ||Aw + w0 - b||^2 + ζ^2 * n * ||w||^2 if intercept=True
    subject to w >= 0, sum(w) = 1
    """
    n = A.shape[1]
    w = cp.Variable(n)
    constraints = [cp.sum(w) == 1, w >= 0]

    if intercept:
        w0 = cp.Variable(1)
        objective = cp.sum_squares(A @ w + w0 - b) + zeta**2 * len(b) * cp.sum_squares(
            w
        )
    else:
        objective = cp.sum_squares(A @ w - b) + zeta**2 * len(b) * cp.sum_squares(w)

    problem = cp.Problem(cp.Minimize(objective), constraints)
    problem.solve(solver=cp.CLARABEL, verbose=False)

    if problem.status != cp.OPTIMAL:
        print(f"Warning: Optimization status: {problem.status}")

    return w.value


In [4]:
def synthetic_control(
    Y: np.ndarray, N0: int, T0: int, zeta_omega: float = None
) -> float:
    """
    Traditional synthetic control (sC function from R).
    """
    N, T = Y.shape
    N1, T1 = N - N0, T - T0

    if zeta_omega is None:
        zeta_omega = 1e-6 * sigma_estimator(Y, N0, T0)

    # Solve for unit weights ω
    Y_control_pre = Y[:N0, :T0]  # Control units, pre-treatment
    Y_treated_pre_mean = Y[N0:, :T0].mean(axis=0)  # Treated units, pre-treatment mean

    omega = simplex_least_squares_cvxpy(
        Y_control_pre.T, Y_treated_pre_mean, zeta=zeta_omega, intercept=False
    )

    # Compute estimate using matrix formulation
    unit_weight_vec = np.concatenate([-omega, np.ones(N1) / N1])
    time_weight_vec = np.concatenate([np.zeros(T0), np.ones(T1) / T1])

    estimate = unit_weight_vec.T @ Y @ time_weight_vec
    return float(estimate)


def diff_in_diff(Y: np.ndarray, N0: int, T0: int) -> float:
    """
    Difference-in-differences (dID function from R).
    """
    N, T = Y.shape
    N1, T1 = N - N0, T - T0

    # Simple DiD: uniform weights for both units and time
    unit_weight_vec = np.concatenate([-np.ones(N0) / N0, np.ones(N1) / N1])
    time_weight_vec = np.concatenate([-np.ones(T0) / T0, np.ones(T1) / T1])

    estimate = unit_weight_vec.T @ Y @ time_weight_vec
    return float(estimate)


def synthetic_diff_in_diff(
    Y: np.ndarray, N0: int, T0: int, zeta_omega: float = None
) -> Tuple[float, np.ndarray, np.ndarray]:
    """
    Synthetic Difference-in-Differences (sDiD function from R).

    Returns estimate, unit weights (omega), and time weights (lambda).
    """
    N, T = Y.shape
    N1, T1 = N - N0, T - T0

    # Default regularization following R code
    if zeta_omega is None:
        sigma_est = sigma_estimator(Y, N0, T0)
        zeta_omega = ((N1 * T1) ** 0.25) * sigma_est

    epsilon = 1e-6  # Small regularization for time weights
    zeta_lambda = epsilon * sigma_estimator(Y, N0, T0)

    # Solve for time weights λ
    Y_control_pre = Y[:N0, :T0]  # Control units, pre-treatment
    Y_control_post_mean = Y[:N0, T0:].mean(axis=1)  # Control units, post-treatment mean

    lambda_weights = simplex_least_squares_cvxpy(
        Y_control_pre, Y_control_post_mean, zeta=zeta_lambda, intercept=True
    )

    # Solve for unit weights ω
    Y_treated_pre_mean = Y[N0:, :T0].mean(axis=0)  # Treated units, pre-treatment mean

    omega_weights = simplex_least_squares_cvxpy(
        Y_control_pre.T, Y_treated_pre_mean, zeta=zeta_omega, intercept=True
    )

    # Compute SDID estimate using matrix formulation
    unit_weight_vec = np.concatenate([-omega_weights, np.ones(N1) / N1])
    time_weight_vec = np.concatenate([-lambda_weights, np.ones(T1) / T1])

    estimate = unit_weight_vec.T @ Y @ time_weight_vec
    return float(estimate), omega_weights, lambda_weights


## sims

In [5]:
from synthlearners.simulator import (
    SimulationConfig,
    PanelSimulator,
    FactorDGP,
)

In [6]:
"""Test the functions with simulated data."""
np.random.seed(42)

config = SimulationConfig(
    N=100,
    T=50,
    T_pre=20,
    n_treated=10,
    selection_mean=3.0,
    treatment_effect=0.5,
    dgp=FactorDGP(
        K=2,
        sigma=1,
        time_fac_lb=-0.1,
        time_fac_ub=0.1,
        trend_sigma=0.2,
    ),
)
simulator = PanelSimulator(config)
Y, Y_0, L, treated_units = simulator.simulate()
# rearrange into all treated units at the bottom
Y = np.r_[
    Y[np.setdiff1d(range(Y.shape[0]), treated_units), :],
    Y[treated_units, :]
]

### Estimates

In [7]:
sc_est = synthetic_control(
    Y, config.N - config.n_treated, config.T_pre
)

did_est = diff_in_diff(
    Y, config.N - config.n_treated, config.T_pre
)
sdid_est, omega, lam = synthetic_diff_in_diff(
    Y, config.N - config.n_treated, config.T_pre
)

print(f"Synthetic Control estimate: {sc_est:.4f}")
print(f"Difference-in-Differences: {did_est:.4f}")
print(f"Synthetic DiD estimate: {sdid_est:.4f}")

print("SDID Weight summaries:")
print(
    f"Unit weights (ω) - sum: {omega.sum():.6f}, range: [{omega.min():.4f}, {omega.max():.4f}]"
)
print(
    f"Time weights (λ) - sum: {lam.sum():.6f}, range: [{lam.min():.4f}, {lam.max():.4f}]"
)
print(f"Active time periods: {np.sum(lam > 1e-4)}/{len(lam)}")
print(f"Active control units: {np.sum(omega > 1e-4)}/{len(omega)}")

Synthetic Control estimate: 0.3349
Difference-in-Differences: 2.1112
Synthetic DiD estimate: 0.5931
SDID Weight summaries:
Unit weights (ω) - sum: 1.000000, range: [0.0000, 0.0780]
Time weights (λ) - sum: 1.000000, range: [0.0000, 0.5614]
Active time periods: 2/20
Active control units: 52/90
