In [None]:
%matplotlib inline
from matplotlib import pyplot as plt
# Figure export configuration: font family, output sub-directory suffix, file format.
serif = False
render_format = "pdf"

# The suffix selects the "figures_{dir_postfix}" output directory used when saving.
dir_postfix = "" if serif else "sans"
plt.rcParams["font.family"] = "serif" if serif else "Liberation Sans"

# fonttype 42 selects TrueType fonts for the PDF/PS backends.
plt.rcParams.update({
    "font.size": 10,
    "pdf.fonttype": 42,
    "ps.fonttype": 42,
})

from results import *

# Task Progression Failure Analysis: Cover Object and Close Box Domains

## Sentinel Result

In [2]:
# Experiment keys for the sentinel analysis: one VLM configuration per domain
# plus a single temporal-consistency (STAC) configuration.
vlm_exp_keys_cover = get_vlm_exp_keys(
    models=["claude-3-5-sonnet-20240620"],
    templates={
        "claude-3-5-sonnet-20240620": ["video_qa", "video_qa_ref_video", "video_qa_ref_goal"],
    },
)
vlm_exp_keys_close = get_vlm_exp_keys(
    models=["gpt-4o"],
    templates={"gpt-4o": ["video_qa"]},
)
stac_exp_keys = get_temporal_consistency_exp_keys(
    pred_horizons=[16],
    sample_sizes=[32],
    error_fns=["mmd_rbf_all"],
    aggr_fns=[""],
)

# Cover Object: per-split metrics first, then the aggregate over both splits.
cover_splits = ["na", "ss"]
cover_exp_keys = vlm_exp_keys_cover + stac_exp_keys
cover_metrics = compile_metrics(
    domain="0914_cover_4",
    splits=cover_splits,
    exp_keys=cover_exp_keys,
    return_test_data=True,
    return_test_frame=True,
)
cover_metrics_aggr = aggregate_metrics(
    splits=cover_splits,
    exp_keys=cover_exp_keys,
    data=cover_metrics,
)

# Close Box: same procedure with the Close-specific VLM keys.
close_splits = ["na", "ss"]
close_exp_keys = vlm_exp_keys_close + stac_exp_keys
close_metrics = compile_metrics(
    domain="0914_close_4",
    splits=close_splits,
    exp_keys=close_exp_keys,
    return_test_data=True,
    return_test_frame=True,
)
close_metrics_aggr = aggregate_metrics(
    splits=close_splits,
    exp_keys=close_exp_keys,
    data=close_metrics,
)

In [None]:
# Count successful (label True) and failed (label False) test episodes per
# split and per domain, using the test labels stored under the first STAC key.
exp_key = stac_exp_keys[0]
metrics_by_domain = {"Cover": [cover_metrics], "Close": [close_metrics]}

for domain, domain_metrics in metrics_by_domain.items():
    print(f"\nDomain: {domain}")
    P_TOT = N_TOT = 0
    for split in ["na", "ss"]:
        split_labels = [m[split][exp_key]["data"]["test_labels"] for m in domain_metrics]
        # N counts successes, P counts failures (failure is the "positive" class here).
        N = sum(np.sum(lbl == True) for lbl in split_labels)
        P = sum(np.sum(lbl == False) for lbl in split_labels)

        print(f"Split: {split} | Success: {N} | Failures: {P} | Rate: {N / (P + N):.2f}")
        N_TOT += N
        P_TOT += P

    print(f"Split: Combined | Success: {N_TOT} | Failures: {P_TOT} | Rate: {N_TOT / (P_TOT + N_TOT):.2f}")


In [None]:
# Print the aggregated metrics of every VLM and STAC experiment, one domain at a time.
for domain, domain_exp_keys, domain_aggr in (
    ("Cover", vlm_exp_keys_cover + stac_exp_keys, cover_metrics_aggr),
    ("Close", vlm_exp_keys_close + stac_exp_keys, close_metrics_aggr),
):
    print(f"\nDomain: {domain}")
    for exp_key in domain_exp_keys:
        metrics = domain_aggr[exp_key]["metrics"]
        print_metrics(f"{exp_key}", metrics, with_accuracy=True, time_mod=5.0)

