In [2]:
import numpy as np
import cvxpy as cp

In [None]:
# take some random return data
R = np.random.randn(2500, 100)

n, m = R.shape

alpha = 0.95

entropy_weights = np.random.rand(n)
entropy_weights /= np.sum(entropy_weights)

gamma = cp.Variable()
w = cp.Variable(m)
constraints = [w >= 0, cp.sum(w) == 1]

obj = cp.Minimize(cp.sum(cp.multiply(entropy_weights, cp.pos(R @ w - gamma))))
problem = cp.Problem(objective=obj, constraints=constraints)
problem.solve()

print(f"CVaR: {gamma.value}")

In [None]:
def CVaR(
    losses: np.ndarray, subgroups: list, p: float, alpha: float
) -> float:
    """Conditional Value at Risk / superquantile
    Loss aggregator, analogous to E[X | X > alpha-quantile]

    Set p=0, alpha=0 for the expected value

    :param subgroup_losses: losses per subgroup to aggregate over
    :param p: discard all subgroup risks lower then p
    :param alpha: degree of fairness
    :return: aggregated losses
    """
    subgroup_losses = [losses[mask] for mask in subgroups]

    # compute the subgroup mean losses
    subgroup_mean_losses = np.zeros(len(subgroup_losses))
    for i, subgroup_loss in enumerate(subgroup_losses):
        # calculate the expected loss over the subgroup
        mean = np.mean(subgroup_loss - p) ** 2
        subgroup_mean_losses[i] = mean


    return p + 1 / (1 - alpha) * np.mean(subgroup_mean_losses)


# random 2d array of losses in range [0, 1]
losses = np.random.rand(100, 100)

CVaR

np.float64(5.0800694720222825)

In [20]:
import numpy as np
import cvxpy as cp
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# Generate a synthetic binary classification dataset
X, y = make_classification(n_samples=1000, n_features=20, n_informative=10, n_redundant=10, random_state=42)
S = np.random.randint(0, 2, size=(1000,))  # Sensitive attribute (binary)

# Split the dataset
X_train, X_test, y_train, y_test, S_train, S_test = train_test_split(X, y, S, test_size=0.3, random_state=42)

# Define the classifier weights as a CVXPY variable
w = cp.Variable(X_train.shape[1])

# Define the loss function (e.g., logistic loss)
def logistic_loss(X, y, w):
    return cp.sum(cp.logistic(-cp.multiply(y, X @ w))) / X.shape[0]

# Define CVaR objective for fairness
alpha = 0.95  # CVaR confidence level
rho = cp.Variable()  # CVaR risk threshold variable

# Calculate losses for each subgroup
loss_S0 = cp.multiply((S_train == 0) * 1, logistic_loss(X_train, y_train, w))
loss_S1 = cp.multiply((S_train == 1) * 1, logistic_loss(X_train, y_train, w))

# Define CVaR constraints for fairness-aware optimization
cvar_constraint_0 = cp.sum(cp.maximum(loss_S0 - rho, 0)) / ((1 - alpha) * (S_train == 0).sum())
cvar_constraint_1 = cp.sum(cp.maximum(loss_S1 - rho, 0)) / ((1 - alpha) * (S_train == 1).sum())

# Objective function
objective = cp.Minimize(cp.sum(logistic_loss(X_train, y_train, w)) + cvar_constraint_0 + cvar_constraint_1)

# Set up and solve the optimization problem
constraints = [rho >= 0]
prob = cp.Problem(objective, constraints)
prob.solve()

# Evaluate the model
y_pred = (X_test @ w.value > 0).astype(int)
accuracy = accuracy_score(y_test, y_pred)

print("Fair model accuracy:", accuracy)

Fair model accuracy: 0.7066666666666667


In [120]:
def constrained_maxmin_user_given_item(rel_matrix: np.ndarray, k_rec: int, v: float) -> cp.Problem:
    x_alloc = cp.Variable(rel_matrix.shape, boolean=True)

    # constraints
    constraints = [
        # recommend k items
        cp.sum(x_alloc, axis=1) == k_rec,
        # minimal item utility must be at least v
        cp.min(cp.sum(x_alloc, axis=0)) >= v,
    ]

    # maximize the minimal user utility
    problem = cp.Problem(
        cp.Maximize(cp.mean(cp.sum(cp.multiply(x_alloc, rel_matrix), axis=1))),
        constraints,
    )
    problem.solve(solver=cp.SCIP)

    return problem

