In [155]:
import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression

In [156]:
features_in = [
"avgAnnCount",
"PctPrivateCoverage",
"PctPublicCoverage",
"PctBachDeg25_Over",
"PercentMarried",
"incidenceRate",
"povertyPercent",
"MedianAge",
"medIncome",
"PctWhite",
"PctBlack",
"PctAsian",
"PctOtherRace",
"avgDeathsPerYear",
]

In [157]:
df = pd.read_csv("./data/cancer_reg.csv", encoding='iso-8859-1', usecols=features_in, dtype=float)

In [158]:
df

Unnamed: 0,avgAnnCount,avgDeathsPerYear,incidenceRate,medIncome,povertyPercent,MedianAge,PercentMarried,PctBachDeg25_Over,PctPrivateCoverage,PctPublicCoverage,PctWhite,PctBlack,PctAsian,PctOtherRace
0,1397.000000,469.0,489.800000,61898.0,11.2,39.3,52.5,19.6,75.1,32.9,81.780529,2.594728,4.821857,1.843479
1,173.000000,70.0,411.600000,48127.0,18.6,33.0,44.5,22.7,70.2,31.1,89.228509,0.969102,2.246233,3.741352
2,102.000000,50.0,349.700000,49348.0,14.6,45.0,54.2,16.0,63.7,42.1,90.922190,0.739673,0.465898,2.747358
3,427.000000,202.0,430.400000,44243.0,17.1,42.8,52.7,9.3,58.4,45.3,91.744686,0.782626,1.161359,1.362643
4,57.000000,26.0,350.100000,49955.0,12.5,48.3,57.8,15.0,61.6,44.0,94.104024,0.270192,0.665830,0.492135
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3042,1962.667684,15.0,453.549422,46961.0,12.4,44.2,51.0,15.2,78.3,31.7,90.280811,3.837754,0.327613,1.700468
3043,1962.667684,43.0,453.549422,48609.0,18.8,30.4,52.6,12.4,64.5,28.8,75.706245,2.326771,4.044920,14.130288
3044,1962.667684,46.0,453.549422,51144.0,15.0,30.9,54.8,12.8,62.0,26.6,87.961629,2.313188,1.316472,5.680705
3045,1962.667684,52.0,453.549422,50745.0,13.3,39.0,58.8,14.4,75.9,29.5,92.905681,1.176562,0.244632,2.131790


In [159]:
import numpy as np
from sklearn.linear_model import LinearRegression

class FWL_ResidualizerV1:
    def __init__(self):
        """
        threshold: limiar de correlação para considerar multicolinearidade
        """
        self.models = {}
        self.residualized_columns = []
        self.feature_names = None

    def fit(self, X, threshold=0.8):
        """
        X: DataFrame ou array com as variáveis explicativas
        """
        
        self.feature_names = X.columns if hasattr(X, "columns") else None

        X = np.asarray(X)
        corr = np.abs(np.corrcoef(X, rowvar=False))

        for i in range(X.shape[1]):
            corr_feature = corr[i]
            mask = np.ones(corr_feature.shape, dtype=bool)
            mask[i] = False
            indexes = np.argwhere((corr_feature >= threshold) & mask).flatten()

            if len(indexes) > 0:
                model = LinearRegression()
                model.fit(X[:, indexes], X[:, i])
  
                self.models[i] = {"model": model,
                                "X": indexes,
                                "y": i,
                            }
        
        return self
    
    def transform(self, X):
        """
        Aplica a residualização aos dados
        """
        
        X = np.asarray(X)
        
        for i in self.models:
            model_info = self.models[i]
            pred = model_info["model"].predict(X[model_info['X']])
            X[i] = X[i] - pred
            
        return X
    
    def fit_transform(self, X):
        self.fit(X)
        return self.transform(X)

In [160]:
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.utils.validation import check_is_fitted

class OrthogonalFeaturesV2:
    def __init__(self):
        """
        threshold: limiar de correlação para considerar multicolinearidade
        """
        self.models = None

    def fit(self, X, corrcoef=0.8):
        """
        X: DataFrame ou array com as variáveis explicativas
        """

        self.models = {}
        
        self.feature_names_in_ = getattr(X, 'columns', None)

        X = np.asarray(X)
        n_features = X.shape[1]
        corr = np.abs(np.corrcoef(X, rowvar=False))
        residualized = []

        for i in range(n_features):
            corr_feature = corr[i]
            mask = np.ones(corr_feature.shape, dtype=bool)
            mask[residualized + [i]] = False
            indexes = np.argwhere((corr_feature >= corrcoef) & mask).flatten()

            if len(indexes) > 0:
                model = LinearRegression()
                model.fit(X[:, indexes], X[:, i])
  
                self.models[i] = {"model": model,
                                "features": indexes,
                            }
                residualized.append(i)
        
        return self
    
    def transform(self, X):
        """
        Aplica a residualização aos dados
        """
        
        check_is_fitted(self)
        X = np.asarray(X)
        X = X.copy()
        
        for i in self.models:
            model_info = self.models[i]
            pred = model_info["model"].predict(X[:, model_info['features']])
            X[:, i] -= pred
            
        return X
    
    def fit_transform(self, X, corrcoef):
        self.models = {}
        self.fit(X, corrcoef=corrcoef)
        return self.transform(X)

