In [1]:
import numpy as np
import torch as th
from sklearn.metrics import f1_score, roc_auc_score

# Metric functions that score sampled graph predictions against target graphs
def expected_f1_score(target, pred, check_acyclic=False):
    """Return the sample-averaged binary F1 score for every batch element.

    ``pred`` is indexed as ``pred[sample, batch]`` and ``target`` as
    ``target[batch]``. Each sampled prediction is flattened and compared with
    the flattened target via ``f1_score(average="binary", zero_division=0)``;
    the per-sample scores are then averaged.

    Args:
        target: Ground-truth arrays, one per batch element.
        pred: Sampled predictions with the sample axis first and the batch
            axis second.
        check_acyclic: Accepted for interface compatibility; this function
            does not read it.

    Returns:
        A NumPy array of length ``pred.shape[1]`` holding the mean F1 score
        of each batch element.
    """
    num_graphs = pred.shape[1]
    mean_scores = np.zeros(num_graphs)
    for graph_idx in range(num_graphs):
        sampled_graphs = pred[:, graph_idx]
        truth = target[graph_idx]
        # One F1 value per sampled prediction of this batch element.
        per_sample_f1 = [
            f1_score(
                truth.flatten(),
                sampled.flatten(),
                average="binary",
                zero_division=0,
            )
            for sampled in sampled_graphs
        ]
        mean_scores[graph_idx] = np.mean(per_sample_f1)
    return mean_scores

def log_prob_graph_scores(targets, preds):
    """Return the Bernoulli log-probability of each target under its samples.

    For every batch element the sampled predictions are averaged over the
    sample axis to obtain per-entry probabilities. These parameterise
    independent Bernoulli distributions, and the score is the sum of the
    log-probabilities of the flattened target entries.

    Inputs may be torch tensors or anything ``th.as_tensor`` accepts (for
    example NumPy arrays). Integer and boolean inputs are cast to floating
    point, because ``mean`` is only defined for floating-point tensors and
    binary samples are commonly stored as integers.

    Args:
        targets: Ground truth indexed as ``targets[batch]``.
        preds: Samples indexed as ``preds[sample, batch]``.

    Returns:
        A list of Python floats, one per leading index of ``targets``.
    """
    preds = th.as_tensor(preds)
    if not preds.is_floating_point():
        preds = preds.to(th.get_default_dtype())

    targets = th.as_tensor(targets)
    if not targets.is_floating_point():
        targets = targets.to(preds.dtype)
    # Mixed NumPy/tensor input may otherwise leave the operands on different devices.
    targets = targets.to(device=preds.device)

    all_log_probs = []
    for batch_idx in range(targets.shape[0]):
        # Empirical per-entry probability over the sample axis.
        edge_probs = preds[:, batch_idx].mean(dim=0).flatten()
        target_flat = targets[batch_idx].flatten()

        bern_dist = th.distributions.bernoulli.Bernoulli(probs=edge_probs)
        all_log_probs.append(bern_dist.log_prob(target_flat).sum().item())
    return all_log_probs

def auc_graph_scores(targets, preds):
    """Return the ROC AUC of the sample-averaged prediction per batch element.

    Torch tensors are moved to the CPU and converted to NumPy arrays; other
    inputs are used as given. For each leading index of ``targets`` the
    predictions ``preds[:, batch]`` are averaged over the sample axis, and the
    flattened average is scored against the flattened target with
    ``roc_auc_score(average="macro")``.

    Args:
        targets: Ground truth indexed as ``targets[batch]``.
        preds: Samples indexed as ``preds[sample, batch]``.

    Returns:
        A list with one AUC value per leading index of ``targets``.
    """

    def _as_numpy(data):
        # Only torch tensors are converted; everything else passes through.
        return data.cpu().numpy() if isinstance(data, th.Tensor) else data

    targets = _as_numpy(targets)
    preds = _as_numpy(preds)

    scores = []
    for idx in range(targets.shape[0]):
        edge_probs = np.mean(preds[:, idx], axis=0).flatten()
        truth = targets[idx].flatten()
        scores.append(roc_auc_score(truth, edge_probs, average="macro"))
    return scores

# Build the ground truth: 100 batch elements alternating between the two
# single-edge 2x2 matrices [[0, 1], [0, 0]] (even indices) and
# [[0, 0], [1, 0]] (odd indices). Resulting shape: (batch_size, 2, 2).
batch_size = 100
target_1 = np.array([[0, 1], [0, 0]])
target_2 = np.array([[0, 0], [1, 0]])
targets = np.array([target_1 if i % 2 == 0 else target_2 for i in range(batch_size)])

# Draw binary samples entry-wise from Bernoulli[[0, 0.5], [0.5, 0]]. The 2x2
# probability matrix broadcasts over the leading (num_samples, batch_size)
# axes, so diagonal entries are always 0 and each off-diagonal entry is 1
# with probability 0.5.
# NOTE: no random seed is set in the visible code, so results vary per run.
preds_bernoulli_probs = np.array([[0, 0.5], [0.5, 0]])
num_samples = 500
preds = np.random.binomial(n=1, p=preds_bernoulli_probs, size=(num_samples, batch_size, 2, 2))

# Convert targets and preds to float32 torch tensors for the torch-based metrics
targets_th = th.tensor(targets, dtype=th.float32)
preds_th = th.tensor(preds, dtype=th.float32)

# Evaluate the metrics: expected F1 receives the NumPy arrays, the other two
# metrics receive the torch tensors.
expected_f1 = expected_f1_score(targets, preds)
log_probs = log_prob_graph_scores(targets_th, preds_th)
auc_scores = auc_graph_scores(targets_th, preds_th)

# Show one column per metric (each has one value per batch element).
# NOTE(review): `dataframe=` is given a plain dict here; the helper's name
# suggests it expects a pandas DataFrame -- confirm against ace_tools.
import ace_tools as tools; tools.display_dataframe_to_user(name="Metrics Evaluation", dataframe={
    "Expected F1": expected_f1,
    "Log Probability": log_probs,
    "AUC Scores": auc_scores
})