In [None]:
# Sentinel result for the Cover Object domain only.
# The call returns three metric collections: STAC, VLM, and the combined sentinel.
cover_sentinel_result = compute_sentinel_result(
    stac_exp_key=stac_exp_keys[0],
    vlm_exp_keys_list=[vlm_exp_keys_cover],
    metrics_list=[cover_metrics],
    splits_list=[cover_splits],
    time_mod=5.0,
    domain_names=["Cover"],
)
stac_metrics_cover, vlme_metrics_cover, sent_metrics_cover = cover_sentinel_result

In [None]:
# Sentinel result for the Close Box domain only.
# The call returns three metric collections: STAC, VLM, and the combined sentinel.
close_sentinel_result = compute_sentinel_result(
    stac_exp_key=stac_exp_keys[0],
    vlm_exp_keys_list=[vlm_exp_keys_close],
    metrics_list=[close_metrics],
    splits_list=[close_splits],
    time_mod=5.0,
    domain_names=["Close"],
)
stac_metrics_close, vlme_metrics_close, sent_metrics_close = close_sentinel_result

In [None]:
# Sentinel result over both domains (Cover Object + Close Box).
# The per-domain arguments are parallel lists in the order Cover, Close.
joint_sentinel_result = compute_sentinel_result(
    stac_exp_key=stac_exp_keys[0],
    vlm_exp_keys_list=[vlm_exp_keys_cover, vlm_exp_keys_close],
    metrics_list=[cover_metrics, close_metrics],
    splits_list=[cover_splits, close_splits],
    time_mod=5.0,
    domain_names=["Cover", "Close"],
)
stac_metrics_joint, vlme_metrics_joint, sent_metrics_joint = joint_sentinel_result

In [8]:
# Pull the plotted quantities out of the joint (Cover + Close) results.
# "TP Time Mean" is divided by 5.0 before being plotted as "Detection Time (s)";
# presumably this converts steps to seconds -- TODO confirm the step rate.
time_divisor = 5.0

stac_tpr, stac_fpr = stac_metrics_joint["TPR"], stac_metrics_joint["FPR"]
stac_time = stac_metrics_joint["TP Time Mean"] / time_divisor

vlme_tpr, vlme_fpr = vlme_metrics_joint["TPR"], vlme_metrics_joint["FPR"]
vlme_time = vlme_metrics_joint["TP Time Mean"] / time_divisor

sent_tpr, sent_fpr = sent_metrics_joint["TPR"], sent_metrics_joint["FPR"]
sent_time = sent_metrics_joint["TP Time Mean"] / time_divisor

## STAC and Sentinel Result

In [None]:
# Two-method comparison figure (temporal consistency with and without the VLM):
# TPR across the top row, FPR and detection time side by side below.
fig = plt.figure(figsize=(7, 3.5))

colors = ["#91c4a2", "#fdae61"]
labels = ["Temporal Consistency + VLM", "Temporal Consistency"]

# Signed change of the combined detector relative to temporal consistency alone,
# in percentage points. round() instead of int() avoids truncating values such as
# 0.29 * 100 == 28.999... down to 28, and keeping the sign avoids labelling a
# decrease with "+".
tpr_delta_pct = int(round(float(sent_tpr - stac_tpr) * 100))
fpr_delta_pct = int(round(float(sent_fpr - stac_fpr) * 100))

arrow_style = {
    "arrowstyle": "<->",
    "ec": "black",
    "linewidth": 2.0,
}

# Top: TPR.
bar_width = 0.75
ax1 = plt.subplot(2, 1, 1)
data1 = np.array([sent_tpr, stac_tpr])
bars1 = ax1.barh(labels, data1, bar_width, color=colors)
ax1.set_title("True Positive Rate", fontsize=18)
ax1.set_xlim([0, 1])

# Double-headed arrow spanning the gap between the two TPR values, drawn at
# y = 0.9 (presumably inside the row of the second bar), with the gain above it.
ax1.annotate(
    "",
    xy=(sent_tpr - 0.02, 0.9), xytext=(stac_tpr + 0.02, 0.9),
    xycoords="data", textcoords="data",
    arrowprops=dict(arrow_style),
)
ax1.annotate(
    f"{tpr_delta_pct:+d}%",
    xy=((stac_tpr + sent_tpr) / 2 - 0.05, 1.1), xytext=(0, 0),
    xycoords="data", textcoords="offset points", fontsize=14,
)