def constrained_maxmin_item_given_user(rel_matrix: np.ndarray, k_rec: int) -> cp.Problem:
    x_alloc = cp.Variable(rel_matrix.shape, boolean=True)

    # constraints
    constraints = [
        # recommend k items
        cp.sum(x_alloc, axis=1) == k_rec,
    ]

    # maximize the minimal item utility
    problem = cp.Problem(
        cp.Maximize(cp.min(cp.sum(x_alloc, axis=0))),
        constraints,
    )
    problem.solve(solver=cp.SCIP)

    return problem

In [34]:
# sample 100 rows and 10 columns from predictions matrix

with open("predictions.npy", "rb") as f:
    predictions = np.load(f)

small_pred = predictions[:100, :10]
random_groups = np.random.randint(0, 5, size=(100,))

In [4]:
problem = constrained_maxmin_user_given_item(small_pred, 10, 1)
all_weights = problem.variables()[0].value
utility = small_pred * all_weights
utility.sum(axis=1).std()

NameError: name 'constrained_maxmin_user_given_item' is not defined

In [32]:
def constrained_maxmin_user_given_item(rel_matrix: np.ndarray, groups: list[int], k_rec: int, v: float) -> cp.Problem:
    x_alloc = cp.Variable(rel_matrix.shape, boolean=True)
    best_alloc = np.sort(rel_matrix, axis=1)[:, -k_rec:]
    best_alloc = np.sum(best_alloc, axis=1)

    alpha = 0.95  # CVaR confidence level
    rho = cp.Variable()  # CVaR risk threshold variable

    # constraints
    constraints = [
        # recommend k items
        cp.sum(x_alloc, axis=1) == k_rec,
        # minimal item utility must be at least v
        cp.min(cp.sum(x_alloc, axis=0)) >= v,
        rho >= 0,
    ]

    group_gains = []
    for group in set(groups):
        group_mask = (groups == group) * 1
        best_group_alloc = cp.multiply(group_mask, best_alloc)
        group_alloc = cp.multiply(group_mask, cp.sum(cp.multiply(x_alloc, rel_matrix), axis=1))
        group_loss = best_group_alloc - group_alloc
        group_gains.append((rho + cp.sum(cp.maximum(group_loss - rho, 0))) / ((1 - alpha) * group_mask.sum()))

    objective = cp.Minimize(cp.sum(group_gains))

    # maximize the minimal user utility
    problem = cp.Problem(
        objective=objective,
        constraints=constraints
    )
    problem.solve(solver=cp.SCIP)

    return problem

In [62]:
problem.variables()[1].value

In [40]:
problem = constrained_maxmin_user_given_item(small_pred, random_groups, 5, 1)
problem.variables()[1].value

array([[ 1.00000000e+00,  1.00000000e+00,  1.00000000e+00,
         1.00000000e+00,  1.00000000e+00,  0.00000000e+00,
         0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
         0.00000000e+00],
       [ 0.00000000e+00,  1.00000000e+00,  0.00000000e+00,
         1.00000000e+00,  8.99313606e-15,  0.00000000e+00,
         1.00000000e+00,  1.00000000e+00,  1.00000000e+00,
         0.00000000e+00],
       [ 1.00000000e+00,  1.78672542e-14,  0.00000000e+00,
         0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
         1.00000000e+00,  1.00000000e+00,  1.00000000e+00,
         1.00000000e+00],
       [ 1.00000000e+00,  2.11819374e-13,  0.00000000e+00,
         0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
         1.00000000e+00,  1.00000000e+00,  1.00000000e+00,
         1.00000000e+00],
       [ 0.00000000e+00,  0.00000000e+00,  1.00000000e+00,
         0.00000000e+00,  5.95973487e-15,  0.00000000e+00,
         1.00000000e+00,  1.00000000e+00,  1.00000000e+00,
         1.