In [1]:
import pandas as pd
import torch
import torch.nn as nn
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
import numpy as np
from explainers_lib.explainers.carla.growing_spheres import GrowingSpheresExplainer
# from explainers_lib.explainers import AlibiCFRL
from explainers_lib.aggregators import Pareto, All
from explainers_lib.datasets import Dataset
from explainers_lib.ensemble import Ensemble
from explainers_lib.model import TorchModel
import shap


In [2]:
# Location of the Titanic passenger CSV used throughout this notebook.
url = "https://web.stanford.edu/class/archive/cs/cs109/cs109.1166/stuff/titanic.csv"

# Read the CSV and discard the 'Name' column in a single chained expression.
df = pd.read_csv(url).drop(columns=['Name'])

# Column roles: the label column, then the two groups of input features
# that the preprocessing pipelines in a later cell are keyed on.
target = 'Survived'
numerical_features = ['Age', 'Fare', 'Parents/Children Aboard', 'Siblings/Spouses Aboard']
categorical_features = ['Sex', 'Pclass']


In [3]:
import pandas as pd
import numpy as np
from sklearn.compose import ColumnTransformer

class InverseColumnTransformer(ColumnTransformer):
    """
    A ColumnTransformer that additionally offers an ``inverse_transform``.

    IMPORTANT: the inversion is tightly coupled to one specific layout of
    transformers. It assumes:
    1. A transformer named 'num' that is a pipeline with a 'scaler' step.
    2. A transformer named 'cat' that is a pipeline with an 'onehot' step.
    3. Any other entry is either 'drop' or 'passthrough'.

    Only the reversible steps (scaler, onehot) are undone; the effect of
    any imputer inside those pipelines is left in place.

    Attributes set by ``fit`` / ``fit_transform``:
        original_columns_: list of the input column labels (DataFrame
            column names, or positional indices for array-like input) in
            their original order.
    """

    def _remember_input_columns(self, X):
        """Record the labels and order of the columns of ``X``."""
        if isinstance(X, pd.DataFrame):
            self.original_columns_ = list(X.columns)
        else:
            # Array-like input has no labels; fall back to positions.
            self.original_columns_ = list(range(np.shape(X)[1]))

    def _resolve_column_names(self, columns):
        """
        Map a fitted transformer's column selection onto original labels.

        Integer entries are treated as positions into
        ``original_columns_``; every other entry is kept unchanged.
        """
        resolved = []
        for col in columns:
            is_position = isinstance(col, (int, np.integer)) and not isinstance(col, bool)
            resolved.append(self.original_columns_[col] if is_position else col)
        return resolved

    def fit(self, X, y=None, **params):
        """
        Fit all transformers and remember the original column layout.

        Parameters:
            X: DataFrame or 2D array-like training data.
            y: optional targets, forwarded to the parent ``fit``.
            **params: extra keyword arguments forwarded to the parent ``fit``.

        Returns:
            self, so calls can be chained.
        """
        self._remember_input_columns(X)
        # Without this call the pipelines are never fitted and
        # ``transformers_`` is never populated.
        super().fit(X, y, **params)
        return self

    def fit_transform(self, X, y=None, **params):
        """
        Fit all transformers, remember the column layout, and transform ``X``.

        Recording the columns here means ``inverse_transform`` works after
        a plain ``fit_transform`` call, without a separate ``fit``.
        """
        self._remember_input_columns(X)
        return super().fit_transform(X, y, **params)

    def inverse_transform(self, X_transformed):
        """
        Undo the scaling and one-hot encoding of transformed data.

        Parameters:
            X_transformed: 1D (single row) or 2D array-like in the column
                layout produced by ``transform``; sparse matrices exposing
                ``toarray`` are densified first.

        Returns:
            A 2D NumPy array whose columns follow the original input order.
            Columns that were dropped by the transformer are absent.
            Imputed values are NOT reversed.

        Raises:
            ValueError: if the input is not 1D/2D, if the transformer has
                not been fitted, if the 'num'/'cat' pipelines lack the
                expected step, or if an unsupported transformer is present.
            RuntimeError: if the inverted parts cannot be concatenated.
        """
        # np.array() on a sparse matrix yields a 0-d object array, so
        # densify explicitly before converting.
        if hasattr(X_transformed, "toarray"):
            X_transformed = X_transformed.toarray()
        X_transformed = np.asarray(X_transformed)

        # Treat a 1D input as a single row.
        if X_transformed.ndim == 1:
            X_transformed = X_transformed.reshape(1, -1)

        if X_transformed.ndim != 2:
            raise ValueError(f"X_transformed must be a 1D or 2D array, but got shape {X_transformed.shape}")

        if not hasattr(self, 'original_columns_'):
            raise ValueError("The 'fit' method must be called before 'inverse_transform'.")

        inverted_parts = {}
        # Running offset into the transformed matrix; each handled
        # transformer consumes its own contiguous slice.
        current_col_idx = 0

        for name, transformer, original_cols in self.transformers_:

            if isinstance(transformer, str) and transformer == 'drop':
                continue  # Dropped columns produced no output columns

            column_names = self._resolve_column_names(original_cols)
            if len(column_names) == 0:
                # An empty selection contributes no output columns.
                continue

            if isinstance(transformer, str) and transformer == 'passthrough':
                # Passed-through columns are copied back unchanged.
                n_features = len(column_names)
                part_data = X_transformed[:, current_col_idx : current_col_idx + n_features]
                inverted_parts[name] = pd.DataFrame(part_data, columns=column_names)
                current_col_idx += n_features

            elif name == 'num':
                try:
                    scaler = transformer.named_steps['scaler']
                except KeyError as exc:
                    raise ValueError("The 'num' pipeline must have a step named 'scaler'.") from exc

                n_features = len(column_names)
                part_data = X_transformed[:, current_col_idx : current_col_idx + n_features]

                # Invert only the 'scaler' step
                inverted_data = scaler.inverse_transform(part_data)
                inverted_parts[name] = pd.DataFrame(inverted_data, columns=column_names)
                current_col_idx += n_features

            elif name == 'cat':
                try:
                    onehot = transformer.named_steps['onehot']
                except KeyError as exc:
                    raise ValueError("The 'cat' pipeline must have a step named 'onehot'.") from exc

                # The encoder expands each input column into several
                # output columns, so ask it for its output width.
                try:
                    n_output_features = len(onehot.get_feature_names_out(column_names))
                except Exception:
                    # Fallback if get_feature_names_out fails
                    n_output_features = sum(len(c) for c in onehot.categories_)

                part_data = X_transformed[:, current_col_idx : current_col_idx + n_output_features]

                # Invert only the 'onehot' step
                inverted_data = onehot.inverse_transform(part_data)
                inverted_parts[name] = pd.DataFrame(inverted_data, columns=column_names)
                current_col_idx += n_output_features

            else:
                raise ValueError(f"Unknown transformer '{name}' is not supported for inversion.")

        try:
            df_inverted = pd.concat(list(inverted_parts.values()), axis=1)
        except ValueError as e:
            raise RuntimeError(f"Failed to concatenate inverted parts. Check for index mismatches. Error: {e}") from e

        # Restore the original column order, leaving out columns that
        # were dropped and therefore cannot be reconstructed.
        recovered = set(df_inverted.columns)
        ordered_columns = [col for col in self.original_columns_ if col in recovered]

        return df_inverted[ordered_columns].values


