In [1]:
# Root directory handed to ACSDataSource as `root_dir`. The loader in this notebook is
# called with download=False, so the ACS files are expected to already exist here.
FOLKTABLES_DATA_PATH = "/vol/bitbucket/hh2721/folktables"


In [None]:
from folktables import ACSDataSource, ACSIncome, generate_categories

In [4]:
# Describe which ACS release to read: 2018, 1-Year horizon, person-level survey,
# rooted at the local Folktables directory.
data_source = ACSDataSource(
    survey_year="2018",
    horizon="1-Year",
    survey="person",
    root_dir=FOLKTABLES_DATA_PATH,
)

# Read the California records from disk only (no download attempt).
ca_data = data_source.get_data(states=["CA"], download=False)

In [12]:
# Apply the ACSIncome task definition to the raw CA frame. The call returns three
# values; only the features and labels are kept, the third one is discarded.
ca_features, ca_labels, _ = ACSIncome.df_to_pandas(ca_data)

# Preview the first feature rows (rendered as the cell output).
ca_features.head()

Unnamed: 0,AGEP,COW,SCHL,MAR,OCCP,POBP,RELP,WKHP,SEX,RAC1P
0,30.0,6.0,14.0,1.0,9610.0,6.0,16.0,40.0,1.0,8.0
1,21.0,4.0,16.0,5.0,1970.0,6.0,17.0,20.0,1.0,1.0
2,65.0,2.0,22.0,5.0,2040.0,6.0,17.0,8.0,1.0,1.0
3,33.0,1.0,14.0,3.0,9610.0,36.0,16.0,40.0,1.0,1.0
4,18.0,2.0,19.0,5.0,1021.0,6.0,17.0,18.0,2.0,1.0


In [11]:
# Preview the first label rows; the rendered output shows a boolean PINCP column.
ca_labels.head()


Unnamed: 0,PINCP
0,False
1,False
2,False
3,False
4,False


In [1]:
from abc import ABC, abstractmethod
from typing import Tuple

import torch
from torch.utils.data import Dataset as TorchDataset
import numpy as np
from sklearn.preprocessing import StandardScaler

"""
A collection of constants used throughout the project.
"""

# Fixed seed so that shuffling / splitting is reproducible across runs.
RANDOM_SEED = 69420

# Fractions of the data assigned to the train / validation / test partitions.
TRAIN_SPLIT = 0.7
VAL_SPLIT = 0.15
TEST_SPLIT = 0.15

# The fractions must sum to 1.0. Compare with a tolerance instead of `==`:
# most decimal fractions are not exactly representable in binary floating point,
# so an exact comparison can reject perfectly valid ratios
# (e.g. 0.6 + 0.3 + 0.1 evaluates to 0.9999999999999999).
assert abs(TRAIN_SPLIT + VAL_SPLIT + TEST_SPLIT - 1.0) < 1e-9, \
    "Train, validation, and test splits must sum to 1.0"



class FairnessDataset(TorchDataset):
    """
    Wraps pre-processed features, labels, and optional protected attributes as a PyTorch Dataset.
    Data is expected to be in NumPy array format and will be converted to PyTorch tensors.
    """
    
    def __init__(self, features_np: np.ndarray, labels_np: np.ndarray, protected_attrs_np: np.ndarray | None = None):
        """
        Args:
            features_np: NumPy array of features.
            labels_np: NumPy array of labels.
            protected_attrs_np: Optional NumPy array of protected attributes.
        """
        self.features = torch.tensor(features_np, dtype=torch.float32)
        # Ensure labels are float32 for losses like BCELoss
        self.labels = torch.tensor(labels_np, dtype=torch.float32) 
        
        if protected_attrs_np is not None:
            self.protected_attrs = torch.tensor(protected_attrs_np, dtype=torch.float32)
        else:
            self.protected_attrs = None
    
    def __len__(self):
        return self.features.shape[0]
    
    def __getitem__(self, idx):
        """
        Return features, labels, and optionally protected attributes.
        """
        item_features = self.features[idx]
        item_labels = self.labels[idx]
        
        if self.protected_attrs is not None:
            return item_features, item_labels, self.protected_attrs[idx]
        else:
            return item_features, item_labels