In [161]:
%%time
residualizer = OrthogonalFeaturesV2()
residualizer.fit(df)
residualizer.models

CPU times: user 0 ns, sys: 6.27 ms, total: 6.27 ms
Wall time: 5.37 ms


{0: {'model': LinearRegression(), 'features': array([1])},
 4: {'model': LinearRegression(), 'features': array([8])},
 10: {'model': LinearRegression(), 'features': array([11])}}

In [162]:
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.utils.validation import check_is_fitted
from sklearn.utils.validation import check_array
from sklearn.base import BaseEstimator, TransformerMixin

class FeatureResidualizer(BaseEstimator, TransformerMixin):
    def __init__(self):
        """Feature transformer that reduces multicollinearity by residualizing highly correlated features.
    
        For each feature strongly correlated (above `corrcoef` threshold) with others, this class:
        1. Fits a linear regression model using the correlated features as predictors.
        2. Replaces the original feature with residuals (observed - predicted) from the model,
        effectively removing linear dependencies.
        
        Useful as a preprocessing step for linear models where multicollinearity is problematic.
        
        Attributes
        ----------
        models_ : Dict[int, Dict[str, Any]]
            Dictionary storing residualization models for each processed feature.
            Keys are feature indices; values are dicts with:
            - "model": Fitted `LinearRegression` object.
            - "features": Indices of features used as predictors.
        feature_names_in_ : Optional[np.ndarray]
            Names of input features if provided in a pandas DataFrame.
        n_features_in_ : Optional[int]
            Number of features seen during fit.
        """
        self.models_ = None
        self.feature_names_in_ = None
        self.n_features_in_ = None

    def fit(self, X: np.ndarray, corrcoef: float = 0.8):
        """
        Identify feature pairs with absolute correlation ≥ `corrcoef` and prepare residualization models.
        
        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Input data. Can be a pandas DataFrame (preserves column names) or numpy array.
        corrcoef : float, default=0.8
            Absolute correlation threshold for triggering residualization.
            Features with |ρ| ≥ this value will be residualized.
            
        Returns
        -------
        self : OrthogonalFeaturesV3
            Fitted transformer.
        """

        self.models_ = {}
        self.feature_names_in_ = getattr(X, 'columns', None)
        X = check_array(X, ensure_2d=True, dtype=np.float64)
        self.n_features_in_ = X.shape[1]
        
        corr = np.abs(np.corrcoef(X, rowvar=False))
        np.fill_diagonal(corr, 0)
        residualized = []

        processing_order = np.argsort(-np.sum(corr, axis=1))

        for i in processing_order:
            corr_feature = corr[i]
            mask = np.ones(corr_feature.shape, dtype=bool)
            mask[residualized] = False
            indexes = np.argwhere((corr_feature >= corrcoef) & mask).flatten()

            if len(indexes) > 0:
                model = LinearRegression()
                model.fit(X[:, indexes], X[:, i])
  
                self.models_[i] = {"model": model,
                                "features": indexes,
                            }
                residualized.append(i)
        
        return self
    
    def transform(self, X: np.ndarray) -> np.ndarray:
        """
        Apply residualization to the input data using pre-trained models.
        
        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Data to transform. Must match feature count of `fit()` input.
            
        Returns
        -------
        X_transformed : ndarray of shape (n_samples, n_features)
            Data with residualized features (others remain unchanged).
            
        Raises
        ------
        ValueError
            If number of features in X doesn't match training data.
        """
        
        check_is_fitted(self, 'models_')
        X = check_array(X, ensure_2d=True, dtype=np.float64)
        X = X.copy()
        
        if X.shape[1] != self.n_features_in_:
            raise ValueError(
                f"Expected {self.n_features_in_} features, got {X.shape[1]}"
            )
        
        for i, model_info in self.models_.items():
            model_info = self.models_[i]
            X[:, i] -= model_info["model"].predict(X[:, model_info["features"]])
            
        return X
    
    def fit_transform(self, X: np.ndarray, corrcoef: float = 0.8) -> np.ndarray:
        """Convenience method for fit().transform()."""
        return self.fit(X, corrcoef=corrcoef).transform(X)

In [163]:
residualizer = FeatureResidualizer()
residualizer.fit(df, corrcoef=0.40)
residualizer.models_

{4: {'model': LinearRegression(),
  'features': array([ 3,  6,  7,  8,  9, 10, 11])},
 3: {'model': LinearRegression(), 'features': array([ 7,  8,  9, 12])},
 8: {'model': LinearRegression(), 'features': array([ 6,  7,  9, 10])},
 9: {'model': LinearRegression(), 'features': array([7])},
 7: {'model': LinearRegression(), 'features': array([12])},
 6: {'model': LinearRegression(), 'features': array([10, 11])},
 10: {'model': LinearRegression(), 'features': array([11])},
 12: {'model': LinearRegression(), 'features': array([0, 1])},
 0: {'model': LinearRegression(), 'features': array([1])}}