In [1]:
import sys
sys.path.append('../')

import logging
import os.path as op

import torch
import torch.nn.functional as F
import numpy as np
import pandas as pd

from torchmetrics.functional.classification import binary_calibration_error
from sklearn.metrics import (
    roc_auc_score,
    mean_squared_error,
    mean_absolute_error,
    precision_recall_curve,
    auc,
    brier_score_loss,
)
from scipy.stats import norm as gaussian

from muben.utils.io import set_logging, init_dir, load_results
logger = logging.getLogger(__name__)

In [6]:
def aggregate_seeded_results(results_for_seeds: list[dict[str, float]]):
    assert results_for_seeds

    results_aggr = {
        metric: [r.get(metric, np.NaN) for r in results_for_seeds]
        for metric in list(results_for_seeds[0].keys())
    }
    for k in results_aggr:
        mean = np.nanmean(results_aggr[k])
        std = np.nanstd(results_aggr[k])
        results_aggr[k] = {"mean": mean, "std": std}

    return results_aggr


def save_results(results, result_dir, model_name, dataset_name):
    if not results:
        logger.warning("Result dict is empty. No results is saved!")
        return None

    uncertainty_names = list(results.keys())
    metrics = list(list(results.values())[0].keys())
    columns_headers = (
        ["method"]
        + [f"{metric}-mean" for metric in metrics]
        + [f"{metric}-std" for metric in metrics]
    )
    columns = {k: list() for k in columns_headers}
    columns["method"] = [f"{model_name}-{un}" for un in uncertainty_names]
    for uncertainty in uncertainty_names:
        metric_dict = results[uncertainty]
        for metric in metrics:
            value = metric_dict.get(metric, None)
            if value:
                columns[f"{metric}-mean"].append(value["mean"])
                columns[f"{metric}-std"].append(value["std"])
            else:
                columns[f"{metric}-mean"].append(np.NaN)
                columns[f"{metric}-std"].append(np.NaN)

    df = pd.DataFrame(columns)
    init_dir(op.join(result_dir, "RESULTS", "scores"), clear_original_content=False)
    df.to_csv(
        op.join(result_dir, "RESULTS", "scores", f"{model_name}-{dataset_name}.csv")
    )
    return None


def classification_metrics(preds, lbs, masks):
    result_metrics_dict = dict()

    roc_auc_list = list()
    prc_auc_list = list()
    ece_list = list()
    mce_list = list()
    nll_list = list()
    brier_list = list()

    roc_auc_valid_flag = True
    prc_auc_valid_flag = True
    ece_valid_flag = True
    mce_valid_flag = True
    nll_valid_flag = True
    brier_valid_flag = True

    for i in range(lbs.shape[-1]):
        lbs_ = lbs[:, i][masks[:, i].astype(bool)]
        preds_ = preds[:, i][masks[:, i].astype(bool)]

        if len(lbs_) < 1:
            continue
        if (lbs_ < 0).any():
            raise ValueError("Invalid label value encountered!")
        if (lbs_ == 0).all() or (
            lbs_ == 1
        ).all():  # skip tasks with only one label type, as Uni-Mol did.
            continue

        # --- roc-auc ---
        try:
            roc_auc = roc_auc_score(lbs_, preds_)
            roc_auc_list.append(roc_auc)
        except:
            roc_auc_valid_flag = False

        # --- prc-auc ---
        try:
            p, r, _ = precision_recall_curve(lbs_, preds_)
            prc_auc = auc(r, p)
            prc_auc_list.append(prc_auc)
        except:
            prc_auc_valid_flag = False

        # --- ece ---
        try:
            ece = binary_calibration_error(
                torch.from_numpy(preds_), torch.from_numpy(lbs_)
            ).item()
            ece_list.append(ece)
        except:
            ece_valid_flag = False

        # --- mce ---
        try:
            mce = binary_calibration_error(
                torch.from_numpy(preds_), torch.from_numpy(lbs_), norm="max"
            ).item()
            mce_list.append(mce)
        except:
            mce_valid_flag = False

        # --- nll ---
        try:
            nll = F.binary_cross_entropy(
                input=torch.from_numpy(preds_),
                target=torch.from_numpy(lbs_).to(torch.float),
                reduction="mean",
            ).item()
            nll_list.append(nll)
        except:
            nll_valid_flag = False

        # --- brier ---
        try:
            brier = brier_score_loss(lbs_, preds_)
            brier_list.append(brier)
        except:
            brier_valid_flag = False

    if roc_auc_valid_flag:
        roc_auc_avg = np.mean(roc_auc_list)
        result_metrics_dict["roc-auc"] = {"all": roc_auc_list, "macro-avg": roc_auc_avg}

    if prc_auc_valid_flag:
        prc_auc_avg = np.mean(prc_auc_list)
        result_metrics_dict["prc-auc"] = {"all": prc_auc_list, "macro-avg": prc_auc_avg}

    if ece_valid_flag:
        ece_avg = np.mean(ece_list)
        result_metrics_dict["ece"] = {"all": ece_list, "macro-avg": ece_avg}

    if mce_valid_flag:
        mce_avg = np.mean(mce_list)
        result_metrics_dict["mce"] = {"all": mce_list, "macro-avg": mce_avg}

    if nll_valid_flag:
        nll_avg = np.mean(nll_list)
        result_metrics_dict["nll"] = {"all": nll_list, "macro-avg": nll_avg}

    if brier_valid_flag:
        brier_avg = np.mean(brier_list)
        result_metrics_dict["brier"] = {"all": brier_list, "macro-avg": brier_avg}

    return result_metrics_dict


