In [3]:
import abc
import copy
from typing import Union

import numpy as np
import pandas as pd
import torch
from torch import nn

Activation = Union[str, nn.Module]

# Name -> prototype activation module. build_mlp never inserts these instances
# into a network directly; it deep-copies them, so the objects stored here stay
# untouched by hooks / device moves / mode changes applied to any built model.
_str_to_activation = {
    'relu': nn.ReLU(),
    'tanh': nn.Tanh(),
    'leaky_relu': nn.LeakyReLU(),
    'sigmoid': nn.Sigmoid(),
    'selu': nn.SELU(),
    'softplus': nn.Softplus(),
    'identity': nn.Identity(),
}


def build_mlp(
        input_size: int,
        output_size: int,
        n_layers: int,
        size: int,
        activation: Activation = 'tanh',
        output_activation: Activation = 'identity',
):
    """
    Build a feed-forward network as an ``nn.Sequential``.

    The network is ``n_layers`` blocks of ``Linear -> activation`` (each hidden
    layer has ``size`` units) followed by ``Linear -> output_activation``.

    :param input_size: number of input features of the first linear layer
    :param output_size: number of output features of the last linear layer
    :param n_layers: number of hidden layers (0 gives a single linear layer)
    :param size: width of every hidden layer
    :param activation: activation placed after each hidden layer, either a key
        of ``_str_to_activation`` or an ``nn.Module`` used as a prototype
    :param output_activation: activation placed after the output layer, same
        conventions as ``activation``
    :return: the assembled ``nn.Sequential``
    :raises KeyError: if an activation is given as a string that is not a key
        of ``_str_to_activation``
    """
    if isinstance(activation, str):
        activation = _str_to_activation[activation]
    if isinstance(output_activation, str):
        output_activation = _str_to_activation[output_activation]

    layers = []
    in_size = input_size
    for _ in range(n_layers):
        layers.append(nn.Linear(in_size, size))
        # Every slot gets its own copy: reusing one instance would tie the
        # parameters of a parametric activation (e.g. nn.PReLU) across layers
        # and would share the registry singletons between unrelated models.
        layers.append(copy.deepcopy(activation))
        in_size = size
    layers.append(nn.Linear(in_size, output_size))
    layers.append(copy.deepcopy(output_activation))
    return nn.Sequential(*layers)

# Torch device used by the tensor helpers in this module; assigned by init_gpu().
device = None


def init_gpu(use_gpu=True, gpu_id=0):
    """
    Select the module-level ``device`` and print which one was chosen.

    :param use_gpu: when False the CPU is selected even if CUDA is available
    :param gpu_id: index of the CUDA device to select when a GPU is used
    """
    global device
    gpu_selected = torch.cuda.is_available() and use_gpu
    if not gpu_selected:
        device = torch.device("cpu")
        print("GPU not detected. Defaulting to CPU.")
        return
    device = torch.device("cuda:" + str(gpu_id))
    print("Using GPU id {}".format(gpu_id))


def set_device(gpu_id):
    """
    Make ``gpu_id`` the current CUDA device by delegating to
    ``torch.cuda.set_device``.

    :param gpu_id: device identifier forwarded unchanged to torch
    """
    torch.cuda.set_device(device=gpu_id)


def from_numpy(*args, **kwargs):
    """
    Turn a numpy array into a float tensor placed on the module-level ``device``.

    All arguments are forwarded to ``torch.from_numpy``; the result is cast
    with ``.float()`` and then moved with ``.to(device)``.
    """
    as_tensor = torch.from_numpy(*args, **kwargs)
    as_float = as_tensor.float()
    return as_float.to(device)


def to_numpy(tensor):
    """
    Convert a torch tensor to a numpy array.

    The tensor is detached from the autograd graph and brought to the CPU
    before the conversion.
    """
    host_tensor = tensor.detach().cpu()
    return host_tensor.numpy()



