Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ Deferred items from PR reviews that were not addressed before merge.
|-------|----------|----|----------|
| Tutorial notebooks not executed in CI | `docs/tutorials/*.ipynb` | #159 | Low |
| R comparison tests spawn separate `Rscript` per test (slow CI) | `tests/test_methodology_twfe.py:294` | #139 | Low |
| CS R helpers hard-code `xformla = ~ 1`; no covariate-adjusted R benchmark for IRLS path | `tests/test_methodology_callaway.py` | #202 | Low |

---

Expand Down
244 changes: 217 additions & 27 deletions diff_diff/linalg.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def _detect_rank_deficiency(

# Compute pivoted QR decomposition: X @ P = Q @ R
# P is a permutation matrix, represented as pivot indices
Q, R, pivot = qr(X, mode='economic', pivoting=True)
Q, R, pivot = qr(X, mode="economic", pivoting=True)

# Determine rank tolerance
# R's qr() uses tol = 1e-07 by default, which is sqrt(eps) ≈ 1.49e-08
Expand Down Expand Up @@ -169,8 +169,7 @@ def _format_dropped_columns(
return ""

if column_names is not None:
names = [column_names[i] if i < len(column_names) else f"column {i}"
for i in dropped_cols]
names = [column_names[i] if i < len(column_names) else f"column {i}" for i in dropped_cols]
if len(names) == 1:
return f"'{names[0]}'"
elif len(names) <= 5:
Expand Down Expand Up @@ -251,10 +250,12 @@ def _solve_ols_rust(
cluster_ids: Optional[np.ndarray] = None,
return_vcov: bool = True,
return_fitted: bool = False,
) -> Optional[Union[
Tuple[np.ndarray, np.ndarray, Optional[np.ndarray]],
Tuple[np.ndarray, np.ndarray, np.ndarray, Optional[np.ndarray]],
]]:
) -> Optional[
Union[
Tuple[np.ndarray, np.ndarray, Optional[np.ndarray]],
Tuple[np.ndarray, np.ndarray, np.ndarray, Optional[np.ndarray]],
]
]:
"""
Rust backend implementation of solve_ols for full-rank matrices.

Expand Down Expand Up @@ -447,8 +448,7 @@ def solve_ols(
raise ValueError(f"y must be 1-dimensional, got shape {y.shape}")
if X.shape[0] != y.shape[0]:
raise ValueError(
f"X and y must have same number of observations: "
f"{X.shape[0]} vs {y.shape[0]}"
f"X and y must have same number of observations: " f"{X.shape[0]} vs {y.shape[0]}"
)

n, k = X.shape
Expand Down Expand Up @@ -484,7 +484,8 @@ def solve_ols(
if skip_rank_check:
if HAS_RUST_BACKEND and _rust_solve_ols is not None:
result = _solve_ols_rust(
X, y,
X,
y,
cluster_ids=cluster_ids,
return_vcov=return_vcov,
return_fitted=return_fitted,
Expand All @@ -494,7 +495,8 @@ def solve_ols(
# Fall through to NumPy on numerical instability
# Fall through to Python without rank check (user guarantees full rank)
return _solve_ols_numpy(
X, y,
X,
y,
cluster_ids=cluster_ids,
return_vcov=return_vcov,
return_fitted=return_fitted,
Expand All @@ -521,7 +523,8 @@ def solve_ols(
# - No Rust → Python backend (works for all cases)
if HAS_RUST_BACKEND and _rust_solve_ols is not None and not is_rank_deficient:
result = _solve_ols_rust(
X, y,
X,
y,
cluster_ids=cluster_ids,
return_vcov=return_vcov,
return_fitted=return_fitted,
Expand All @@ -531,7 +534,8 @@ def solve_ols(
# signaled us to fall back to Python backend
if result is None:
return _solve_ols_numpy(
X, y,
X,
y,
cluster_ids=cluster_ids,
return_vcov=return_vcov,
return_fitted=return_fitted,
Expand All @@ -555,7 +559,8 @@ def solve_ols(
# and SVD disagreed about rank. Python's QR will re-detect and
# apply R-style NaN handling for dropped columns.
return _solve_ols_numpy(
X, y,
X,
y,
cluster_ids=cluster_ids,
return_vcov=return_vcov,
return_fitted=return_fitted,
Expand All @@ -569,7 +574,8 @@ def solve_ols(
# Use NumPy implementation for rank-deficient cases (R-style NA handling)
# or when Rust backend is not available
return _solve_ols_numpy(
X, y,
X,
y,
cluster_ids=cluster_ids,
return_vcov=return_vcov,
return_fitted=return_fitted,
Expand Down Expand Up @@ -834,9 +840,7 @@ def _compute_robust_vcov_numpy(
n_clusters = len(unique_clusters)

if n_clusters < 2:
raise ValueError(
f"Need at least 2 clusters for cluster-robust SEs, got {n_clusters}"
)
raise ValueError(f"Need at least 2 clusters for cluster-robust SEs, got {n_clusters}")

# Small-sample adjustment
adjustment = (n_clusters / (n_clusters - 1)) * ((n - 1) / (n - k))
Expand Down Expand Up @@ -871,6 +875,193 @@ def _compute_robust_vcov_numpy(
return vcov


# Empirical threshold: coefficients above this magnitude suggest near-separation
# in the logistic model (predicted probabilities collapse to 0/1).
_LOGIT_SEPARATION_COEF_THRESHOLD = 10
# Predicted probabilities within this distance of 0 or 1 count as "extreme"
# when flagging near-separation in the fitted logistic model.
_LOGIT_SEPARATION_PROB_THRESHOLD = 1e-5


def solve_logit(
    X: np.ndarray,
    y: np.ndarray,
    max_iter: int = 25,
    tol: float = 1e-8,
    check_separation: bool = True,
    rank_deficient_action: str = "warn",
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Fit logistic regression via IRLS (Fisher scoring).

    Matches R's ``glm(family=binomial)`` algorithm: iteratively reweighted
    least squares with working weights ``mu*(1-mu)`` and working response
    ``eta + (y-mu)/(mu*(1-mu))``.

    Parameters
    ----------
    X : np.ndarray
        Feature matrix (n_samples, n_features). Intercept added automatically.
    y : np.ndarray
        Binary outcome (0/1).
    max_iter : int, default 25
        Maximum IRLS iterations (R's ``glm`` default).
    tol : float, default 1e-8
        Convergence tolerance on coefficient change (R's ``glm`` default).
    check_separation : bool, default True
        Whether to check for near-separation and emit warnings.
    rank_deficient_action : str, default "warn"
        How to handle rank-deficient design matrices:
        - "warn": Emit warning and drop columns (default)
        - "error": Raise ValueError
        - "silent": Drop columns silently

    Returns
    -------
    beta : np.ndarray
        Fitted coefficients (including intercept as element 0). Entries for
        dropped (collinear) columns are set to 0.
    probs : np.ndarray
        Predicted probabilities.

    Raises
    ------
    ValueError
        If ``rank_deficient_action`` is not a recognized value, or if it is
        "error" and the design matrix is rank-deficient.
    """
    n, p = X.shape
    X_with_intercept = np.column_stack([np.ones(n), X])
    k = p + 1  # number of parameters including intercept

    # Validate rank_deficient_action up front so typos fail loudly.
    valid_actions = {"warn", "error", "silent"}
    if rank_deficient_action not in valid_actions:
        raise ValueError(
            f"rank_deficient_action must be one of {valid_actions}, "
            f"got '{rank_deficient_action}'"
        )

    # Check rank deficiency once before iterating: the design matrix does not
    # change across IRLS iterations, only the working weights do.
    _, dropped_cols, _ = _detect_rank_deficiency(X_with_intercept)
    if len(dropped_cols) > 0:
        col_desc = _format_dropped_columns(dropped_cols)
        if rank_deficient_action == "error":
            raise ValueError(
                f"Rank-deficient design matrix in logistic regression: "
                f"dropping {col_desc}. Propensity score estimates may be unreliable."
            )
        elif rank_deficient_action == "warn":
            warnings.warn(
                f"Rank-deficient design matrix in logistic regression: "
                f"dropping {col_desc}. Propensity score estimates may be unreliable.",
                UserWarning,
                stacklevel=2,
            )
        # Set gives O(1) membership tests while building the kept-column index.
        dropped_set = set(dropped_cols)
        kept_cols = np.array([i for i in range(k) if i not in dropped_set])
        X_solve = X_with_intercept[:, kept_cols]
    else:
        kept_cols = np.arange(k)
        X_solve = X_with_intercept

    # IRLS (Fisher scoring), starting from beta = 0 (i.e., mu = 0.5).
    beta_solve = np.zeros(X_solve.shape[1])
    converged = False

    for _ in range(max_iter):
        eta = X_solve @ beta_solve
        # Clip to prevent overflow in exp
        eta = np.clip(eta, -500, 500)
        mu = 1.0 / (1.0 + np.exp(-eta))
        # Clip mu to prevent zero working weights
        mu = np.clip(mu, 1e-10, 1 - 1e-10)

        # Working weights and working response
        w = mu * (1.0 - mu)
        z = eta + (y - mu) / w

        # Weighted least squares: solve (X'WX) beta = X'Wz via the
        # square-root-weighted design (numerically safer than normal equations).
        sqrt_w = np.sqrt(w)
        Xw = X_solve * sqrt_w[:, None]
        zw = z * sqrt_w
        beta_new, _, _, _ = np.linalg.lstsq(Xw, zw, rcond=None)

        # Converged when no coefficient moved by more than tol.
        if np.max(np.abs(beta_new - beta_solve)) < tol:
            beta_solve = beta_new
            converged = True
            break
        beta_solve = beta_new

    # Final predicted probabilities from the last coefficient vector.
    eta_final = X_solve @ beta_solve
    eta_final = np.clip(eta_final, -500, 500)
    probs = 1.0 / (1.0 + np.exp(-eta_final))

    # Diagnostics: non-convergence and (optionally) separation warnings.
    if not converged:
        warnings.warn(
            f"Logistic regression did not converge in {max_iter} iterations. "
            f"Propensity score estimates may be unreliable.",
            UserWarning,
            stacklevel=2,
        )

    if check_separation:
        # Huge coefficients are a classic symptom of (quasi-)separation.
        if np.max(np.abs(beta_solve)) > _LOGIT_SEPARATION_COEF_THRESHOLD:
            warnings.warn(
                "Large coefficients detected in propensity score model "
                f"(max|beta| > {_LOGIT_SEPARATION_COEF_THRESHOLD}), "
                "suggesting potential separation.",
                UserWarning,
                stacklevel=2,
            )
        # So are fitted probabilities collapsing onto 0 or 1.
        n_extreme = int(
            np.sum(
                (probs < _LOGIT_SEPARATION_PROB_THRESHOLD)
                | (probs > 1 - _LOGIT_SEPARATION_PROB_THRESHOLD)
            )
        )
        if n_extreme > 0:
            warnings.warn(
                f"Near-separation detected in propensity score model: "
                f"{n_extreme} of {n} observations have predicted probabilities "
                f"within {_LOGIT_SEPARATION_PROB_THRESHOLD} of 0 or 1. ATT estimates may be sensitive to "
                f"model specification.",
                UserWarning,
                stacklevel=2,
            )

    # Expand beta back to full size (zeros for dropped columns) so the caller
    # always receives k = p + 1 coefficients regardless of rank handling.
    if len(dropped_cols) > 0:
        beta_full = np.zeros(k)
        beta_full[kept_cols] = beta_solve
    else:
        beta_full = beta_solve

    return beta_full, probs


def _check_propensity_diagnostics(
    pscore: np.ndarray,
    trim_bound: float = 0.01,
) -> None:
    """
    Emit a warning when propensity scores fall outside the trimming bounds.

    Observations with ``pscore < trim_bound`` or ``pscore > 1 - trim_bound``
    are counted and reported; nothing is returned or modified.

    Parameters
    ----------
    pscore : np.ndarray
        Predicted probabilities.
    trim_bound : float, default 0.01
        Trimming threshold.
    """
    extreme_mask = (pscore < trim_bound) | (pscore > 1 - trim_bound)
    n_extreme = int(extreme_mask.sum())
    if n_extreme == 0:
        return
    n_total = len(pscore)
    pct = 100.0 * n_extreme / n_total
    warnings.warn(
        f"Propensity scores for {n_extreme} of {n_total} observations "
        f"({pct:.1f}%) were outside [{trim_bound}, {1 - trim_bound}] "
        f"and will be trimmed. This may indicate near-separation in "
        f"the propensity score model.",
        UserWarning,
        stacklevel=2,
    )


def compute_r_squared(
y: np.ndarray,
residuals: np.ndarray,
Expand Down Expand Up @@ -1149,7 +1340,8 @@ def fit(
if self.robust or effective_cluster_ids is not None:
# Use solve_ols with robust/cluster SEs
coefficients, residuals, fitted, vcov = solve_ols(
X, y,
X,
y,
cluster_ids=effective_cluster_ids,
return_fitted=True,
return_vcov=compute_vcov,
Expand All @@ -1158,7 +1350,8 @@ def fit(
else:
# Classical OLS - compute vcov separately
coefficients, residuals, fitted, _ = solve_ols(
X, y,
X,
y,
return_fitted=True,
return_vcov=False,
rank_deficient_action=self.rank_deficient_action,
Expand Down Expand Up @@ -1294,6 +1487,7 @@ def get_inference(
# Handle zero or negative SE (indicates perfect fit or numerical issues)
if se <= 0:
import warnings

warnings.warn(
f"Standard error is zero or negative (se={se}) for coefficient at index {index}. "
"This may indicate perfect multicollinearity or numerical issues.",
Expand All @@ -1319,6 +1513,7 @@ def get_inference(
# Warn if df is non-positive and fall back to normal distribution
if effective_df is not None and effective_df <= 0:
import warnings

warnings.warn(
f"Degrees of freedom is non-positive (df={effective_df}). "
"Using normal distribution instead of t-distribution for inference.",
Expand Down Expand Up @@ -1396,10 +1591,7 @@ def get_all_inference(
Inference results for each coefficient in order.
"""
self._check_fitted()
return [
self.get_inference(i, alpha=alpha, df=df)
for i in range(len(self.coefficients_))
]
return [self.get_inference(i, alpha=alpha, df=df) for i in range(len(self.coefficients_))]

def r_squared(self, adjusted: bool = False) -> float:
"""
Expand All @@ -1424,9 +1616,7 @@ def r_squared(self, adjusted: bool = False) -> float:
self._check_fitted()
# Use effective params for adjusted R² to match df correction
n_params = self.n_params_effective_ if adjusted else self.n_params_
return compute_r_squared(
self._y, self.residuals_, adjusted=adjusted, n_params=n_params
)
return compute_r_squared(self._y, self.residuals_, adjusted=adjusted, n_params=n_params)

def predict(self, X: np.ndarray) -> np.ndarray:
"""
Expand Down
Loading
Loading