# Left: FPR.
bar_width = 0.7
ax2 = plt.subplot(2, 2, 3)
data2 = np.array([sent_fpr, stac_fpr])
bars2 = ax2.barh(labels, data2, bar_width, color=colors)
ax2.set_title("False Positive Rate", fontsize=18)
ax2.set_xlim([0, 0.2])
ax2.set_xticks([0, 0.05, 0.1, 0.15, 0.2])

ax2.annotate(
    "",
    xy=(sent_fpr, 0.9), xytext=(stac_fpr, 0.9),
    xycoords="data", textcoords="data",
    arrowprops=dict(arrow_style),
)
ax2.annotate(
    f"{fpr_delta_pct:+d}%",
    xy=((stac_fpr + sent_fpr) / 2 - 0.018, 1.1), xytext=(0, 0),
    xycoords="data", textcoords="offset points", fontsize=13,
)

# Right: Detection Times.
bar_width = 0.7
ax3 = plt.subplot(2, 2, 4)
data3 = np.array([sent_time, stac_time])
bars3 = ax3.barh(labels, data3, bar_width, color=colors)
ax3.set_title("Detection Time (s)", fontsize=18)

# Styling shared by all three panels: no y ticks (the legend names the bars),
# vertical grid lines behind the bars, open top/right, heavy left/bottom spines.
for ax in (ax1, ax2, ax3):
    ax.set_yticks([])
    ax.set_yticklabels([])
    ax.set_axisbelow(True)
    ax.xaxis.grid(True, linestyle="-", linewidth=0.5, color="gray")
    ax.tick_params(axis="x", labelsize=13)
    for side in ("top", "right"):
        ax.spines[side].set_visible(False)
    for side in ("left", "bottom"):
        ax.spines[side].set_linewidth(2.5)

# Legend entries are reversed so they read in top-to-bottom bar order.
fig.legend(bars1[::-1], labels[::-1], loc='upper center', bbox_to_anchor=(0.5, 0.025), ncol=2, fancybox=True, fontsize=13)
# NOTE(review): tight_layout() recomputes the subplot parameters, so the
# subplots_adjust() values are presumably overridden -- kept to preserve output.
plt.subplots_adjust(bottom=0.2, wspace=1.0)
plt.tight_layout()

# Saved under its own file name: the three-method figure later in this notebook
# writes "full-system-result", which previously overwrote this figure on Run All.
save_path = CWD / ".." / f"figures_{dir_postfix}" / f"stac-sentinel-result.{render_format}"
plt.savefig(save_path, format=render_format, dpi=300, bbox_inches='tight', transparent=True)
plt.show()

## STAC, VLM, and Sentinel Result

In [None]:
# Three-method comparison figure (VLM, Sentinel, STAC): TPR across the top row,
# FPR and detection time side by side below.
fig = plt.figure(figsize=(7, 3.5))

colors = ["#91c4a2", "#c994c7", "#fdae61"]
labels = ["VLM (GPT-4o / Claude)", "Sentinel (STAC + VLM)", "STAC"]

# Signed change of Sentinel relative to STAC alone, in percentage points.
# round() instead of int() avoids truncating values such as 0.29 * 100 ==
# 28.999... down to 28, and keeping the sign avoids labelling a decrease with "+".
tpr_delta_pct = int(round(float(sent_tpr - stac_tpr) * 100))
fpr_delta_pct = int(round(float(sent_fpr - stac_fpr) * 100))

arrow_style = {
    "arrowstyle": "<->",
    "ec": "black",
    "linewidth": 2.0,
}

# Top: TPR.
bar_width = 0.75
ax1 = plt.subplot(2, 1, 1)
data1 = np.array([vlme_tpr, sent_tpr, stac_tpr])
bars1 = ax1.barh(labels, data1, bar_width, color=colors)
ax1.set_title("True Positive Rate", fontsize=18)
ax1.set_xlim([0, 1])