def regression_metrics(preds, variances, lbs, masks):
    if len(preds.shape) == 1:
        preds = preds[:, np.newaxis]

    if len(variances.shape) == 1:
        variances = variances[:, np.newaxis]

    # --- rmse ---
    result_metrics_dict = dict()

    rmse_list = list()
    mae_list = list()
    nll_list = list()
    ce_list = list()

    for i in range(lbs.shape[-1]):
        lbs_ = lbs[:, i][masks[:, i].astype(bool)]
        preds_ = preds[:, i][masks[:, i].astype(bool)]
        vars_ = variances[:, i][masks[:, i].astype(bool)]

        # --- rmse ---
        rmse = mean_squared_error(lbs_, preds_, squared=False)
        rmse_list.append(rmse)

        # --- mae ---
        mae = mean_absolute_error(lbs_, preds_)
        mae_list.append(mae)

        # --- Gaussian NLL ---
        nll = F.gaussian_nll_loss(
            torch.from_numpy(preds_), torch.from_numpy(lbs_), torch.from_numpy(vars_)
        ).item()
        nll_list.append(nll)

        # --- calibration error ---
        ce = regression_calibration_error(lbs_, preds_, vars_)
        ce_list.append(ce)

    rmse_avg = np.mean(rmse_list)
    result_metrics_dict["rmse"] = {"all": rmse_list, "macro-avg": rmse_avg}

    mae_avg = np.mean(mae_list)
    result_metrics_dict["mae"] = {"all": mae_list, "macro-avg": mae_avg}

    nll_avg = np.mean(nll_list)
    result_metrics_dict["nll"] = {"all": nll_list, "macro-avg": nll_avg}

    ce_avg = np.mean(ce_list)
    result_metrics_dict["ce"] = {"all": ce_list, "macro-avg": ce_avg}

    return result_metrics_dict


def regression_calibration_error(lbs, preds, variances, n_bins=20):
    sigma = np.sqrt(variances)
    phi_lbs = gaussian.cdf(lbs, loc=preds.reshape(-1, 1), scale=sigma.reshape(-1, 1))

    expected_confidence = np.linspace(0, 1, n_bins + 1)[1:-1]
    observed_confidence = np.zeros_like(expected_confidence)

    for i in range(0, len(expected_confidence)):
        observed_confidence[i] = np.mean(phi_lbs <= expected_confidence[i])

    calibration_error = np.mean(
        (expected_confidence.ravel() - observed_confidence.ravel()) ** 2
    )

    return calibration_error