class BaseDataset(ABC):
    """
    Abstract base class for dataset wrappers.

    Subclasses implement `load_data()`. The object it returns is stored once at
    construction time and later converted by `to_torch()` into train / validation /
    test `FairnessDataset` instances. `to_torch()` reads the `features`, `labels`
    and (optionally) `protected_attributes` arrays of that object, i.e. it expects
    an AIF360-style dataset.
    """

    def __init__(self):
        """Initialize the dataset by loading the underlying AIF360 data via `load_data()`."""
        self._aif360_dataset_original = self.load_data()

    @abstractmethod
    def load_data(self):
        """
        Loads and returns the underlying AIF360 dataset object.
        Must be implemented by subclasses.

        Returns:
            AIF360 dataset object
        """
        raise NotImplementedError("load_data() must be implemented in subclasses")

    def to_torch(self, include_protected=True) -> Tuple[FairnessDataset, FairnessDataset, FairnessDataset]:
        """
        Processes the AIF360 dataset:
        1. Extracts features, labels, and (optionally) protected attributes as float32.
        2. Shuffles deterministically (RANDOM_SEED) and splits into train, validation,
           and test sets according to TRAIN_SPLIT / VAL_SPLIT; the test set receives
           whatever remains.
        3. Standardizes ALL feature columns with a StandardScaler fitted on the
           training data only (binary / one-hot columns are scaled as well).
        4. Wraps the processed data into FairnessDataset instances.

        Args:
            include_protected: If True, protected attributes will be extracted and included.
                If they are missing or empty a warning is printed and the datasets are
                built without them.

        Returns:
            Tuple of (train_dataset, val_dataset, test_dataset) as FairnessDataset instances.

        Raises:
            ValueError: If the dataset has no samples or no features, or if the
                training partition would be empty.
        """
        aif360_data = self._aif360_dataset_original

        # 1. Extract and validate data.
        # AIF360 datasets are expected to be fully numeric already (categoricals are
        # one-hot encoded), so the float32 cast doubles as the check: it raises if a
        # non-numeric value is still present.
        X_all_np = aif360_data.features.astype(np.float32)
        y_all_np = aif360_data.labels.ravel().astype(np.float32)

        if X_all_np.shape[0] == 0:
            raise ValueError("Dataset contains no samples")
        if X_all_np.shape[1] == 0:
            raise ValueError("Dataset contains no features")

        prot_all_np = None
        if include_protected:
            raw_protected = getattr(aif360_data, 'protected_attributes', None)
            if raw_protected is not None and raw_protected.shape[1] > 0:
                prot_all_np = raw_protected.astype(np.float32)
            else:
                print("Warning: include_protected is True, but protected_attributes are not available or empty.")

        # 2. Deterministic shuffled split.
        total_size = X_all_np.shape[0]
        train_size = int(TRAIN_SPLIT * total_size)
        val_size = int(VAL_SPLIT * total_size)
        test_size = total_size - train_size - val_size

        if train_size == 0:
            raise ValueError("Training set would be empty with current split ratios")

        rng = np.random.RandomState(RANDOM_SEED)
        shuffled_indices = rng.permutation(total_size)

        train_indices = shuffled_indices[:train_size]
        val_indices = shuffled_indices[train_size:train_size + val_size]
        test_indices = shuffled_indices[train_size + val_size:]
        partitions = (train_indices, val_indices, test_indices)

        # 3. Split data. Indexing with an empty index array already yields an empty
        # array of the right trailing shape and dtype, so empty partitions need no
        # special-casing here.
        X_train_np, X_val_np, X_test_np = (X_all_np[idx] for idx in partitions)
        y_train_np, y_val_np, y_test_np = (y_all_np[idx] for idx in partitions)

        prot_train_np = prot_val_np = prot_test_np = None
        if prot_all_np is not None:
            prot_train_np, prot_val_np, prot_test_np = (prot_all_np[idx] for idx in partitions)

        # 4. Scale features. The scaler is fitted on the training partition only so no
        # statistics leak from validation/test data. Empty partitions are skipped
        # because the scaler's input validation rejects arrays with zero samples.
        scaler = StandardScaler()
        X_train_np = scaler.fit_transform(X_train_np)
        if val_size > 0:
            X_val_np = scaler.transform(X_val_np)
        if test_size > 0:
            X_test_np = scaler.transform(X_test_np)

        # 5. Create FairnessDataset instances. The prot_* arrays are None whenever
        # protected attributes were not requested or not available.
        train_dataset = FairnessDataset(X_train_np, y_train_np, prot_train_np)
        val_dataset = FairnessDataset(X_val_np, y_val_np, prot_val_np)
        test_dataset = FairnessDataset(X_test_np, y_test_np, prot_test_np)

        print(f"Dataset sizes - Train: {len(train_dataset)}, Val: {len(val_dataset)}, Test: {len(test_dataset)}")

        return train_dataset, val_dataset, test_dataset

