In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

# Load the per-seed editing results and aggregate them per (model, editing_mode).
df = pd.read_json("../data/CIFAR_editing_results.json")

# The matrix-valued columns are converted cell by cell into numpy arrays
# (presumably they are stored as nested lists in the JSON file).
matrix_columns = [
    "loss_matrix_against_source",
    "loss_matrix_against_target",
    "top1_matrix_against_source",
    "top1_matrix_against_target",
]
for column in matrix_columns:
    df[column] = df[column].apply(np.array)

group_keys = ["model", "editing_mode"]

# average over `seed`
mean_df = df.groupby(group_keys).mean().reset_index()

# standard error over `seed` (scalar columns only: the matrix columns are dropped first)
scalar_df = df.drop(columns=matrix_columns)
stderr_df = scalar_df.groupby(group_keys).sem().reset_index()

# Columns present in both frames get the `_mean` / `_stderr` suffixes; the matrix
# columns exist only in `mean_df`, so they keep their original names.
summary_df = pd.merge(
    mean_df,
    stderr_df,
    on=group_keys,
    suffixes=("_mean", "_stderr"),
)

# The third (model, editing_mode) combination is the one inspected below.
row = summary_df.iloc[2]

# 10 x 10 matrix of losses source -> target
loss_mat = row["loss_matrix_against_target"]
# 10 x 10 matrix of accuracies source -> target
acc_mat = row["top1_matrix_against_target"]

In [None]:
# Display the summary table without the matrix-valued columns.
summary_df.drop(
    [
        "loss_matrix_against_source",
        "loss_matrix_against_target",
        "top1_matrix_against_source",
        "top1_matrix_against_target",
    ],
    axis="columns",
)

In [None]:

# Plot the loss matrix, then the accuracy matrix, as one heatmap figure each
# (y axis: source class, x axis: target class).
heatmaps = [
    (loss_mat, f"Loss for {row['model']} with {row['editing_mode']} editing"),
    (acc_mat, f"Accuracy for {row['model']} with {row['editing_mode']} editing"),
]
for matrix, heading in heatmaps:
    plt.imshow(matrix)
    plt.colorbar()
    plt.xlabel("Target")
    plt.ylabel("Source")
    plt.title(heading)
    plt.show()


# Visual comparison to non-least-squares quadratic concept editing

In [None]:
from torchvision.datasets import CIFAR10
from concept_erasure import QuadraticFitter
from concept_editing import get_editor, get_train_test_data
import torch
from torchvision.transforms.functional import to_tensor

# Location of the CIFAR-10 files; they are downloaded there if missing.
# NOTE(review): this is a machine-specific absolute path — adjust it (or make it
# configurable) before running the notebook elsewhere.
download_dir = "/mnt/ssd-1/alexm/cifar10"
data = CIFAR10(
    root=download_dir,
    download=True,
)

# Iterating the dataset yields (image, label) pairs; unzip them into two tuples.
images, labels = zip(*data)

In [None]:
# Convert every image to a tensor and stack them along a new leading axis.
# Layout is n x c x h x w (channel-first, which is why the plots below transpose
# with (1, 2, 0)). NOTE(review): the previous comment said "w x h"; torchvision
# documents to_tensor as producing C x H x W — confirm.
X = torch.stack([to_tensor(image) for image in images])

In [None]:
# Train/test split from the project helper, requested with flatten=True.
# NOTE(review): the "Test visionprobe" cell further down calls the same helper
# with `train_size=None` instead of `total_size=None` — confirm which keyword
# the helper actually accepts.
X_train, X_test, Y_train, Y_test = get_train_test_data(
    total_size=None,
    test_size=1024,
    flatten=True,
)

# The statistics are fitted on CPU in double precision.
X_train, Y_train = X_train.double().cpu(), Y_train.cpu()

fitter = QuadraticFitter.fit(X_train, Y_train)
optimal_editor = fitter.editor()

In [None]:
# Mean and sample covariance (n - 1 denominator) of the whole training set,
# i.e. computed over all rows of X_train regardless of label.
# Assumes X_train is (n_samples, n_features) — TODO confirm.
X_bar = X_train.mean(dim=0)
X_ctr = X_train - X_bar
n_samples = X_ctr.shape[0]
cov_xx = X_ctr.T.matmul(X_ctr) / (n_samples - 1)

