# 08_AdaptiveLasso - Physics-SR Framework v3.0

## Stage 2.2c: Adaptive Lasso with Oracle Property

**Author:** Zhengze Zhang  
**Affiliation:** Department of Statistics, Columbia University  
**Date:** January 2026

---

### Purpose

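Achieve the oracle property for variable selection in symbolic regression. Writing $\mathcal{A}$ for the true support, the oracle property guarantees:
1. **Selection consistency:** $P(\operatorname{supp}(\hat{\xi}) = \mathcal{A}) \to 1$ as $n \to \infty$
2. **Asymptotic normality:** $\sqrt{n}(\hat{\xi}_{\mathcal{A}} - \xi^*_{\mathcal{A}}) \xrightarrow{d} N(0, V)$, i.e. the estimate on the true support behaves as if $\mathcal{A}$ were known in advance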
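
For reference, Zou (2006) establishes both properties for fixed $p$ when the initial estimate is $\sqrt{n}$-consistent and the penalty level $\lambda_n$ of the unnormalized objective satisfies the rates below, where $\gamma$ is the weight exponent introduced in the next section. This is a summary of that result, not a guarantee for the $\varepsilon$-stabilized, cross-validated variant implemented here:

$$\frac{\lambda_n}{\sqrt{n}} \to 0, \qquad \lambda_n\, n^{(\gamma - 1)/2} \to \infty$$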

### Key Innovation

Standard LASSO applies the same penalty to every coefficient, which leads to:
- Shrinkage bias on large coefficients
- Variable selection that can be inconsistent unless the design satisfies restrictive conditions

Adaptive LASSO solves this by using **data-driven weights**:
$$w_j = \frac{1}{(|\hat{\beta}_j^{init}| + \varepsilon)^\gamma}$$

where $\gamma > 0$ and $\varepsilon$ is a stabilization constant.
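
As a minimal sketch of the weight formula, the snippet below evaluates it on three made-up initial estimates. The helper name `adaptive_weights` and the sample values are assumptions for illustration and are not part of the module. It shows that a large initial estimate receives a small weight, while an exact zero is capped at $\varepsilon^{-\gamma}$ instead of diverging.

```python
import numpy as np

def adaptive_weights(beta_init, gamma=1.0, eps=1e-6):
    """w_j = 1 / (|beta_init_j| + eps)**gamma (illustrative helper)."""
    beta_init = np.asarray(beta_init, dtype=float)
    return 1.0 / (np.abs(beta_init) + eps) ** gamma

# Hypothetical initial estimates: strong, moderate, and exactly zero
beta_init = np.array([2.0, 0.5, 0.0])
w = adaptive_weights(beta_init, gamma=1.0, eps=1e-6)

# Large |beta_init| -> small weight; the zero entry is capped near 1 / eps
print(w)
```

Raising `gamma` widens the gap between the weights, and lowering `eps` raises the cap on the largest one.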

### Mathematical Foundation

The adaptive LASSO objective:
$$\hat{\xi} = \arg\min_\xi \|y - \Phi\xi\|^2 + \lambda \sum_{j=1}^{p} w_j |\xi_j|$$
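
One way to solve this objective with an ordinary Lasso solver is the substitution $\theta_j = w_j \xi_j$: the penalty becomes $\lambda \sum_j |\theta_j|$ and the fit becomes $(\Phi / w)\,\theta$, where column $j$ of $\Phi$ is divided by $w_j$. The sketch below checks numerically that the two objectives agree. The function names, random data, and weight values are assumptions for illustration only.

```python
import numpy as np

def adaptive_objective(xi, Phi, y, w, lam):
    """||y - Phi xi||^2 + lam * sum_j w_j |xi_j|."""
    r = y - Phi @ xi
    return r @ r + lam * np.sum(w * np.abs(xi))

def rescaled_objective(theta, Phi, y, w, lam):
    """Plain Lasso objective on the rescaled columns Phi_j / w_j."""
    r = y - (Phi / w) @ theta
    return r @ r + lam * np.sum(np.abs(theta))

rng = np.random.default_rng(0)
Phi = rng.normal(size=(50, 4))
y = rng.normal(size=50)
w = np.array([0.5, 2.0, 10.0, 100.0])   # hypothetical positive weights
xi = rng.normal(size=4)
theta = w * xi                           # substitution theta_j = w_j * xi_j

gap = abs(adaptive_objective(xi, Phi, y, w, 0.3)
          - rescaled_objective(theta, Phi, y, w, 0.3))
print(f"objective gap: {gap:.2e}")
```

Because the two objectives coincide for every pair $(\xi, \theta = w \odot \xi)$, a minimizer $\hat{\theta}$ of the rescaled problem maps back to $\hat{\xi}_j = \hat{\theta}_j / w_j$. Note that the scaling is by $w_j$ itself, not by $\sqrt{w_j}$.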

### Reference

- Zou, H. (2006). The adaptive lasso and its oracle properties. *JASA*, 101(476), 1418-1429.

---
## Section 1: Header and Imports

In [None]:
"""
08_AdaptiveLasso.ipynb - Adaptive Lasso with Oracle Property
=============================================================

Three-Stage Physics-Informed Symbolic Regression Framework v3.0

This module provides:
- AdaptiveLassoSelector: Adaptive Lasso with data-driven weights
- Oracle property for variable selection consistency
- Epsilon-stabilization to prevent weight explosion
- Cross-validation for lambda selection

Algorithm:
    1. Compute initial estimate via Ridge regression
    2. Compute adaptive weights: w_j = 1 / (|beta_init[j]| + eps)^gamma
    3. Fit weighted Lasso with CV for lambda selection
    4. Transform coefficients back to original scale

Author: Zhengze Zhang
Affiliation: Department of Statistics, Columbia University
"""

# Import core module
%run 00_Core.ipynb

In [None]:
# Additional imports for Adaptive Lasso
from sklearn.linear_model import Ridge, LassoCV, Lasso
from sklearn.preprocessing import StandardScaler
from typing import Dict, List, Tuple, Optional, Any

print("08_AdaptiveLasso: Additional imports successful.")

---
## Section 2: Class Definition
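
Before the class itself, the four steps listed in the module docstring (initial estimate, adaptive weights, weighted Lasso, back-transform) can be sketched in plain NumPy. This is a simplified stand-in under stated assumptions, not the class implementation: it uses a fixed penalty `lam` instead of cross-validation, a small hand-written coordinate-descent solver instead of `LassoCV`, and the names `soft_threshold`, `lasso_cd`, and `adaptive_lasso_sketch` are hypothetical.

```python
import numpy as np

def soft_threshold(z, t):
    """Soft-thresholding operator sign(z) * max(|z| - t, 0)."""
    return np.sign(z) * max(abs(z) - t, 0.0)

def lasso_cd(X, y, lam, n_iter=500):
    """Coordinate descent for ||y - X b||^2 + lam * ||b||_1 (no intercept)."""
    n, p = X.shape
    b = np.zeros(p)
    col_sq = np.sum(X ** 2, axis=0)
    r = y.astype(float)                  # residual y - X b, with b = 0 at start
    for _ in range(n_iter):
        for j in range(p):
            if col_sq[j] == 0.0:
                continue
            r += X[:, j] * b[j]          # drop feature j from the current fit
            rho = X[:, j] @ r
            b[j] = soft_threshold(rho, lam / 2.0) / col_sq[j]
            r -= X[:, j] * b[j]          # put the updated feature j back
    return b

def adaptive_lasso_sketch(Phi, y, lam, gamma=1.0, eps=1e-6, ridge_alpha=0.1):
    p = Phi.shape[1]
    # Step 1: ridge initial estimate (closed form, no intercept)
    beta_init = np.linalg.solve(Phi.T @ Phi + ridge_alpha * np.eye(p), Phi.T @ y)
    # Step 2: adaptive weights
    w = 1.0 / (np.abs(beta_init) + eps) ** gamma
    # Step 3: plain Lasso on columns rescaled by 1 / w_j
    theta = lasso_cd(Phi / w, y, lam)
    # Step 4: back-transform, since theta_j = w_j * beta_j
    return theta / w

# Synthetic data (illustrative): only the first two features are active
rng = np.random.default_rng(0)
Phi = rng.normal(size=(200, 6))
true_coef = np.array([2.0, -1.5, 0.0, 0.0, 0.0, 0.0])
y = Phi @ true_coef + 0.1 * rng.normal(size=200)

beta = adaptive_lasso_sketch(Phi, y, lam=5.0)
support = np.abs(beta) > 1e-10
print(np.round(beta, 3), support)
```

The irrelevant features get near-zero ridge estimates and therefore large weights, so their effective threshold is high, while the two active features are penalized only lightly.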

In [None]:
# ==============================================================================
# ADAPTIVE LASSO SELECTOR CLASS
# ==============================================================================

class AdaptiveLassoSelector:
    """
    Adaptive Lasso with Oracle Property for Symbolic Regression.
    
    Implements adaptive LASSO which achieves the oracle property:
    - Selection consistency (recovers true support)
    - Asymptotic normality (valid inference)
    
    The key innovation is using data-driven weights that penalize
    small coefficients more heavily, so that large coefficients
    are shrunk far less than under a uniform penalty.
    
    Attributes
    ----------
    gamma : float
        Weight exponent (default: 1.0). Higher gamma = stronger penalty
        on small initial coefficients.
    eps : float
        Stabilization constant to prevent weight explosion (default: 1e-6)
    cv_folds : int
        Number of folds for cross-validation (default: 5)
    initial_method : str
        Method for initial estimate: 'ridge' or 'ols' (default: 'ridge')
    ridge_alpha : float
        Regularization for initial Ridge estimate (default: 0.1)
    
    Examples
    --------
    >>> selector = AdaptiveLassoSelector(gamma=1.0)
    >>> result = selector.fit(Phi, y, feature_names=names)
    >>> print(result['equation'])
    """
    
    def __init__(
        self,
        gamma: float = 1.0,
        eps: float = 1e-6,
        cv_folds: int = DEFAULT_CV_FOLDS,
        initial_method: str = 'ridge',
        ridge_alpha: float = 0.1
    ):
        """
        Initialize AdaptiveLassoSelector.
        
        Parameters
        ----------
        gamma : float
            Exponent for adaptive weights. gamma=1 is standard,
            gamma=2 provides stronger adaptation.
            Default: 1.0
        eps : float
            Stabilization constant to prevent division by zero.
            Default: 1e-6
        cv_folds : int
            Number of cross-validation folds for lambda selection.
            Default: 5
        initial_method : str
            'ridge' or 'ols' for initial estimate.
            Default: 'ridge' (more stable)
        ridge_alpha : float
            Ridge regularization parameter for initial estimate.
            Default: 0.1
        """
        self.gamma = gamma
        self.eps = eps
        self.cv_folds = cv_folds
        self.initial_method = initial_method
        self.ridge_alpha = ridge_alpha
        
        # Internal state
        self._coefficients = None
        self._support = None
        self._feature_names = None
        self._n_features = None
        self._initial_estimate = None
        self._adaptive_weights = None
        self._optimal_lambda = None
        self._fit_complete = False
        self._r2_score = None
        self._mse = None
    
    def fit(
        self,
        feature_library: np.ndarray,
        y: np.ndarray,
        feature_names: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """
        Fit Adaptive Lasso model.
        
        Parameters
        ----------
        feature_library : np.ndarray
            Feature matrix of shape (n_samples, n_features)
        y : np.ndarray
            Target vector of shape (n_samples,)
        feature_names : List[str], optional
            Names of features
        
        Returns
        -------
        Dict[str, Any]
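            Dictionary containing:
            - coefficients: Coefficient vector
            - support: Boolean mask of active terms
            - equation: String representation
            - n_active_terms: Number of non-zero coefficients
            - optimal_lambda: Penalty level selected by CV (LassoCV alpha_)
            - r2_score: R-squared on training data
            - mse: Mean squared error on training data
            - initial_estimate: Initial coefficient estimate
            - adaptive_weights: Weights used in the penalty
            - gamma, eps: Settings used to compute the weights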
        """
        n_samples, n_features = feature_library.shape
        self._n_features = n_features
        
        # Set feature names
        if feature_names is None:
            self._feature_names = [f'f{i}' for i in range(n_features)]
        else:
            self._feature_names = list(feature_names)
        
        # Step 1: Compute initial estimate
        self._initial_estimate = self._compute_initial_estimate(
            feature_library, y
        )
        
        # Step 2: Compute adaptive weights
        self._adaptive_weights = self._compute_adaptive_weights(
            self._initial_estimate
        )
        
        # Step 3: Fit weighted Lasso
        self._coefficients, self._optimal_lambda = self._fit_weighted_lasso(
            feature_library, y, self._adaptive_weights
        )
        
        # Determine support
        self._support = np.abs(self._coefficients) > 1e-10
        
        # Compute metrics
        y_pred = feature_library @ self._coefficients
        self._mse = np.mean((y - y_pred)**2)
        ss_tot = np.sum((y - np.mean(y))**2)
        ss_res = np.sum((y - y_pred)**2)
        self._r2_score = 1 - ss_res / ss_tot if ss_tot > 0 else 0.0
        
        self._fit_complete = True
        
        return {
            'coefficients': self._coefficients,
            'support': self._support,
            'equation': self.get_equation(),
            'n_active_terms': int(np.sum(self._support)),
            'optimal_lambda': self._optimal_lambda,
            'r2_score': self._r2_score,
            'mse': self._mse,
            'initial_estimate': self._initial_estimate,
            'adaptive_weights': self._adaptive_weights,
            'gamma': self.gamma,
            'eps': self.eps
        }
    
    def _compute_initial_estimate(
        self,
        Phi: np.ndarray,
        y: np.ndarray
    ) -> np.ndarray:
        """
        Compute initial coefficient estimate.
        
        Uses Ridge regression for stability (always has a solution
        even when p > n or features are collinear).
        
        Parameters
        ----------
        Phi : np.ndarray
            Feature matrix
        y : np.ndarray
            Target vector
        
        Returns
        -------
        np.ndarray
            Initial coefficient estimate
        """
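        if self.initial_method == 'ridge':
            ridge = Ridge(alpha=self.ridge_alpha, fit_intercept=False)
            ridge.fit(Phi, y)
            return ridge.coef_
        if self.initial_method == 'ols':
            try:
                # Minimum-norm least-squares solution (handles rank deficiency)
                beta, _, _, _ = np.linalg.lstsq(Phi, y, rcond=None)
                return beta
            except np.linalg.LinAlgError:
                # Fall back to Ridge if the SVD inside lstsq fails to converge
                ridge = Ridge(alpha=self.ridge_alpha, fit_intercept=False)
                ridge.fit(Phi, y)
                return ridge.coef_
        raise ValueError(
            f"initial_method must be 'ridge' or 'ols', got {self.initial_method!r}"
        )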
    
    def _compute_adaptive_weights(
        self,
        beta_init: np.ndarray
    ) -> np.ndarray:
        """
        Compute adaptive weights with epsilon stabilization.
        
        w_j = 1 / (|beta_init[j]| + eps)^gamma
        
        The epsilon prevents weight explosion when beta_init is close to 0.
        
        Parameters
        ----------
        beta_init : np.ndarray
            Initial coefficient estimate
        
        Returns
        -------
        np.ndarray
            Adaptive weights
        """
        return 1.0 / (np.abs(beta_init) + self.eps) ** self.gamma
    
    def _fit_weighted_lasso(
        self,
        Phi: np.ndarray,
        y: np.ndarray,
        weights: np.ndarray
    ) -> Tuple[np.ndarray, float]:
        """
        Fit weighted Lasso via column rescaling.
        
        Substituting theta_j = w_j * beta_j turns the penalty
        lambda * sum(w_j * |beta_j|) into lambda * sum(|theta_j|)
        and the fit Phi @ beta into (Phi / weights) @ theta.
        A standard Lasso on Phi_weighted = Phi / weights therefore
        solves the adaptive problem, with beta = theta / weights.
        
        Parameters
        ----------
        Phi : np.ndarray
            Feature matrix
        y : np.ndarray
            Target vector
        weights : np.ndarray
            Adaptive weights
        
        Returns
        -------
        Tuple[np.ndarray, float]
            (coefficients, optimal_lambda), where optimal_lambda is
            the alpha_ selected by LassoCV
        """
        # Rescale column j by 1 / w_j. Dividing by sqrt(w_j) instead
        # would penalize with sqrt(w_j), i.e. silently halve gamma.
        Phi_weighted = Phi / weights
        
        # Fit Lasso with CV
        lasso_cv = LassoCV(
            cv=self.cv_folds,
            fit_intercept=False,
            max_iter=10000,
            tol=1e-6
        )
        lasso_cv.fit(Phi_weighted, y)
        
        # Transform coefficients back
        beta_weighted = lasso_cv.coef_
        beta = self._transform_back(beta_weighted, weights)
        
        return beta, lasso_cv.alpha_
    
    def _transform_back(
        self,
        beta_weighted: np.ndarray,
        weights: np.ndarray
    ) -> np.ndarray:
        """
        Transform weighted coefficients back to original scale.
        
        Inverts theta_j = w_j * beta_j, i.e. beta_j = theta_j / w_j.
        
        Parameters
        ----------
        beta_weighted : np.ndarray
            Coefficients from weighted Lasso
        weights : np.ndarray
            Adaptive weights
        
        Returns
        -------
        np.ndarray
            Original-scale coefficients
        """
        return beta_weighted / weights
    
    def get_equation(
        self,
        feature_names: Optional[List[str]] = None
    ) -> str:
        """
        Get string representation of discovered equation.
        
        Parameters
        ----------
        feature_names : List[str], optional
            Feature names to use
        
        Returns
        -------
        str
            Equation string
        """
        if self._coefficients is None:
            return ""
        
        names = feature_names or self._feature_names
        
        terms = []
        for coef, name in zip(self._coefficients, names):
            if abs(coef) > 1e-10:
                if coef >= 0 and terms:
                    terms.append(f"+ {coef:.6f}*{name}")
                else:
                    terms.append(f"{coef:.6f}*{name}")
        
        if len(terms) == 0:
            return "0"
        
        return " ".join(terms)
    
    def predict(
        self,
        Phi_new: np.ndarray
    ) -> np.ndarray:
        """
        Predict using fitted model.
        
        Parameters
        ----------
        Phi_new : np.ndarray
            Feature matrix for new data
        
        Returns
        -------
        np.ndarray
            Predictions
        """
        if self._coefficients is None:
            raise ValueError("Must call fit() before predict()")
        return Phi_new @ self._coefficients
    
    def get_active_terms(
        self
    ) -> List[Tuple[str, float]]:
        """
        Get list of active terms with coefficients.
        
        Returns
        -------
        List[Tuple[str, float]]
            List of (name, coefficient) tuples
        """
        if not self._fit_complete:
            raise ValueError("Must call fit() first")
        
        active = []
        for i, (coef, name) in enumerate(zip(self._coefficients, self._feature_names)):
            if self._support[i]:
                active.append((name, float(coef)))
        
        return active
    
    def print_alasso_report(self) -> None:
        """
        Print detailed Adaptive Lasso report.
        """
        if not self._fit_complete:
            print("Fit not yet performed. Call fit() first.")
            return
        
        print("=" * 70)
        print(" Adaptive Lasso Results")
        print("=" * 70)
        print()
        print(f"Configuration:")
        print(f"  Gamma: {self.gamma}")
        print(f"  Epsilon: {self.eps}")
        print(f"  CV folds: {self.cv_folds}")
        print(f"  Initial method: {self.initial_method}")
        print()
        print(f"Results:")
        print(f"  Optimal lambda: {self._optimal_lambda:.6e}")
        print(f"  Active terms: {int(np.sum(self._support))} / {self._n_features}")
        print(f"  R-squared: {self._r2_score:.6f}")
        print(f"  MSE: {self._mse:.6e}")
        print()
        print("-" * 70)
        print(" Discovered Equation:")
        print("-" * 70)
        print(f"  {self.get_equation()}")
        print()
        print("-" * 70)
        print(" Coefficient Details:")
        print("-" * 70)
        print(f"  {'Term':<25} {'Init Est':<12} {'Weight':<12} {'Final':<12}")
        print("  " + "-" * 60)
        
        for i in range(self._n_features):
            if self._support[i]:
                print(f"  {self._feature_names[i]:<25} "
                      f"{self._initial_estimate[i]:<12.4f} "
                      f"{self._adaptive_weights[i]:<12.4f} "
                      f"{self._coefficients[i]:<12.6f}")
        
        print()
        print("=" * 70)

---
## Section 3: Internal Tests
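
The tests in this section report support recovery as pass, partial, or neither. As a complementary sketch, the helper below quantifies partial recovery with precision and recall over boolean support masks. The name `support_recovery` and the example masks are assumptions for illustration and are not used by the tests.

```python
import numpy as np

def support_recovery(true_support, est_support):
    """Return (precision, recall, exact) for two boolean support masks."""
    true_support = np.asarray(true_support, dtype=bool)
    est_support = np.asarray(est_support, dtype=bool)
    tp = int(np.sum(true_support & est_support))   # correctly selected terms
    n_est = int(np.sum(est_support))
    n_true = int(np.sum(true_support))
    precision = tp / n_est if n_est > 0 else 1.0
    recall = tp / n_true if n_true > 0 else 1.0
    exact = bool(np.array_equal(true_support, est_support))
    return precision, recall, exact

# Hypothetical masks: the truth is {0, 1, 2}; the estimate also selects 5
truth = np.array([1, 1, 1, 0, 0, 0], dtype=bool)
est = np.array([1, 1, 1, 0, 0, 1], dtype=bool)
precision, recall, exact = support_recovery(truth, est)
print(precision, recall, exact)
```

A recall of 1 with precision below 1 corresponds to the `[PARTIAL]` outcome in Test 1: every true term is found, but extra terms are selected.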

In [None]:
# ==============================================================================
# TEST CONTROL FLAG
# ==============================================================================

_RUN_TESTS = False  # Set to True to run internal tests

if _RUN_TESTS:
    print("=" * 70)
    print(" RUNNING INTERNAL TESTS FOR 08_AdaptiveLasso")
    print("=" * 70)

In [None]:
# ==============================================================================
# TEST 1: Oracle Property - Selection Consistency
# ==============================================================================

if _RUN_TESTS:
    print()
    print_section_header("Test 1: Oracle Property - Selection Consistency")
    
    # Generate data with known sparse solution
    np.random.seed(42)
    n_samples = 300
    n_features = 20
    n_active = 3
    
    # True coefficients: only first 3 are non-zero
    true_coef = np.zeros(n_features)
    true_coef[0] = 2.0
    true_coef[1] = -1.5
    true_coef[2] = 0.8
    
    # Generate features
    X = np.random.randn(n_samples, n_features)
    y = X @ true_coef + 0.1 * np.random.randn(n_samples)
    
    feature_names = [f'x{i}' for i in range(n_features)]
    
    print(f"True active features: x0, x1, x2")
    print(f"True coefficients: {true_coef[:3]}")
    print()
    
    # Fit Adaptive Lasso
    selector = AdaptiveLassoSelector(gamma=1.0)
    result = selector.fit(X, y, feature_names=feature_names)
    
    print(f"Discovered active: {result['n_active_terms']} features")
    print(f"Active terms: {[t[0] for t in selector.get_active_terms()]}")
    
    # Check if true support is recovered
    recovered_support = set(t[0] for t in selector.get_active_terms())
    true_support = {'x0', 'x1', 'x2'}
    
    if recovered_support == true_support:
        print("[PASS] True support recovered exactly")
    elif true_support.issubset(recovered_support):
        print("[PARTIAL] True support included, but extra features selected")
    else:
        missing = sorted(true_support - recovered_support)
        print(f"[FAIL] True features missing from support: {missing}")

In [None]:
# ==============================================================================
# TEST 2: Epsilon Stabilization
# ==============================================================================

if _RUN_TESTS:
    print()
    print_section_header("Test 2: Epsilon Stabilization")
    
    np.random.seed(42)
    n_samples = 200
    
    # x1 and x2 carry signal; x3 is irrelevant, so its initial estimate
    # is close to zero and it receives a large adaptive weight
    x1 = np.random.randn(n_samples)
    x2 = np.random.randn(n_samples)
    x3 = np.random.randn(n_samples)  # Irrelevant feature
    
    y = 2*x1 + 0.5*x2 + 0.01*np.random.randn(n_samples)
    
    X = np.column_stack([x1, x2, x3])
    feature_names = ['x1', 'x2', 'x3']
    
    # Test different epsilon values
    eps_values = [1e-10, 1e-6, 1e-3]
    
    print(f"Testing different epsilon values:")
    print(f"{'Epsilon':<12} {'Active':<10} {'R2':<10} {'Max Weight':<15}")
    print("-" * 50)
    
    for eps in eps_values:
        selector = AdaptiveLassoSelector(gamma=1.0, eps=eps)
        result = selector.fit(X, y, feature_names=feature_names)
        max_weight = np.max(selector._adaptive_weights)
        print(f"{eps:<12.0e} {result['n_active_terms']:<10} "
              f"{result['r2_score']:<10.4f} {max_weight:<15.2e}")
    
    print()
    print("Note: eps caps the largest weight at eps**(-gamma), so it only matters")
    print("      when some initial estimate is comparable to or smaller than eps")

In [None]:
# ==============================================================================
# TEST 3: Gamma Sensitivity
# ==============================================================================

if _RUN_TESTS:
    print()
    print_section_header("Test 3: Gamma Sensitivity")
    
    np.random.seed(42)
    n_samples = 200
    
    x1 = np.random.randn(n_samples)
    x2 = np.random.randn(n_samples)
    x3 = np.random.randn(n_samples)
    
    # Sparse signal
    y = 3*x1 + 0.01*np.random.randn(n_samples)
    
    X = np.column_stack([x1, x2, x3])
    feature_names = ['x1', 'x2', 'x3']
    
    gamma_values = [0.5, 1.0, 2.0]
    
    print(f"True: y = 3*x1 (only x1 is active)")
    print()
    print(f"{'Gamma':<10} {'Active':<10} {'R2':<10} {'x1 coef':<12}")
    print("-" * 45)
    
    for gamma in gamma_values:
        selector = AdaptiveLassoSelector(gamma=gamma)
        result = selector.fit(X, y, feature_names=feature_names)
        x1_coef = result['coefficients'][0]
        print(f"{gamma:<10.1f} {result['n_active_terms']:<10} "
              f"{result['r2_score']:<10.4f} {x1_coef:<12.4f}")
    
    print()
    print("Note: Higher gamma = stronger penalty on small initial coefficients")

In [None]:
# ==============================================================================
# TEST 4: Comparison with Standard Lasso
# ==============================================================================

if _RUN_TESTS:
    print()
    print_section_header("Test 4: Adaptive Lasso vs Standard Lasso")
    
    np.random.seed(42)
    n_samples = 300
    
    x1 = np.random.randn(n_samples)
    x2 = np.random.randn(n_samples)
    x3 = np.random.randn(n_samples)
    
    # True: y = 5*x1 + 2*x2 (large difference in coefficients)
    y = 5*x1 + 2*x2 + 0.1*np.random.randn(n_samples)
    
    X = np.column_stack([x1, x2, x3])
    feature_names = ['x1', 'x2', 'x3']
    
    print(f"True: y = 5*x1 + 2*x2")
    print()
    
    # Standard Lasso
    lasso = LassoCV(cv=5, fit_intercept=False)
    lasso.fit(X, y)
    
    # Adaptive Lasso
    alasso = AdaptiveLassoSelector(gamma=1.0)
    alasso_result = alasso.fit(X, y, feature_names=feature_names)
    
    print(f"{'Method':<20} {'x1':<12} {'x2':<12} {'x3':<12}")
    print("-" * 55)
    print(f"{'True':<20} {5.0:<12.4f} {2.0:<12.4f} {0.0:<12.4f}")
    print(f"{'Standard Lasso':<20} {lasso.coef_[0]:<12.4f} "
          f"{lasso.coef_[1]:<12.4f} {lasso.coef_[2]:<12.4f}")
    print(f"{'Adaptive Lasso':<20} {alasso._coefficients[0]:<12.4f} "
          f"{alasso._coefficients[1]:<12.4f} {alasso._coefficients[2]:<12.4f}")
    
    print()
    print("Note: Adaptive Lasso should have less bias on large coefficients")

---
## Section 4: Module Summary

In [None]:
# ==============================================================================
# MODULE SUMMARY
# ==============================================================================

print("=" * 70)
print(" 08_AdaptiveLasso.ipynb - Module Summary")
print("=" * 70)
print()
print("CLASS: AdaptiveLassoSelector")
print("-" * 70)
print()
print("Purpose:")
print("  Adaptive Lasso with oracle property for variable selection.")
print("  Achieves selection consistency and asymptotic normality.")
print()
print("Main Methods:")
print("  fit(feature_library, y, feature_names=None)")
print("      Fit Adaptive Lasso model")
print("      Returns: dict with coefficients, support, equation, metrics")
print()
print("  get_equation()")
print("      Get string representation of equation")
print()
print("  predict(Phi_new)")
print("      Make predictions")
print()
print("  get_active_terms()")
print("      Get list of active terms with coefficients")
print()
print("  print_alasso_report()")
print("      Print detailed results report")
print()
print("Key Parameters:")
print("  gamma: Weight exponent (1.0 = standard, 2.0 = stronger)")
print("  eps: Stabilization constant (default: 1e-6)")
print()
print("Usage Example:")
print("-" * 70)
print("""
# Build feature library
builder = FeatureLibraryBuilder(max_poly_degree=3)
Phi, names = builder.build(X, feature_names)

# Fit Adaptive Lasso
selector = AdaptiveLassoSelector(gamma=1.0)
result = selector.fit(Phi, y, feature_names=names)

print(f"Equation: {result['equation']}")
print(f"Active terms: {result['n_active_terms']}")
print(f"R-squared: {result['r2_score']:.4f}")
""")
print()
print("=" * 70)
print("Module loaded successfully. Import via: %run 08_AdaptiveLasso.ipynb")
print("=" * 70)