In [4]:
# Numeric columns: fill gaps with the median, then standardise.
numerical_transformer = Pipeline(
    [
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler()),
    ]
)

# Categorical columns: fill gaps with the most frequent value, then one-hot encode.
categorical_transformer = Pipeline(
    [
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('onehot', OneHotEncoder(handle_unknown='ignore')),
    ]
)

# The step names 'num'/'cat' and 'scaler'/'onehot' are what
# InverseColumnTransformer.inverse_transform looks up, so keep them as-is.
preprocessor = InverseColumnTransformer(
    transformers=[
        ('num', numerical_transformer, numerical_features),
        ('cat', categorical_transformer, categorical_features),
    ],
    remainder='passthrough',
)


In [5]:
# Separate the inputs from the label column.
X = df.drop(columns=[target])
y = df[target]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Fit the preprocessing on the training split only, then reuse it for the test split.
X_train_processed = preprocessor.fit_transform(X_train)
# Kept deliberately: fit() records `preprocessor.original_columns_`, which the
# explanation cell further down reads.
preprocessor.fit(X_train)
X_test_processed = preprocessor.transform(X_test)

# Look the one-hot output names up once instead of repeating the attribute chain.
onehot_encoder = preprocessor.named_transformers_['cat'].named_steps['onehot']
onehot_feature_names = list(onehot_encoder.get_feature_names_out(categorical_features))