In [None]:
from concept_erasure.optimal_transport import psd_sqrt_rsqrt, psd_sqrt
def quadratic_edit(im: torch.Tensor, source: int, target: int, optimal=False):
    """Edit one image from class `source` towards class `target`.

    The image is moved to CPU, cast to float64 and flattened before editing;
    the result is reshaped back to the shape of `im`.

    Args:
        im: image tensor (any shape; it is flattened internally).
        source: index of the class the image currently belongs to.
        target: index of the class to edit the image towards.
        optimal: if True, delegate to the notebook-level `optimal_editor`.
            Otherwise apply the naive map: whiten with the inverse square root
            of the source-class covariance, re-colour with the square root of
            the target-class covariance, and swap the class means.

    Returns:
        The edited image, with the same shape as `im`.

    Relies on the notebook-level names `fitter` and `optimal_editor`.
    """
    shape = im.shape
    flat = im.cpu().double().flatten()

    if optimal:
        # The editor is called on a batch of one, with the source label as a tensor.
        edited = optimal_editor(flat.unsqueeze(0), torch.tensor([source]), target)
    else:
        source_cov = fitter.sigma_xx[source]
        target_cov = fitter.sigma_xx[target]
        # Only the inverse square root of the source covariance is needed.
        _, whiten = psd_sqrt_rsqrt(source_cov)
        colour = psd_sqrt(target_cov)
        centred = flat - fitter.mean_x[source]
        edited = colour @ whiten @ centred + fitter.mean_x[target]

    return edited.reshape(shape)
    
def quadratic_erase(im: torch.Tensor, source: int, optimal=False):
    """Erase the class information of one image belonging to class `source`.

    The image is moved to CPU, cast to float64 and flattened before erasure;
    the result is reshaped back to the shape of `im`.

    Args:
        im: image tensor (any shape; it is flattened internally).
        source: index of the class the image currently belongs to.
        optimal: if True, delegate to `fitter.eraser`. Otherwise apply the
            naive map: whiten with the inverse square root of the source-class
            covariance, re-colour with the square root of the notebook-level
            covariance `cov_xx`, and replace the class mean with `X_bar`.

    Returns:
        The erased image, with the same shape as `im`.

    Relies on the notebook-level names `fitter`, `cov_xx` and `X_bar`.
    """
    shape = im.shape
    flat = im.cpu().double().flatten()

    if optimal:
        # NOTE(review): `fitter.eraser` is called directly here, whereas the editor
        # is obtained elsewhere via `fitter.editor()` — confirm `eraser` is an
        # attribute/property rather than a zero-argument method.
        erased = fitter.eraser(flat.unsqueeze(0), torch.tensor([source]))
    else:
        source_cov = fitter.sigma_xx[source]
        pooled_cov = cov_xx
        # Only the inverse square root of the source covariance is needed.
        _, whiten = psd_sqrt_rsqrt(source_cov)
        colour = psd_sqrt(pooled_cov)
        centred = flat - fitter.mean_x[source]
        erased = colour @ whiten @ centred + X_bar

    return erased.reshape(shape)

In [None]:
import matplotlib.pyplot as plt
# Pick one example image and remember its label as the source class.
idx = 2
im = X[idx]
source = labels[idx]

# The tensor is channel-first; imshow needs the channel axis last.
plt.imshow(im.numpy().transpose(1, 2, 0))
# Plain string: the previous f-string had no placeholders.
plt.title("Original")
plt.show()

In [None]:
# Class index the example image is edited towards.
target = 2

# `im` is already a tensor, so it is copied with detach().clone(); wrapping it in
# torch.tensor(...) makes the same copy but emits a copy-construct UserWarning.
im_edit_suboptimal = quadratic_edit(im.detach().clone(), source, target, optimal=False)
im_edit_optimal = quadratic_edit(im.detach().clone(), source, target, optimal=True)

In [None]:
# Show the naive edit first and the Q-LEACE edit second, one figure each.
for edited, heading in (
    (im_edit_suboptimal, "Naive quadratic edited"),
    (im_edit_optimal, "Q-LEACE edited"),
):
    plt.imshow(edited.numpy().transpose(1, 2, 0))
    plt.title(heading)
    plt.show()

In [None]:
# Per-pixel difference between the naive and the Q-LEACE edit, shown as an image.
# NOTE(review): `diff` is signed; imshow is expected to clip float RGB data to
# [0, 1], so negative differences would render as black — confirm this is the
# intended visualisation.
diff = im_edit_suboptimal.sub(im_edit_optimal)
plt.imshow(diff.permute(1, 2, 0).numpy())
plt.show()

In [None]:
# Mean absolute difference between the two edits, relative to the mean absolute
# value of the Q-LEACE edit.
mean_abs_diff = diff.abs().mean()
mean_abs_optimal = im_edit_optimal.abs().mean()
mean_abs_diff / mean_abs_optimal

In [None]:
# `im` is already a tensor, so it is copied with detach().clone(); wrapping it in
# torch.tensor(...) makes the same copy but emits a copy-construct UserWarning.
im_erased_suboptimal = quadratic_erase(im.detach().clone(), source, optimal=False)
im_erased_optimal = quadratic_erase(im.detach().clone(), source, optimal=True)