In [2]:
import os
from folktables import ACSDataSource, ACSIncome

from aif360.datasets import StandardDataset

# Root directory of the Folktables data read by ACSIncomeDataset below (used as
# `root_dir` and checked for existence in its constructor).
# NOTE: this re-defines, with the same value, the constant from the notebook's first cell.
FOLKTABLES_DATA_PATH = "/vol/bitbucket/hh2721/folktables"

class ACSIncomeDataset(BaseDataset):
    """
    Simplified ACS Income Dataset class for loading and preprocessing data from Folktables.

    This class is configured for:
    - States: CA
    - Survey Year: 2018
    - Horizon: 1-Year
    - Survey: Person
    - Sensitive Attribute: Sex (Male privileged)
    - Data Path: /vol/bitbucket/hh2721/folktables
    - Download Data: False (assumes data is pre-downloaded)

    It handles loading ACS Income data and applying standard preprocessing steps
    like one-hot encoding via AIF360's StandardDataset.

    Folktables features for ACSIncome:
    ['AGEP', 'COW', 'SCHL', 'MAR', 'OCCP', 'POBP', 'RELP', 'WKHP', 'SEX', 'RAC1P']
    Target: PINCP (Income) > $50,000 (binary)
    Protected Attribute: SEX. Folktables encodes it as 1 for Male and 2 for Female;
    `load_data` remaps it to 1.0 (Male, privileged) and 0.0 (Female, unprivileged).
    """

    _STATES = ["CA"]
    _SURVEY_YEAR = '2018'
    _HORIZON = '1-Year'
    _SURVEY = 'person'
    _SENSITIVE_ATTRIBUTE_NAME = 'SEX' # Folktables column name
    _DOWNLOAD_DATA = False

    def __init__(self):
        """
        Load the dataset (through BaseDataset.__init__ -> load_data) and record the
        number of feature columns of the resulting AIF360 dataset in `features_dim`.
        """
        if not os.path.exists(FOLKTABLES_DATA_PATH):
            # Only warn here: with download disabled, the actual failure (if any)
            # is reported with more context by load_data().
            print(f"Warning: Folktables data path {FOLKTABLES_DATA_PATH} does not exist. "
                  "Data loading might fail as download_data is False.")

        self.features_dim = None # Will be populated after load_data

        super().__init__() # Calls BaseDataset.__init__ -> self.load_data()

        # Explicit None check: relying on the truthiness of an arbitrary dataset
        # object is fragile and does not express the intent.
        if self._aif360_dataset_original is not None:
            self.features_dim = self._aif360_dataset_original.features.shape[1]

    def load_data(self) -> StandardDataset:
        """
        Loads and preprocesses ACS Income data using Folktables and AIF360 StandardDataset
        with fixed configuration (2018, 1-Year, 'sex' as SA).

        Returns:
            StandardDataset with the listed categorical features one-hot encoded,
            'SEX' as the protected attribute and 'PINCP_GT_50K' as the label.

        Raises:
            ValueError: If an expected ACSIncome column is missing, or if 'SEX'
                contains values other than 1 and 2.
            Exception: Any error raised by Folktables while reading the data is
                re-raised after printing a hint.
        """
        data_source = ACSDataSource(survey_year=self._SURVEY_YEAR,
                                    horizon=self._HORIZON,
                                    survey=self._SURVEY,
                                    root_dir=FOLKTABLES_DATA_PATH)

        try:
            acs_data = data_source.get_data(states=self._STATES, download=self._DOWNLOAD_DATA)
        except Exception as e:
            print(f"Error loading Folktables data: {e}")
            print(f"Please ensure data for states {self._STATES}, year {self._SURVEY_YEAR}, horizon {self._HORIZON} "
                  f"is available at {FOLKTABLES_DATA_PATH} (download is set to False).")
            raise

        features_df, labels, _ = ACSIncome.df_to_pandas(acs_data)

        # ACSIncome.features: ['AGEP', 'COW', 'SCHL', 'MAR', 'OCCP', 'POBP', 'RELP', 'WKHP', 'SEX', 'RAC1P']
        # 'SEX' is the protected attribute. 'RAC1P' will be a regular categorical feature.
        categorical_features_for_encoding = ['COW', 'SCHL', 'MAR', 'OCCP', 'POBP', 'RELP', 'RAC1P']
        protected_attribute_names = [self._SENSITIVE_ATTRIBUTE_NAME]

        # Ensure all columns exist BEFORE any of them is accessed, so a missing column
        # is reported with this message rather than as a bare KeyError.
        all_expected_cols = categorical_features_for_encoding + protected_attribute_names + ['AGEP', 'WKHP']
        for col in all_expected_cols:
            if col not in features_df.columns:
                raise ValueError(f"Expected column '{col}' not found in ACSIncome features for the given configuration.")

        df = features_df.copy()
        # We need to map from Folktables to AIF360's expected format:
        # Folktables uses 1 for Male and 2 for Female but AIF360 expects
        # 1.0 for privileged and 0.0 for unprivileged. Since this is a numeric column it isn't mapped
        # automatically by AIF360, we need to do it manually.
        df[self._SENSITIVE_ATTRIBUTE_NAME] = df[self._SENSITIVE_ATTRIBUTE_NAME].map({1: 1.0, 2: 0.0})
        # Values missing from the mapping become NaN; fail loudly instead of letting
        # NaN protected attributes flow into the AIF360 dataset.
        if df[self._SENSITIVE_ATTRIBUTE_NAME].isna().any():
            raise ValueError(
                f"Column '{self._SENSITIVE_ATTRIBUTE_NAME}' contains values other than 1 (Male) and 2 (Female); "
                "cannot map them to privileged/unprivileged groups."
            )

        label_name = 'PINCP_GT_50K'
        df[label_name] = labels.astype(float)
        favorable_classes = [1.0]

        # Protected attribute is 'SEX' (1.0=Male, 0.0=Female) since we mapped it above.
        # Male (1.0) is privileged.
        privileged_classes = [[1.0]]

        default_metadata = self._get_default_metadata()

        dataset = StandardDataset(
            df=df,
            label_name=label_name,
            favorable_classes=favorable_classes,
            protected_attribute_names=protected_attribute_names,
            privileged_classes=privileged_classes,
            instance_weights_name=None,
            categorical_features=categorical_features_for_encoding,
            features_to_keep=[],
            features_to_drop=[],
            na_values=[],
            custom_preprocessing=None,
            metadata=default_metadata
        )

        return dataset

    def _get_default_metadata(self):
        """Provides default metadata for labels and the 'sex' protected attribute."""
        label_map = [{1.0: 'Income > $50K', 0.0: 'Income <= $50K'}]
        # load_data() remaps 'SEX' before building the dataset: Male -> 1.0 (privileged),
        # Female -> 0.0 (unprivileged). The map is therefore keyed on the remapped
        # 0/1 values, not on the original Folktables codes 1/2.
        protected_attribute_map = [{1.0: 'Male', 0.0: 'Female'}]

        return {
            'label_maps': label_map,
            'protected_attribute_maps': protected_attribute_map
        }


In [3]:
# Instantiate the wrapper; construction loads the data via BaseDataset.__init__ -> load_data().
dataset = ACSIncomeDataset()

In [7]:
# Build the train / validation / test FairnessDataset splits, requesting protected attributes.
train, val, test = dataset.to_torch(include_protected=True)

# With protected attributes present, each item is a (features, label, protected) triple.
for feat, lab, prot in train:
    print(f"Feature shape: {feat.shape}, Label: {lab.item()}, Protected: {prot.item()}")
    break  # Just print the first item for brevity

Dataset sizes - Train: 136965, Val: 29349, Test: 29351
Feature shape: torch.Size([815]), Label: 1.0, Protected: 0.0