# Double-headed arrow spanning the gap between the STAC and Sentinel TPR values,
# drawn at y = 1.8 (presumably inside the row of the STAC bar), with the gain above it.
ax1.annotate(
    "",
    xy=(sent_tpr - 0.02, 1.8), xytext=(stac_tpr + 0.02, 1.8),
    xycoords="data", textcoords="data",
    arrowprops=dict(arrow_style),
)
ax1.annotate(
    f"{tpr_delta_pct:+d}%",
    xy=((stac_tpr + sent_tpr) / 2 - 0.05, 2.0), xytext=(0, 0),
    xycoords="data", textcoords="offset points", fontsize=14,
)

# Left: FPR.
bar_width = 0.7
ax2 = plt.subplot(2, 2, 3)
data2 = np.array([vlme_fpr, sent_fpr, stac_fpr])
bars2 = ax2.barh(labels, data2, bar_width, color=colors)
ax2.set_title("False Positive Rate", fontsize=18)
ax2.set_xlim([0, 0.2])
ax2.set_xticks([0, 0.05, 0.1, 0.15, 0.2])

ax2.annotate(
    "",
    xy=(sent_fpr, 1.8), xytext=(stac_fpr, 1.8),
    xycoords="data", textcoords="data",
    arrowprops=dict(arrow_style),
)
ax2.annotate(
    f"{fpr_delta_pct:+d}%",
    xy=((stac_fpr + sent_fpr) / 2 - 0.018, 2.0), xytext=(0, 0),
    xycoords="data", textcoords="offset points", fontsize=13,
)

# Right: Detection Times.
bar_width = 0.7
ax3 = plt.subplot(2, 2, 4)
data3 = np.array([vlme_time, sent_time, stac_time])
bars3 = ax3.barh(labels, data3, bar_width, color=colors)
ax3.set_title("Detection Time (s)", fontsize=18)

# Styling shared by all three panels: no y ticks (the legend names the bars),
# vertical grid lines behind the bars, open top/right, heavy left/bottom spines.
for ax in (ax1, ax2, ax3):
    ax.set_yticks([])
    ax.set_yticklabels([])
    ax.set_axisbelow(True)
    ax.xaxis.grid(True, linestyle="-", linewidth=0.5, color="gray")
    ax.tick_params(axis="x", labelsize=13)
    for side in ("top", "right"):
        ax.spines[side].set_visible(False)
    for side in ("left", "bottom"):
        ax.spines[side].set_linewidth(2.5)

# Legend entries are reversed so they read in top-to-bottom bar order.
fig.legend(bars1[::-1], labels[::-1], loc='upper center', bbox_to_anchor=(0.5, 0.025), ncol=3, fancybox=True, fontsize=12)
# NOTE(review): tight_layout() recomputes the subplot parameters, so the
# subplots_adjust() values are presumably overridden -- kept to preserve output.
plt.subplots_adjust(bottom=0.2, wspace=1.0)
plt.tight_layout()

save_path = CWD / ".." / f"figures_{dir_postfix}" / f"full-system-result.{render_format}"
plt.savefig(save_path, format=render_format, dpi=300, bbox_inches='tight', transparent=True)
plt.show()

## Full Table Result

In [12]:
# Experiment keys for the full results table: every VLM/template combination,
# four temporal-consistency error functions, and the ensemble baselines.
# NOTE(review): this cell rebinds vlm_exp_keys_*, stac_exp_keys, *_metrics and
# *_metrics_aggr from the sentinel section with different contents; cells from
# that section re-run after this one will see these values instead.
vlm_models = ["gpt-4o", "claude-3-5-sonnet-20240620", "gemini-1-5-pro"]
basic_templates = ["image_qa", "video_qa"]

vlm_exp_keys_cover = get_vlm_exp_keys(
    models=list(vlm_models),
    templates={
        "gpt-4o": list(basic_templates),
        # Claude is additionally evaluated with the two reference-based templates on Cover.
        "claude-3-5-sonnet-20240620": basic_templates + ["video_qa_ref_video", "video_qa_ref_goal"],
        "gemini-1-5-pro": list(basic_templates),
    },
)
vlm_exp_keys_close = get_vlm_exp_keys(
    models=list(vlm_models),
    templates={model: list(basic_templates) for model in vlm_models},
)
stac_exp_keys = get_temporal_consistency_exp_keys(
    pred_horizons=[16],
    sample_sizes=[32],
    error_fns=["mmd_rbf_all", "kde_kl_all_rev", "kde_kl_all_for", "mse_all"],
    aggr_fns=["min"],
)

