**1. Import Libraries**

In [None]:
import numpy as np
import pandas as pd
import time
from typing import Tuple, Dict
from scipy import linalg

from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split
from sklearn.linear_model import Ridge, LassoCV
from sklearn.metrics import r2_score
from sklearn.preprocessing import StandardScaler # Added for preprocessing consistency

: 

**2. Target-Aware ARS(Adaptive Residual Sampling) QRCP**

In [None]:
# Scikit-Learn Style Class of TAAQ
class TAAQ:
    """
    Target-Aware ARS-QRCP (TAAQ) Feature Selection

    3-Phase Algorithm:
    - Phase 1: QRCP warm-up for k1
    - Phase 2: Target-Projected Residual Sampling (TPRS) with one-step deflation
    - Phase 3: Geometry-Predictive Pruning to size k
    """

    def __init__(self, k: int, k1_fraction: float = 0.5, delta: int = 10,
                epsilon: float = 1e-6, supervised: bool = True):
        self.k = k
        self.k1_fraction = k1_fraction
        self.delta = delta
        self.epsilon = epsilon
        self.supervised = supervised
        self.selected_features_ = None
        self.runtime_ = {}

    def fit(self, M: np.ndarray, y:np.ndarray) -> 'TAAQ':
        """
        Fit TAAQ Selector (Model Trainer)
        """

        # Stamp the start time
        start_time = time.time()

        # Preprocessing
        M_std, y_std = self._preprocess(M, y)
        m, n = M_std.shape

        # Phase1: QRCP warm-up
        t1 = time.time()
        k1 = int(self.k * self.k1_fraction)
        Q1, R11, S1 = self._phase1_partial_qrcp(M_std, k1)
        self.runtime_['phase1'] = time.time() - t1

        # Phase2: TPRS (Target-Projected Residual Sampling)
        t2 = time.time()
        S2_prime = self._phase2_tprs(M_std, y_std, Q1, S1, k1, self.delta)
        self.runtime_['phase2'] = time.time() - t2

        # Phase3: GPP (Geometry-Predictive Pruning)
        t3 = time.time()
        S_tmp = np.concatenate([S1, S2_prime])
        S_final = self._phase3_gpp(M_std, y_std, S_tmp, self.k)
        self.runtime_['phase3'] = time.time() - t3

        self.selected_features_ = S_final
        self.runtime_['total'] = time.time() - start_time

        return self

    def _preprocess(self, M: np.ndarray, y: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """
        Standardize columns of M and scale y.
        """
        m, n = M.shape

        # Standardize columns (axis=0): zero mean, unit variance
        M_std = (M - M.mean(axis=0, keepdims=True)) / (M.std(axis=0, keepdims=True) + 1e-10)

        # Center and scale y so that ||y||_2^2 = m
        y_mean = y.mean()
        y_sd = y.std()
        y_std = (y - y_mean) / (y_sd + 1e-10)

        return M_std, y_std

    def _phase1_partial_qrcp(self, M: np.ndarray, k1: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """
        Phase 1: Highly optimized Partial QRCP (k1 steps only).

        Complexity: O(m*n*k1)
        """
         
        m, n = M.shape
        
        # Working copy
        A = M.copy()
        
        # perm tracks the original column indices.
        # perm[k] will store the original index of the k-th pivot column.
        perm = np.arange(n)
        
        # Pre-compute the squared L2-norm of each column for pivoting.
        # We use squared norms to avoid sqrt.
        col_norms_full = np.sum(A**2, axis=0)
        col_norms_partial = col_norms_full.copy()
        
        # Storage for the 'tau' scalars needed to reconstruct Q.
        # H_k = I - tau_scalars[k] * v_k * v_k^T
        tau_scalars = np.zeros(k1)

        # main loop
        for k in range(k1):
            
            # ====================
            # 1. COLUMN PIVOTING
            # ====================
            # Find the column with the maximum remaining norm in the sub-array A[k:, k:].
            j_max = k + np.argmax(col_norms_partial[k:])
            
            if j_max != k:
                # Swap the pivot column (j_max) with the current column (k)
                A[:, [k, j_max]] = A[:, [j_max, k]]
                perm[[k, j_max]] = perm[[j_max, k]]
                col_norms_full[[k, j_max]] = col_norms_full[[j_max, k]]
                col_norms_partial[[k, j_max]] = col_norms_partial[[j_max, k]]
            
            # ========================================
            # 2. HOUSEHOLDER QR STEP (on column k)
            # ========================================
            
            # Get the k-th column vector starting from the diagonal.
            x_k = A[k:m, k]
            norm_x_k = np.linalg.norm(x_k)

            if norm_x_k < 1e-14:
                # This column is already (numerically) zero.
                # No reflection needed.
                tau_scalars[k] = 0
                continue
            
            # 'sigma_k' is the target value for the first element,
            # sign((x_k)_1) * ||x_k||.
            sigma_k = -np.sign(x_k[0]) * norm_x_k if x_k[0] != 0 else -norm_x_k
            
            # --- Calculate the LAPACK-style Householder vector 'v' and 'tau' ---
            
            # 1. u_k = x_k - sigma_k * e_1
            u_0 = x_k[0] - sigma_k
            u_tail = x_k[1:]
            
            # 2. tau_k = -u_0 / sigma_k
            tau_k = -u_0 / sigma_k
            tau_scalars[k] = tau_k # Store for Q reconstruction
            
            # 3. v_k = u_k / u_0 = [1, u_tail / u_0]^T
            v_tail = u_tail / u_0 if u_0 != 0 else u_tail # Avoid division by zero

            # Apply the reflection H_k to all remaining columns (k+1 to n)
            # A_sub = (I - tau_k * v_k * v_k^T) @ A_sub
            if k < n - 1:
                A_sub = A[k:m, k+1:n]
                A_sub_head = A_sub[0, :]     # First row of the submatrix
                A_sub_tail = A_sub[1:, :]    # Rest of the submatrix
                
                # w = (v_k^T @ A_sub)
                #   = (v_k[0] * A_sub_head) + (v_tail^T @ A_sub_tail)
                #   = (1 * A_sub_head) + (v_tail^T @ A_sub_tail)
                w = A_sub_head + (v_tail @ A_sub_tail)
                
                # A_sub = A_sub - tau_k * v_k * w^T
                A[k, k+1:n] -= tau_k * w              # Update head (v_k[0]=1)
                A[k+1:m, k+1:n] -= tau_k * np.outer(v_tail, w) # Update tail
            
            # Store R_kk and the Householder vector v_tail
            # directly into the matrix A.
            A[k, k] = sigma_k      # R's diagonal element
            A[k+1:m, k] = v_tail # The tail of the vector v_k
            
            # ===================================
            # 3. UPDATE COLUMN NORMS (downdate)
            # ===================================
            # This is the O(n) part.
            # Use Pythagoras to update the remaining norm of each column after the reflection.
            # new_tail_norm^2 = total_norm^2 - new_head_val^2
            if k < n - 1:
                for j in range(k+1, n):
                    # temp = 1 - (A[k, j] / col_norms_partial[j])**2
                    temp_ratio = A[k, j] / col_norms_partial[j] if col_norms_partial[j] > 0 else 0
                    temp_ratio = max(0, (1 - temp_ratio) * (1 + temp_ratio)) # (1-t^2)
                    
                    # col_norms_partial[j] = col_norms_partial[j] * sqrt(temp_ratio)
                    col_norms_partial[j] *= np.sqrt(temp_ratio)
                    
                    # Failsafe: Recompute norm if numerical error accumulates
                    if col_norms_partial[j] < 0.1 * np.sqrt(col_norms_full[j]):
                        col_norms_partial[j] = np.linalg.norm(A[k+1:m, j])
                        col_norms_full[j] = col_norms_partial[j]**2
        
        # ====================================================================
        # 4. EXTRACT RESULTS (Q, R, S)
        # ====================================================================
        
        # R11 is the upper-triangular k1 x k1 block of A
        R11 = np.triu(A[:k1, :k1])

        Q1 = np.eye(m, k1)
        
        for k in range(k1-1, -1, -1):
            tau_k = tau_scalars[k] # Get the stored scalar
            
            if tau_k != 0:
                # Reconstruct v_k = [1, v_tail]^T
                v = np.zeros(m - k)
                v[0] = 1
                v[1:] = A[k+1:m, k] # Get the stored v_tail
                
                # Apply the transformation:
                # Q1[k:, k:] = H_k @ Q1[k:, k:]
                #            = (I - tau_k * v * v^T) @ Q1[k:, k:]
                w = tau_k * (v @ Q1[k:, k:])
                Q1[k:, k:] -= np.outer(v, w)
        
        # S1: The original indices of the first k1 pivot columns
        S1 = perm[:k1]
        
        return Q1, R11, S1
    
    def _phase2_tprs(self, M: np.ndarray, y: np.ndarray, Q1: np.ndarray,
                    S1: np.ndarray, k1: int, delta: int) -> np.ndarray:
        """
        Phase 2: Target-Projected Residual Sampling with one-step 
        """
        m, n = M.shape
        k2 = self.k - k1
        # ==========
        # 1. TPRS
        # ==========
        # Target residual
        r_y = y - Q1 @ (Q1.T @ y)

        # Precompute residualized features
        Q1_T_M = Q1.T @ M

        # b_j = \|x_j\|_2^2 - \|Q_1^Tx_j\|_2^2
        b = np.sum(M**2, axis=0) - np.sum(Q1_T_M**2, axis=0)
        b = np.maximum(b, self.epsilon) # Prevent 0

        # s_j = r_y^TP_\perpx_j = r_y^Tx_j
        s = r_y @ M

        # Supervised score a_j := s_j^2 / (b_j + \epsilon)
        if self.supervised:
            a = s**2 / (b + self.epsilon)
        else:
            a = b
        # ==================================================
        # 2. Greedy Oversampling with one-step deflation
        # ==================================================
                
        # Initialize candidate pool
        valid_mask = np.ones(n, dtype=bool)
        valid_mask[S1] = False

        # List for chosen columns in phase 2
        S2_prime = []

        for _ in range(k2+delta):
            a_masked = np.where(valid_mask, a, -np.inf)

            # j* = argmax a_j
            j_star = np.argmax(a_masked)

            # Add it to the answer
            S2_prime.append(j_star)

            # Update for the next iteration
            valid_mask[j_star] = False

            # ----- One-Step Deflation ----- #

            # Choose the column for the chosen j
            x_j = M[:, j_star]

            # Get P_\perp x_j
            P_perp_x = x_j - Q1 @ (Q1.T @ x_j)

            # Normalize it
            norm_P_perp_x = np.linalg.norm(P_perp_x)

            if norm_P_perp_x > 1e-10:

                # New direction (contribution) of x_j
                u = P_perp_x / norm_P_perp_x

                # The Magnitude * direction = The new contribution of x_j
                g = u.T @ M

                # The new columns contribution to the label
                alpha = u.T @ r_y

                # Decrease the residual
                r_y -= alpha * u

                # Decrease the correlation of each columns
                s -= alpha * g

                if self.supervised:
                    a = s**2 / (b+self.epsilon)

                else:
                    a = b     

        return np.array(S2_prime, dtype=int)

    def _phase3_gpp(self, M: np.ndarray, y: np.ndarray,
                   S_tmp: np.ndarray, k:int) -> np.ndarray:
        """
        Phase 3: Geometry-Predictive Pruning to size k.
        """
        # Working copy
        S_current = S_tmp.copy()

        # Measure delta that we are going to prune
        delta = len(S_tmp) - k

        for _ in range(delta):
            C = M[:, S_current]

            # QR Decomposition
            # mode='economic' --> thin QR (Q = m*n, R = n*n)
            Q, R = linalg.qr(C, mode='economic')

            # Compute beta (R^-1(Q^T y)) --> "Predictive"
            # We don't get inverse, we use "back substitution" (solve_triangular)
            # \beta = min\|C\beta-y\|_2^2
            # Least Square Problem
            # Equation: C\beta = y
            # Solution: \beta = (C^TC)^-1C^Ty
            # \beta = (R^TQ^TQ^R)^-1R^TQ^Ty = (R^TR)^-1R^TQ^Ty = R^-1Q^Ty
            beta = linalg.solve_triangular(R, Q.T @ y, lower=False)

            # Compute r_i --> "Geometric Redundancy" (Numerical Instability)
            # VIF (Variance Inflaction Factor) = 1/ (1 - R_i)
            # C = QR --> G = C^TC = R^TR
            # r_i = \|R^-Te_i\|_2^2 = (R^-Te_i)^T(R^-Te_i) = e_i^T(R^-1R^-T)e_i = e_i^T(R^TR)^-1e_i = [(C^TC)^-1]_ii
            # By block matrix inversion --> (G^-1)_ii = 1 / c_i^Tc_i(1-R_i^2) = 1/\|c_i\|_2^2 \times VIF_i
            R_inv_T = linalg.solve_triangular(R, np.eye(len(S_current)), trans='T', lower=False)
            r_i = np.sum(R_inv_T**2, axis=0)

            # Compute drop scores: d_i = r_i / (beta_i^2 + epsilon)
            d_i = r_i / (beta**2 + self.epsilon)

            # Remove worst feature
            i_star = np.argmax(d_i)
            S_current = np.delete(S_current, i_star) # Delete the column

        return S_current
        
    def transform(self, M: np.ndarray) -> np.ndarray:
        """
        Select features from M.
        """
        
        if self.selected_features_ is None:
            raise ValueError("Must fit before transform")
            
        return M[:, self.selected_features_]  
        
    def fit_transform(self, M: np.ndarray, y: np.ndarray) -> np.ndarray:
        """
        Fit and transform in one step.
        """
        return self.fit(M, y).transform(M)

**3. Baseline Methods**

In [None]:
class RRQR:
    """
    Strong Rank-Revealing QR Decomposition.

    Produces A*Pi = Q*R where:
    - R11 (leading black) is well-conditioned.
    - R22 (training block) is small (low residual energy)
    - Coupling ||R11^-1 R12|| is small
    """

    def __init__(self, f = 2.0, max_iter = 10):
        """
        Parameters:
        f : float
            Tolerance for maximum entry in coupling matrix T.
            If max(abs(T)) <= f, decompisition is considered rank-revealing.
        max_iter : int
            Maximum number of column swap iterations
        """
        self.f = f
        self.max_iter = max_iter
        self.Q = None
        self.R = None
        self.P = None

    def fit(self, A, k = None):
        """
        Compute strong RRQR factorization.

        Parameters:
        A : array_like, shape (m, n)
            Input matrix
        k : int, optional
            Target rank. If None, use full rank
        """
        A = np.array(A, dtype = float)
        m, n = A.shape
        if k is None:
            k = min(m, n)

        #### Step 1: initial QRCP Execution ####
        # standard QR with column pivoting (weak RRQR)
        # Q is an orthogonal square matrix
        # R is an upper triangular 
        # P is a permutation matrix that reorders columns of A
        Q, R, P = qr(A, pivoting=True, mode='full')
        P = np.array(P)

        iter_count = 0
        while iter_count < self.max_iter:
            #### Step 2: Partition R into leading and trailing blocks ####
            
            # R11: leading kxk block <- this are the first k selected columms
            # R12: trailing k x (n-k) block <- decribes how much of the remaining columns depend on leading ones
            R11 = R[:k, :k]
            R12 = R[:k, :k]

            #### Step 3: Compute the Coupling Matrix T ####
            #### “strong” feature reordering ####
            # Solve R11 * T = R12 for T (triangular solve)
            # T tells us how much the trailing columns depend on the leading ones
            if R12.shape[1] == 0:
                break
            T = solve_triangular(R11, R12)

            #### Step 4: Check the largest element of T for tolerance ####
            i_star, j_star = np.unravel_index(np.abs(T).argmax(), T.shape)
            max_T = np.abs(T[i_star, j_star])

            if max_T <= self.f:
                # Coupling is small enough -> decomposition is rank-revealing
                break
            
            #### Step 5: Correction Plan - Swap columns based on max entry in T ####
            # Swap columns: i_star in R11 (leading block), j_star in R12 (trailing block)
            # If some trailing column depends too much (i.e., large entries in T), we swap that column forward
            col1 = i_star
            col2 = k + j_star
            R[:, [col1, col2]] = R[:, [col2, col1]]     # swap columns
            P[[col1, col2]] = P[[col2, col1]]           # update permuation

            #### Step 6: Restore upper triangular structure with a simple QR on affected columns ####
            # After the column wrap, R is not upper triangular 
            # Re-factor the affected block with QR to zero out the subdiagonal
            rows = slice(i_star, m)
            cols = slice(col1, col2 + 2)
            Q_local, R_local = qr(R[rows, cols], mode = 'economic')
            R[rows, cols] = R_local     # updated affected submatrix
            # Q update is ignored for simplicity

            iter_count += 1

        #### Step 7: Recompute Q for the permutated matrix ####
        # After all swaps, recompute Q to match final R and permutation
        self.Q, R_full = qr(A[:, P], mode = 'full')
        self.R = R_full
        # P holds the final column permutation indices, ranking from most to least 
        self.P = P

        return self
         
    def transform(self, X):
        """
        Apply RRQR transformation to a new matrix X
        """
        if self.Q is None:
            raise ValueError("Must fit first.")
        return self.Q.T @ X[:, self.P]

class QRCP:
    """
    QR Decomposition with Column Pivoting (QRCP) 

    Produces a weak rank-revealing decomposition:
        X[:, P] = Q @ R
    where:
        - Q is orthogonal
        - R is upper-triangular
        - P is a permutation array (pivot indices)
    
    Can select the top-k columns based on pivoting.
    """

    def __init__(self, top_k: int = None):
        """
        Parameters
        ----------
        top_k : int, optional
            Number of top features (columns) to select. If None, select all.
        """
        self.top_k = top_k                # Target number of features
        self.Q = None                     # Orthogonal matrix from QR
        self.R = None                     # Upper-triangular matrix from QR
        self.P = None                     # Column permutation indices
        self.selected_features_ = None    # Indices of top-selected features
        self.runtime_ = {}                # Dictionary to track runtimes

    def fit(self, X: np.ndarray, y=None) -> 'QRCP':
        """
        Fit the QRCP model to the data X.

        Parameters
        ----------
        X : np.ndarray, shape (m, n)
            Input feature matrix
        y : ignored
            Included for API compatibility with supervised methods.

        Returns
        -------
        self : QRCP
            Fitted object with Q, R, P, and selected_features_ populated.
        """
        start_time = time.time()

        # STEP 1: Compute QR decomposition with column pivoting
        t_qr_start = time.time()
        Q, R, piv = qr(X, pivoting=True, mode='economic')
        self.runtime_['qr_decomposition'] = time.time() - t_qr_start

        # Store decomposition results
        self.Q = Q
        self.R = R
        self.P = np.array(piv)

        # STEP 2: Determine selected features (top-k pivoted columns)
        n_features = X.shape[1]
        k = self.top_k if self.top_k is not None else n_features
        self.selected_features_ = self.P[:k]

        # Record total runtime
        self.runtime_['total'] = time.time() - start_time

        return self

    def transform(self, X: np.ndarray) -> np.ndarray:
        """
        Transform X by selecting the top-k pivoted columns.
        """
        if self.selected_features_ is None:
            raise ValueError("Must call fit() before transform()")
        return X[:, self.selected_features_]

    def fit_transform(self, X: np.ndarray, y=None) -> np.ndarray:
        """
        Convenience method to fit and transform in one step.

        """
        return self.fit(X, y).transform(X)
    
class LassoSelector:
    """ 
    Lasso selector trains lasso cv to zero out the weights of the unimportant 
    features and then it selects the non zero features which are the important ones and 
    returns a dataset with only those features. 
    """
    # sets up selector
    def __init__(self, k=20, cv=5, random_state=None):
        # number of features to select
        self.k = k
        # number of folds for cross validation 
        self.cv = cv
        self.random_state = random_state
        self.model = LassoCV(cv=self.cv, n_jobs=-1, max_iter=5000, random_state=self.random_state)
        self.selected_idx_ = None

    #trains the model and gets weights
    def fit(self, X, y):
        # fit LassoCV and select top-k features
        self.model.fit(X, y)
        coefs = np.abs(self.model.coef_)

        # incase k is all zero, select first k features
        if np.all(coefs == 0):
            self.selected_idx_ = np.arange(min(self.k, X.shape[1]))
        else:
            # select top-k features by magnitude
            self.selected_idx_ = np.argsort(coefs)[-self.k:]

        return self

    # returns new dataset with only selected features
    def transform(self, X):
        # reduce X to the selected features
        if self.selected_idx_ is None:
            raise RuntimeError("You must call fit() before transform().")
        return X[:, self.selected_idx_]

    # combines fit and transform into one step
    def fit_transform(self, X, y):
        self.fit(X, y)
        return self.transform(X)

    # shows the features that were selected
    def get_support(self, indices=False):
        if indices:
            return self.selected_idx_
        mask = np.zeros(self.model.coef_.shape, dtype=bool)
        mask[self.selected_idx_] = True
        return mask

**4. Evaluation Framework**

**5. Data Generators**

**6. Experiments**

**7. Quick Test**

In [None]:
def run_simple_experiment():
    """Runs a basic test of the TAAQ class."""

    print("--- Running Simple TAAQ Experiment ---")

    # 1. Generate synthetic data
    print("Generating synthetic data...")
    m, n = 500, 100 # Samples, Features
    k_true = 15     # Number of informative features in the data
    X, y = make_regression(n_samples=m, n_features=n, n_informative=k_true,
                            noise=10, random_state=42)
    print(f"Data shape: X={X.shape}, y={y.shape}")

    # 2. Split data into training and testing sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
    print(f"Train shapes: X={X_train.shape}, y={y_train.shape}")
    print(f"Test shapes:  X={X_test.shape}, y={y_test.shape}")

    # 3. Instantiate and Fit TAAQ
    k_select = 20 # Target number of features to select
    print(f"\nInitializing TAAQ to select k={k_select} features...")
    taaq_selector = TAAQ(k=k_select, k1_fraction=0.5, delta=5, supervised=True)

    print("Fitting TAAQ selector on training data...")
    start_fit = time.time()
    taaq_selector.fit(X_train, y_train)
    fit_time = time.time() - start_fit
    print(f"Fit completed in {fit_time:.4f} seconds.")

    # Print selected features and runtime breakdown
    print("Selected feature indices:", taaq_selector.selected_features_)
    print("Runtime breakdown (seconds):")
    for phase, t in taaq_selector.runtime_.items():
        print(f"  - {phase}: {t:.4f}")

    # 4. Transform the data
    print("\nTransforming train and test data using selected features...")
    X_train_taaq = taaq_selector.transform(X_train)
    X_test_taaq = taaq_selector.transform(X_test)
    print(f"Transformed train shape: {X_train_taaq.shape}")
    print(f"Transformed test shape:  {X_test_taaq.shape}")

    # 5. Train and evaluate a simple Ridge model
    print("\nTraining Ridge model on TAAQ-selected features...")
    model = Ridge(alpha=1.0)
    model.fit(X_train_taaq, y_train)

    print("Evaluating model on transformed test data...")
    y_pred = model.predict(X_test_taaq)
    r2 = r2_score(y_test, y_pred)
    print(f"\nR² score on test set using TAAQ features: {r2:.4f}")

    # --- Optional: Compare with using all features ---
    print("\nTraining Ridge model on ALL features for comparison...")
    model_all = Ridge(alpha=1.0)
    # Need to scale data if comparing fairly, especially for Ridge
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    model_all.fit(X_train_scaled, y_train)
    y_pred_all = model_all.predict(X_test_scaled)
    r2_all = r2_score(y_test, y_pred_all)
    print(f"R² score on test set using ALL features: {r2_all:.4f}")

    print("\n--- Experiment Finished ---")


# ============================================================================
# Run the experiment
# ============================================================================
if __name__ == "__main__":
    run_simple_experiment()

**8. Main Execution**