def load_results_no_merging(result_paths: list[str]):
    """
    Load muben prediction results.

    Parameters
    ----------
    result_paths : list[str]
        paths to the result files
    """
    lbs = masks = np.nan
    preds_list = list()
    variances_list = list()

    for test_result_path in result_paths:
        results = torch.load(test_result_path)

        if lbs is not np.nan:
            assert (lbs == results["lbs"]).all()
        else:
            lbs: np.ndarray = results["lbs"]

        if masks is not np.nan:
            assert (masks == results["masks"]).all()
        else:
            masks: np.ndarray = results["masks"]

        if results.get("version", 1) == 1:
            preds_list.append(results["preds"]["preds"])
            try:
                variances_list.append(results["preds"]["vars"])
            except KeyError:
                pass
        elif results.get("version", 1) == 2:
            preds_list.append(results["preds"])
            try:
                variances_list.append(results["vars"])
            except KeyError:
                pass
        else:
            raise ValueError(
                f"Undefined result version: {results.get('version', 1)}"
            )

    # aggregate mean and variance
    preds = np.stack(preds_list)
    if (
        variances_list and not (np.asarray(variances_list) == None).any()
    ):  # regression
        # variances = np.mean(np.stack(preds_list) ** 2 + np.stack(variances_list), axis=0) - preds ** 2
        variances = np.stack(variances_list)
    else:
        variances = None

    return preds, variances, lbs, masks

In [3]:
dataset = "lipo"
# dataset = "tox21"
uncertainty = "DeepEnsembles"
# model = "DNN-rdkit"
model = "Uni-Mol"
result_folder = f"../output/{dataset}/{model}/{uncertainty}"
seeds = list(range(10))

results = dict()
for idx in range(len(seeds)):
    seed_ids = seeds[:idx+1]
    test_result_paths = [
        op.join(result_folder, f"seed-{sid}", "preds", "0.pt") for sid in seed_ids
    ]

    preds, variances, lbs, masks = load_results(test_result_paths)

    if variances is not None:  # regression
        metrics = regression_metrics(preds, variances, lbs, masks)
    else:  # classification
        metrics = classification_metrics(preds, lbs, masks)

    result_dict = {k: v["macro-avg"] for k, v in metrics.items()}
    results_aggr = {
        k: v for k, v in result_dict.items()
    }
    results[idx+1] = results_aggr


In [45]:
df = pd.DataFrame.from_dict(results)
df = df.round(3)
print(df.to_markdown())

In [14]:
results = dict()

In [18]:
# dataset = "lipo"
dataset = "tox21"
uncertainty = "DeepEnsembles"
# model = "DNN-rdkit"
model = "Uni-Mol"
result_folder = f"../output/{dataset}/{model}/{uncertainty}"
seeds = list(range(10))

mean_variances_dict = dict()
var_variances_dict = dict()
for idx in range(len(seeds)):
    seed_ids = seeds[:idx+1]
    test_result_paths = [
        op.join(result_folder, f"seed-{sid}", "preds", "0.pt") for sid in seed_ids
    ]

    means, variances, lbs, masks = load_results_no_merging(test_result_paths)
    mean_variances_dict[idx+1] = np.var(means, axis=0).mean()
    if variances is not None:
        var_variances_dict[idx+1] = np.var(variances, axis=0).mean()

results[f"{model}-{dataset}-mean"] = mean_variances_dict
if variances is not None:
    results[f"{model}-{dataset}-var"] = var_variances_dict


In [20]:
df = pd.DataFrame.from_dict(results)
df = df.round(5)
print(df.to_markdown())

|    |   DNN-rdkit-lipo-mean |   DNN-rdkit-lipo-var |   Uni-Mol-lipo-mean |   Uni-Mol-lipo-var |   DNN-rdkit-tox21-mean |   Uni-Mol-tox21-mean |
|---:|----------------------:|---------------------:|--------------------:|-------------------:|-----------------------:|---------------------:|
|  1 |               0       |              0       |             0       |            0       |                0       |              0       |
|  2 |               0.04634 |              0.00051 |             0.01311 |            0.00027 |                0.00287 |              0.00224 |
|  3 |               0.06198 |              0.0008  |             0.0163  |            0.00031 |                0.00345 |              0.00279 |
|  4 |               0.06801 |              0.00124 |             0.01741 |            0.00026 |                0.00357 |              0.00374 |
|  5 |               0.06867 |              0.00255 |             0.02351 |            0.00284 |                0.00356 |         