In [3]:
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path
import joblib
import hydra

def load_sim_mat_cali(config_path:str="../config", config_name:str="main")->dict:
    """Load the similarity matrix for calibration."""
    with hydra.initialize(version_base=None, config_path=config_path):
        cfg = hydra.compose(config_name=config_name)
        cfg_dataset = cfg["KITTI"]
        return joblib.load(
                Path(
                    cfg_dataset.paths.save_path,
                    f"sim_mat_cali_{cfg_dataset.retrieval_dim}_{cfg_dataset.mask_ratio}.pkl",
                )
                )

def overlap_ratio(hist1:np.ndarray, hist2:np.ndarray)->float:
    """Computes the overlap ratio between two histograms."""
    minima = np.minimum(hist1, hist2)
    intersection = np.sum(minima)
    union = np.sum(hist1) + np.sum(hist2) - intersection
    return intersection / union

def get_scores_and_gt(data: dict, idx_modal1: int, idx_modal2: int) -> tuple[list[float], list[float]]:
    """Get the scores and ground truth for the given modalities."""
    scores = []
    gt = []
    for mat, label in data.values():
        scores.append(mat[idx_modal1][idx_modal2])
        gt.append(label)
    return scores, gt

IMAGE = 0
LIDAR = 1
TEXT = 2

bin_edges = np.array(range(51))/25-1
modalities = ["Image", "Lidar", "Text"]

data = load_sim_mat_cali()

### Calibration (2 stages)

In [4]:
from mmda.utils.spearman_utils import spearman_rank_coefficient
from mmda.utils.calibrate import (
    get_non_conformity_scores_1st_stage, get_non_conformity_scores_2nd_stage, calibrate, con_mat_calibrate,
)

ns_scores3x3 = {}
for i in range(3):
    for j in range(3):
        nc_scores, c_scores, q2scores = get_non_conformity_scores_1st_stage(data, i, j, top_k=5)
        ns_scores3x3[(i, j)] = nc_scores
print(ns_scores3x3)
con_mat = con_mat_calibrate(data, ns_scores3x3)
nc_scores, c_scores, q2scores = get_non_conformity_scores_2nd_stage(con_mat, np.mean, top_k=5)
print(len(nc_scores), len(c_scores))
scores, gt = [q2scores[q][0] for q in q2scores], [q2scores[q][1] for q in q2scores]
cali_scores = []
for score in scores:
    cali_scores.append(calibrate(score, nc_scores))