class BaseProblem(metaclass=abc.ABCMeta):
    """
    Interface for defining and solving an underlying mathematical
    optimization problem.

    Derive from this class and override ``build_model``, ``eval`` and
    ``get_c_shapes`` to describe a concrete problem; the versions provided
    here only raise ``NotImplementedError``.
    """

    def __init__(self):
        super().__init__()
        # Defaults set by the base class.
        self.lancer_out_activation = "relu"
        self.num_feats = 0

    def build_model(self, **kwargs):
        """Not implemented in the base class."""
        raise NotImplementedError

    def eval(self, z_pred: np.ndarray, z_true: np.ndarray, **kwargs) -> np.ndarray:
        """
        Evaluate the problem-specific decision loss f(x*; z_true), where x*
        is the solution obtained using z_pred.

        :param z_pred: predicted problem descriptors
        :param z_true: ground truth problem descriptors
        :param kwargs: additional problem specific arguments
        :return: decision losses, one per datapoint
        """
        raise NotImplementedError

    def get_c_shapes(self):
        """Not implemented in the base class."""
        raise NotImplementedError

    def get_activations(self):
        """Return (hidden-layer activation, output activation) names for c_model."""
        hidden_activation, output_activation = "tanh", "relu"
        return hidden_activation, output_activation


In [1]:
def AlphaFairness(util, alpha):
    """
    Alpha-fairness welfare of a vector of utilities.

    * ``alpha == 1``: ``sum(log(util))``
    * ``alpha == 0``: ``sum(util)``
    * ``alpha`` is ``'inf'`` or a numeric positive infinity: ``min(util)``
    * otherwise: ``sum(util ** (1 - alpha) / (1 - alpha))``

    :param util: utilities (array-like)
    :param alpha: fairness parameter; a number, or the string ``'inf'``
    :return: scalar welfare value
    """
    if alpha == 1:
        return np.sum(np.log(util))
    if alpha == 0:
        return np.sum(util)
    if isinstance(alpha, str):
        if alpha == 'inf':
            return np.min(util)
    elif np.isposinf(alpha):
        # A numeric infinity must select the same branch as 'inf'; the general
        # formula below does not produce the minimum for an infinite alpha.
        return np.min(util)

    # numpy refuses to raise integer arrays to negative integer powers, and
    # plain sequences do not support ``**`` at all, so promote those to float.
    if isinstance(util, (list, tuple)) or (
            isinstance(util, np.ndarray) and util.dtype.kind in 'biu'):
        util = np.asarray(util, dtype=float)
    return np.sum(util**(1-alpha)/(1-alpha))

In [2]:
import sys
# NOTE(review): absolute, machine-specific path. The local imports below only
# resolve on a machine where this directory exists.
sys.path.insert(0, '/Users/dennis/Downloads/2024-fall/research/Fairness-Decision-Focused-Loss/The Paper/algorithm')

# NOTE(review): wildcard import. Which names it brings in cannot be seen from
# this cell, and it may shadow names defined by earlier cells.
from myutil import *
from features import get_all_features
from sklearn.preprocessing import StandardScaler

# alpha and Q are not used further in this cell; presumably consumed by later
# cells (TODO confirm).
alpha, Q = 2, 20

# NOTE(review): `pd` and `np` are not imported in this cell; they have to come
# from another cell or from the wildcard import above.
# Load the data (path is relative to the working directory) and keep a
# 20-row random sample drawn with a fixed seed.
df = pd.read_csv('data/data.csv')
df = df.sample(n=20, random_state=42)

# Columns routed to df_stat; every other column goes to df_feature.
columns_to_keep = [
    'risk_score_t', 'program_enrolled_t', 'cost_t', 'cost_avoidable_t', 'race', 'dem_female', 'gagne_sum_tm1', 'gagne_sum_t', 
    'risk_score_percentile', 'screening_eligible', 'avoidable_cost_mapped', 'propensity_score', 'g_binary', 
    'g_continuous', 'utility_binary', 'utility_continuous'
]
# race encoding: 0 is white, 1 is black
df_stat = df[columns_to_keep]
df_feature = df[[col for col in df.columns if col not in columns_to_keep]]

# Define input variables for DFL.
# The two clipped arrays use different lower bounds:
#   risk  -> values below 0.001 are replaced with 0.001
#   gainF -> values below 0.1 are replaced with 0.1
feats = df_feature[get_all_features(df_feature)].values
risk = df_stat['risk_score_t'].values.clip(0.001)
gainF = df_stat['g_continuous'].values.clip(0.1)
decision = df_stat['propensity_score'].values
# Array of ones with the same shape as `risk`.
cost = np.ones(risk.shape)
race = df_stat['race'].values

# Transform the features: the scaler is fitted on the sampled rows themselves
# and then applied to them.
scaler = StandardScaler()
feats = scaler.fit_transform(feats)
