In [None]:
import numpy as np
import torch
from matplotlib import pyplot as plt

from src.datasets import CFMGraphDataset
from src.loaders import get_test_loader, get_train_loaders
from src.models import GATEncoder
from src.utils import predict, save
from betacal import BetaCalibration
from sklearn.calibration import calibration_curve
from sklearn.isotonic import IsotonicRegression

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
model = GATEncoder(
    d_features=7,
    d_edges=5,
    d_out=24,
    d_hidden_dim=300,
    num_layers=3,
    num_heads=3,
    d_linear_layers=[256],
    dropout=0.01,
    activation="ReLU",
).to(device)

load = "runs/night_test"

In [None]:
labels = {}
probabilities = {}
predictions = {}

train_loader, val_loader = get_train_loaders(
    batch_size=128, shuffle=False, dataset=CFMGraphDataset
)
test_loader = get_test_loader(batch_size=128, dataset=CFMGraphDataset)
is_torch_geometric = True

model.load_state_dict(torch.load(f"{load}/model.pt"))

predictions["train"], labels["train"], probabilities["train"] = predict(
    model, train_loader, device, is_torch_geometric=is_torch_geometric
)
predictions["val"], labels["val"], probabilities["val"] = predict(
    model, val_loader, device, is_torch_geometric=is_torch_geometric
)
predictions["test"], _, probabilities["test"] = predict(
    model, test_loader, device, is_torch_geometric=is_torch_geometric
)

# We don't have labels for the test set
labels["test"] = predictions["test"]

In [None]:
def ploy_accuracy(key):
    for i in range(24):
        plt.bar(
            i,
            np.mean(predictions[key][labels[key] == i] == i),
            color="skyblue",
        )
    plt.title(f"{key} accuracy")
    plt.xticks(range(24))
    plt.show()


ploy_accuracy("train")
ploy_accuracy("val")
ploy_accuracy("test")

In [None]:
def plot_repartition(key):
    plt.hist(predictions[key], bins=24, label=key, density=True)
    plt.legend()
    plt.show()


plot_repartition("train")
plot_repartition("val")
plot_repartition("test")

In [None]:
def plot_error_matrix(key):
    error_matrix = np.zeros((24, 24))
    for i in range(24):
        for j in range(24):
            error_matrix[i, j] = np.mean(predictions[key][labels[key] == i] == j)
    plt.imshow(error_matrix)
    plt.colorbar()
    plt.title(f"{key} error matrix")
    plt.show()


plot_error_matrix("train")
plot_error_matrix("val")
plot_error_matrix("test")

In [None]:
cal_curves = {}


def compute_cal_graph(key, bins=10, strategy="uniform"):
    for i in range(24):
        cal_curves[f"{key}_fraction"], cal_curves[f"{key}_probabilities"] = (
            calibration_curve(
                labels[key] == i,
                probabilities[key][:, i],
                n_bins=bins,
                strategy=strategy,
            )
        )


compute_cal_graph("train")
compute_cal_graph("val")
compute_cal_graph("test")

In [None]:
regressors = {}


def calibrate(regressor, name):
    probabilities[f"train_{name}"] = np.zeros_like(probabilities["train"])
    probabilities[f"val_{name}"] = np.zeros_like(probabilities["val"])
    probabilities[f"test_{name}"] = np.zeros_like(probabilities["test"])

    for i in range(24):
        regressors[f"{name}_{i}"] = regressor
        regressors[f"{name}_{i}"].fit(
            probabilities["train"][:, i].reshape(-1, 1), (labels["train"] == i)
        )

        probabilities[f"train_{name}"][:, i] = regressors[f"{name}_{i}"].predict(
            probabilities["train"][:, i].reshape(-1, 1)
        )
        probabilities[f"val_{name}"][:, i] = regressors[f"{name}_{i}"].predict(
            probabilities["val"][:, i].reshape(-1, 1)
        )
        probabilities[f"test_{name}"][:, i] = regressors[f"{name}_{i}"].predict(
            probabilities["test"][:, i].reshape(-1, 1)
        )

    predictions[f"train_{name}"] = probabilities[f"train_{name}"].argmax(axis=1)
    predictions[f"val_{name}"] = probabilities[f"val_{name}"].argmax(axis=1)
    predictions[f"test_{name}"] = probabilities[f"test_{name}"].argmax(axis=1)

    labels[f"train_{name}"] = labels["train"]
    labels[f"val_{name}"] = labels["val"]
    labels[f"test_{name}"] = predictions[f"test_{name}"]


calibrate(IsotonicRegression(), "isotonic")

compute_cal_graph("train_isotonic")
compute_cal_graph("val_isotonic")
compute_cal_graph("test_isotonic")

In [None]:
def plot_calibration(name, K=3):
    fig, ax = plt.subplots(24 // K, K, figsize=(20, 30))

    for i in range(24):
        ax[i // K, i % K].plot(
            cal_curves[f"train_{name}_probabilities"],
            cal_curves[f"train_{name}_fraction"],
            marker="o",
            label="train",
            color="blue",
        )
        ax[i // K, i % K].plot(
            cal_curves[f"val_{name}_probabilities"],
            cal_curves[f"val_{name}_fraction"],
            marker="o",
            label="val",
            color="orange",
        )
        ax[i // K, i % K].plot(
            cal_curves[f"test_{name}_probabilities"],
            cal_curves[f"test_{name}_fraction"],
            marker="o",
            label="test",
            color="green",
        )

        ax[i // K, i % K].plot(
            cal_curves["train_probabilities"],
            cal_curves["train_fraction"],
            marker="o",
            alpha=0.5,
            color="blue",
        )
        ax[i // K, i % K].plot(
            cal_curves["val_probabilities"],
            cal_curves["val_fraction"],
            marker="o",
            alpha=0.5,
            color="orange",
        )
        ax[i // K, i % K].plot(
            cal_curves["test_probabilities"],
            cal_curves["test_fraction"],
            marker="o",
            alpha=0.5,
            color="green",
        )
        ax[i // K, i % K].plot([0, 1], [0, 1], linestyle="--", color="black")
        ax[i // K, i % K].set_title(f"Class {i+1}")
        ax[i // K, i % K].set_xlabel("Mean predicted probability")
        ax[i // K, i % K].set_ylabel("Fraction of positives")
        ax[i // K, i % K].legend()

    fig.tight_layout()
    plt.show()


plot_calibration("isotonic")

In [None]:
plot_repartition("train_isotonic")
plot_repartition("val_isotonic")
plot_repartition("test_isotonic")

In [None]:
save(
    predictions["test_isotonic"],
)

In [None]:
def adjust_repartition(key):
    p = probabilities[key].copy()
    losses = None
    while losses is None or np.max(losses) > 0.005:
        pred = p.argmax(axis=1)
        repartition = np.bincount(pred, minlength=24) / len(predictions[key])
        directions = np.sign(repartition - 1 / 24)
        losses = np.abs(repartition - 1 / 24)

        p *= 1 - 0.01 * directions * losses

        print(np.max(losses), end="\r")

    probabilities[f"{key}_adjusted"] = p
    predictions[f"{key}_adjusted"] = p.argmax(axis=1)
    labels[f"{key}_adjusted"] = labels[key]


adjust_repartition("test")
plot_repartition("test_adjusted")

In [None]:
save(
    predictions["test_adjusted"],
)