# Transformed column order: numeric block first, then the one-hot block.
feature_names = numerical_features + onehot_feature_names

# Fix: the training tensor was previously built from the *test* matrix.
X_train_tensor = torch.FloatTensor(X_train_processed)

# Every one-hot column derived from 'Sex', plus 'Age'.
# NOTE(review): presumably the features the explainer must not change — confirm
# against the Dataset signature; the meaning of the final empty list is likewise
# not visible here.
sex_and_age_features = [feat for feat in onehot_feature_names if "Sex" in feat] + ["Age"]

data = Dataset(X_test_processed,
               y_test.values,
               feature_names,
               onehot_feature_names,
               numerical_features,
               sex_and_age_features,
               [])
print(feature_names)
pd.DataFrame(X_test_processed).head()


['Age', 'Fare', 'Parents/Children Aboard', 'Siblings/Spouses Aboard', 'Sex_female', 'Sex_male', 'Pclass_1', 'Pclass_2', 'Pclass_3']


Unnamed: 0,0,1,2,3,4,5,6,7,8
0,1.2543,-0.057632,-0.475009,-0.470388,0.0,1.0,1.0,0.0,0.0
1,-0.307067,0.151902,1.992254,0.402662,0.0,1.0,0.0,1.0,0.0
2,2.815666,-0.360229,-0.475009,-0.470388,0.0,1.0,0.0,0.0,1.0
3,1.893041,0.03693,-0.475009,-0.470388,0.0,1.0,1.0,0.0,0.0
4,1.964012,-0.40089,-0.475009,-0.470388,0.0,1.0,0.0,1.0,0.0


In [6]:
# Read the serialized classifier from disk as raw bytes.
with open("experiments/models/titanic_classifier.pt", "rb") as model_file:
    model_data = model_file.read()

# Rebuild the model wrapper from those bytes.
model = TorchModel.deserialize(model_data)

# Width of the preprocessed feature matrix, shown next to the model below.
input_dim = X_train_processed.shape[1]

model, input_dim


(<explainers_lib.model.TorchModel at 0x7f513c92ba10>, 9)

In [7]:
# A single Growing Spheres explainer, combined with the `All` aggregator.
ensemble = Ensemble(
    model,
    [GrowingSpheresExplainer()],
    All(),
)
ensemble.fit(data)

# Explain the first five instances of the dataset.
# NOTE(review): `pretty_print_postprocess` presumably maps rows back to the
# original feature space for display — confirm against Ensemble.explain.
cfs = ensemble.explain(
    data[:5],
    pretty_print=True,
    pretty_print_postprocess=preprocessor.inverse_transform,
    feature_names=preprocessor.original_columns_,
)


5instance [00:00,  7.15instance/s]


In [None]:
# Display the raw result of `ensemble.explain` (values are in the preprocessed feature space).
cfs


[Counterfactual(original_data=array([ 1.25429964, -0.05763158, -0.47500894, -0.47038771,  0.        ,
         1.        ,  1.        ,  0.        ,  0.        ]), data=array([ 1.25429964,  0.08527239, -1.27271644,  0.11548021,  0.        ,
         1.        ,  1.        ,  0.        ,  0.        ]), original_class=np.int64(0), target_class=1, explainer='growing_spheres(step_size=0.2, max_iter=1000, num_samples=1000)'),
 Counterfactual(original_data=array([-0.30706713,  0.15190188,  1.99225362,  0.40266173,  0.        ,
         1.        ,  0.        ,  1.        ,  0.        ]), data=array([-0.30706713,  0.34041251,  1.95247168,  0.34898546,  0.        ,
         1.        ,  1.        ,  1.        ,  0.        ]), original_class=np.int64(0), target_class=1, explainer='growing_spheres(step_size=0.2, max_iter=1000, num_samples=1000)'),
 Counterfactual(original_data=array([ 2.8156664 , -0.36022882, -0.47500894, -0.47038771,  0.        ,
         1.        ,  0.        ,  0.        ,  