{(0, 0): [0.7258245944976807, 0.7668759226799011, 0.7798783779144287, 0.783818244934082, 0.7839109301567078, 0.7849079370498657, 0.7867660522460938, 0.79095458984375, 0.792096734046936, 0.7923640012741089, 0.7981851696968079, 0.7996464371681213, 0.8084635734558105, 0.8107781410217285, 0.8109869956970215, 0.813892126083374, 0.814327597618103, 0.8159152269363403, 0.816098153591156, 0.8177717924118042, 0.8178407549858093, 0.8199485540390015, 0.8203856348991394, 0.8205891847610474, 0.8240191340446472, 0.8256820440292358, 0.8260919451713562, 0.8267017006874084, 0.8289008736610413, 0.8293118476867676, 0.8303649425506592, 0.8308547735214233, 0.8325235843658447, 0.8336420655250549, 0.8341978788375854, 0.8349669575691223, 0.8349669575691223, 0.8361625671386719, 0.8366184830665588, 0.8397032022476196, 0.8398536443710327, 0.8399735689163208, 0.8402665853500366, 0.8409099578857422, 0.8415349721908569, 0.8416256308555603, 0.842055082321167, 0.8444162607192993, 0.8448159098625183, 0.8448976874351501

Calculating conformal probabilities:   0%|          | 2193/602253 [00:00<00:06, 89840.19it/s]


AssertionError: similarity matrix should be in [0, 1], but got 1.0000001192092896 for (2, 2)

In [None]:
print(data[(1778, 1778)])
print(con_mat[(1778, 1778)])

In [None]:
print(np.mean(cali_scores))
bins = np.linspace(0, 1, 20)
bin_indices = np.digitize(cali_scores, bins)
bin_scores = []
bin_gt = []
for k in range(1, len(bins)):
    bin_mask = bin_indices == k
    bin_scores.append(np.mean(np.array(cali_scores)[bin_mask]))
    bin_gt.append(np.mean(np.array(gt)[bin_mask]))
plt.figure()
plt.plot([0,1], [0,1], label="Ideal Calibration Curve")
plt.plot(bin_scores, bin_gt, label="Overall Calibration")
plt.xlabel("Calibration Score")
plt.ylabel("Probability of Correct Retrieval")
plt.legend()
plt.show()

plt.figure()
nc_fig = plt.hist(nc_scores, bins=bin_edges, fill=None, edgecolor="black", density=True, label="Non-Conformity")
c_fig = plt.hist(c_scores, bins=bin_edges, fill=None, edgecolor="orange", density=True, label="Conformity")
plt.xlabel("Scores")
plt.ylabel("Proportion")
plt.xlim(-1,1)
plt.legend()
plt.plot()

In [None]:
# plot the calibration curve
# x axis: calibration score
# y axis: percentage of data points in the bin
for i in range(3):
    for j in range(i, 3):
        nc_scores, c_scores, q2scores = get_non_conformity_scores_1st_stage(data, i, j, top_k=5)

        print(len(nc_scores), len(c_scores))
        scores, gt = [q2scores[q][0] for q in q2scores], [q2scores[q][1] for q in q2scores]
        cali_scores = []
        for score in scores:
            cali_scores.append(calibrate(score, nc_scores))

        # combine score to bins and gt to bins
        bins = np.linspace(0, 1, 20)
        bin_indices = np.digitize(cali_scores, bins)
        bin_scores = []
        bin_gt = []
        for k in range(1, len(bins)):
            bin_mask = bin_indices == k
            bin_scores.append(np.mean(np.array(cali_scores)[bin_mask]))
            bin_gt.append(np.mean(np.array(gt)[bin_mask]))
        plt.figure()
        plt.plot([0,1], [0,1], label="Ideal Calibration Curve")
        plt.plot(bin_scores, bin_gt, label=modalities[i] + "-" + modalities[j])
        plt.xlabel("Calibration Score")
        plt.ylabel("Probability of Correct Retrieval")
        plt.legend()
        plt.show()

### NC C Distributions Overlap Ratio

In [None]:
for i in range(3):
    for j in range(i, 3):
        plt.figure()
        nc_scores, c_scores,_ = get_non_conformity_scores_1st_stage(data, i, j)
        print(len(nc_scores), len(c_scores))
        nc_fig = plt.hist(nc_scores, bins=bin_edges, fill=None, edgecolor="black", density=True, \
                        label=modalities[i] + "-" + modalities[j]+" Non-Conformity")
        c_fig = plt.hist(c_scores, bins=bin_edges, fill=None, edgecolor="orange", density=True, \
                        label=modalities[i] + "-" + modalities[j]+" Conformity")
        plt.xlabel("Scores")
        plt.ylabel("Proportion")
        plt.xlim(-1,1)
        plt.legend()
        plt.plot()

        print(modalities[i] + "-" + modalities[j], " Overlap Ratio: ", overlap_ratio(nc_fig[0], c_fig[0]))

### Nonconformity distributions

In [None]:
nc_scores_0, _, _ = get_non_conformity_scores_1st_stage(data, IMAGE, IMAGE)
plt.hist(nc_scores_0, bins=50, fill=None, edgecolor="black", density=True, label="Image-Image")

nc_scores_1, _, _ = get_non_conformity_scores_1st_stage(data, LIDAR, LIDAR)
plt.hist(nc_scores_1, bins=50, fill=None, edgecolor="blue", density=True, label = "Lidar-Lidar")

nc_scores, _, _ = get_non_conformity_scores_1st_stage(data, IMAGE, LIDAR)
plt.hist(nc_scores, bins=50, fill=None, edgecolor="orange", density=True, label = "Image-Lidar")

plt.xlabel("Nonconformity Scores")
plt.ylabel("Proportion")
plt.xlim(-1,1)
plt.legend()

In [None]:
nc_scores_22, c_scores_22, _ = get_non_conformity_scores_1st_stage(data, TEXT, TEXT)
plt.hist(nc_scores_22, bins=50, fill=None, edgecolor="brown", density=True, label = "Text-Text")
plt.xlabel("Nonconformity Scores")
plt.ylabel("Proportion")
plt.legend(title="Similarity of negative pairs")
plt.legend()

In [None]:
nc_scores_33, c_scores_33, _ = get_non_conformity_scores_1st_stage(data, IMAGE, TEXT)
plt.hist(nc_scores_33, bins=50, fill=None, edgecolor="red", density=True, label="Image-Text")

nc_scores_44, c_scores_44, _ = get_non_conformity_scores_1st_stage(data, LIDAR, TEXT)
plt.hist(nc_scores_44, bins=50, fill=None, edgecolor="green", density=True, label="Lidar-Text")

plt.xlabel("Nonconformity Scores")
plt.ylabel("Proportion")
plt.legend()

In [None]:
def correlation(data: dict, modalities1: tuple[int, int] = (0, 0), modalities2: tuple[int, int] = (1, 1)) -> np.ndarray:
    """Compute the correlation between two modalities."""
    m1, m2 = [], []
    for k in data:
        mat, label = data[k]
        m1.append(mat[modalities1[0]][modalities1[1]])
        m2.append(mat[modalities2[0]][modalities2[1]])
    return np.corrcoef(m1, m2)

modalities = ["Image", "Lidar", "Text"]

for i in range(3):
    for j in range(i, 3):
        for k in range(j, 3):
            for l in range(k, 3): # noqa: E741
                corr = correlation(data, modalities1=(i,j), modalities2=(k,l))[0][1]
                print(modalities[i], "-", modalities[j], " ", modalities[k], "-", modalities[l], " ", corr)
                scores = [data[_][0][i][j] for _ in data]
                scores_k = [data[_][0][k][l] for _ in data]
                r, _, _ = spearman_rank_coefficient(np.array(scores), np.array(scores_k))
                print(modalities[i], "-", modalities[j], " ", modalities[k], "-", modalities[l], " ", r)