# Ensemble experiment keys.
ens_exp_keys = get_ensemble_exp_keys(
    pred_horizons=[16],
    sample_sizes=[32],
    action_spaces=["all"],
)

# Cover Object: per-split metrics first, then the aggregate over both splits.
cover_splits = ["na", "ss"]
cover_exp_keys = vlm_exp_keys_cover + stac_exp_keys + ens_exp_keys
cover_metrics = compile_metrics(
    domain="0914_cover_4",
    splits=cover_splits,
    exp_keys=cover_exp_keys,
    return_test_data=True,
    return_test_frame=True,
)
cover_metrics_aggr = aggregate_metrics(
    splits=cover_splits,
    exp_keys=cover_exp_keys,
    data=cover_metrics,
)

# Close Box: same procedure with the Close-specific VLM keys.
close_splits = ["na", "ss"]
close_exp_keys = vlm_exp_keys_close + stac_exp_keys + ens_exp_keys
close_metrics = compile_metrics(
    domain="0914_close_4",
    splits=close_splits,
    exp_keys=close_exp_keys,
    return_test_data=True,
    return_test_frame=True,
)
close_metrics_aggr = aggregate_metrics(
    splits=close_splits,
    exp_keys=close_exp_keys,
    data=close_metrics,
)

In [None]:
# Emit one LaTeX table row per experiment for the Cover Object domain:
# per-split TPR / TNR / detection time, then aggregate TPR / TNR / accuracy.
for exp_key in stac_exp_keys + vlm_exp_keys_cover + ens_exp_keys:
    # Every fragment ends with the "& " column separator; the last one is trimmed below.
    fragments = [f"{exp_key} & "]

    # Dataset split result.
    for split in cover_splits:
        split_metrics = cover_metrics[split][exp_key]["metrics"]
        for metric in ["TPR", "TNR", "TP Time Mean"]:
            stat = split_metrics.get(metric, -1)
            if stat < 0:
                # Missing metrics default to -1 and are rendered as N/A.
                fragments.append("N/A & ")
            else:
                if metric == "TP Time Mean":
                    stat = stat / 5
                fragments.append(f"{round(stat, 2):0.2f} & ")
        # Empty spacer column after each split.
        fragments.append("& ")
    # Extra spacer column before the aggregate block.
    fragments.append("& ")

    # Aggregate result.
    aggr_metrics = cover_metrics_aggr[exp_key]["metrics"]
    for metric in ["TPR", "TNR", "Accuracy"]:
        fragments.append(f"{aggr_metrics[metric]:0.2f} & ")

    row = "".join(fragments)
    print(row[:-2] + "\\\\")

In [None]:
# Emit one LaTeX table row per experiment for the Close Box domain:
# per-split TPR / TNR / detection time, then aggregate TPR / TNR / accuracy.
for exp_key in stac_exp_keys + vlm_exp_keys_close + ens_exp_keys:
    # Every fragment ends with the "& " column separator; the last one is trimmed below.
    fragments = [f"{exp_key} & "]

    # Dataset split result.
    for split in close_splits:
        split_metrics = close_metrics[split][exp_key]["metrics"]
        for metric in ["TPR", "TNR", "TP Time Mean"]:
            stat = split_metrics.get(metric, -1)
            if stat < 0:
                # Missing metrics default to -1 and are rendered as N/A.
                fragments.append("N/A & ")
            else:
                if metric == "TP Time Mean":
                    stat = stat / 5
                fragments.append(f"{round(stat, 2):0.2f} & ")
        # Empty spacer column after each split.
        fragments.append("& ")
    # Extra spacer column before the aggregate block.
    fragments.append("& ")

    # Aggregate result.
    aggr_metrics = close_metrics_aggr[exp_key]["metrics"]
    for metric in ["TPR", "TNR", "Accuracy"]:
        fragments.append(f"{aggr_metrics[metric]:0.2f} & ")

    row = "".join(fragments)
    print(row[:-2] + "\\\\")