In [None]:
# Show the naive erasure first and the Q-LEACE erasure second, one figure each.
for erased, heading in (
    (im_erased_suboptimal, "Naive quadratic erased"),
    (im_erased_optimal, "Q-LEACE erased"),
):
    plt.imshow(erased.numpy().transpose(1, 2, 0))
    plt.title(heading)
    plt.show()

In [None]:
# Original image again for side-by-side reference (channel axis moved last).
plt.imshow(im.permute(1, 2, 0).numpy())

In [None]:
# Norm of the change introduced by each erasure, relative to the norm of the
# original image (Q-LEACE first, naive second).
diff_optimal = im_erased_optimal - im
diff_suboptimal = im_erased_suboptimal - im
im_norm = im.norm().mean()
for delta in (diff_optimal, diff_suboptimal):
    print(delta.norm().mean() / im_norm)

In [None]:
# Running average, over the first 100 images, of the relative mean absolute change
# introduced by each erasure method.
# Note: the loop rebinds the notebook-level names `idx`, `im` and `source`.
total_err_optimal = 0
total_err_suboptimal = 0
for idx in range(100):
    im = X[idx]
    source = labels[idx]

    # `im` is already a tensor, so it is copied with detach().clone(); wrapping it
    # in torch.tensor(...) makes the same copy but emits a copy-construct UserWarning.
    im_erased_suboptimal = quadratic_erase(im.detach().clone(), source, optimal=False)
    im_erased_optimal = quadratic_erase(im.detach().clone(), source, optimal=True)

    diff_optimal = im_erased_optimal - im
    diff_suboptimal = im_erased_suboptimal - im

    # Both errors are normalised by the same image scale, so compute it once.
    im_scale = im.abs().mean()
    err_subopt = diff_suboptimal.abs().mean() / im_scale
    err_opt = diff_optimal.abs().mean() / im_scale
    total_err_optimal += err_opt
    total_err_suboptimal += err_subopt

    print(f"Image {idx}:")
    print(f"Average error for optimal: {total_err_optimal / (idx + 1)}")
    print(f"Average error for suboptimal: {total_err_suboptimal / (idx + 1)}")

# Test VisionProbe

In [1]:
from mdl import VisionProbe
from concept_editing import get_train_test_data, evaluate_model
import torch
# Use the GPU when one is available and fall back to the CPU otherwise, so the
# cell does not fail on machines without CUDA.
device = "cuda" if torch.cuda.is_available() else "cpu"
NUM_CLASSES = 10

# Unflattened split placed directly on `device`.
# Note: this rebinds X_train / X_test / Y_train / Y_test from the earlier section.
X_train, X_test, Y_train, Y_test = get_train_test_data(
    train_size=None, test_size=1024, flatten=False, device=device
)

# The probe is created on the same device as the training data.
model = VisionProbe(
    num_classes=NUM_CLASSES,
    device=X_train.device,
    dtype=torch.float32,
)

  from .autonotebook import tqdm as notebook_tqdm


Files already downloaded and verified
Files already downloaded and verified
Train+val size: 50000
Test size: 1024


In [2]:
# Switch the probe to training mode, then fit it on the training split.
model.train()
model.fit(
    X_train,
    Y_train,
    max_epochs=100,
    early_stop_epochs=4,
    reduce_lr_on_plateau=False,
    verbose=True,
)

Epoch:   7%|▋         | 7/100 [02:03<27:25, 17.69s/it, loss=2.6] 


In [3]:
from concept_editing import get_editor
# Build the "linear" editor from the training split.
editor = get_editor(
    "linear",
    X_train,
    Y_train,
)


In [4]:
# The recorded run of this cell ended with a CUDA out-of-memory error. Evaluation
# needs no gradients, so run it under torch.no_grad() to avoid keeping autograd
# state alive on the GPU.
# NOTE(review): if the OOM persists, free tensors left over from training or
# evaluate in smaller batches; also confirm that evaluate_model takes the model
# out of training mode (model.train() was called before fitting).
with torch.no_grad():
    eval_results = evaluate_model(model, X_test, Y_test, editor=editor)
eval_results

OutOfMemoryError: CUDA out of memory. Tried to allocate 20.00 MiB (GPU 0; 10.75 GiB total capacity; 10.46 GiB already allocated; 18.44 MiB free; 10.48 GiB reserved in total by PyTorch) If reserved memory is >> allocated memory try setting max_split_size_mb to avoid fragmentation.  See documentation for Memory Management and PYTORCH_CUDA_ALLOC_CONF