In [None]:
%config InlineBackend.figure_format = 'retina'

In [None]:
from deep_analytics.assays import AdversarialAttacks, AttackTypes

In [None]:
AttackTypes

In [None]:
AdversarialAttacks.datasets

In [None]:
assay_adv_attack = AdversarialAttacks('imagenette2_s320_remap1k', split='val')
assay_adv_attack.dataset

In [None]:
img,label,index = assay_adv_attack.dataset[0]
print(label,index,img.size)
img

In [None]:
from torchvision import models, transforms
from functools import partial
from model_rearing_workshop.models import load_model_from_weights
from model_rearing_workshop.models.weights import get_standard_transforms, Weights

In [None]:
weights = Weights(
    url='https://visionlab-members.s3.wasabisys.com/alvarez/Projects/model_rearing_workshop/models/in1k/alexnet2023_layer_diffnoise_scaled_sqrt1/supervised/20231122_102040/final_weights-42b687fb09.pth',
    transforms=get_standard_transforms(), # Add your transforms here
    meta={
        "repo": "https://github.com/harvard-visionlab/alexnets",
        "urls": dict(
            params='https://visionlab-members.s3.wasabisys.com/alvarez/Projects/model_rearing_workshop/models/in1k/alexnet2023_layer_diffnoise_scaled_sqrt1/supervised/20231122_102040/params-42b687fb09.json',
            train='https://visionlab-members.s3.wasabisys.com/alvarez/Projects/model_rearing_workshop/models/in1k/alexnet2023_layer_diffnoise_scaled_sqrt1/supervised/20231122_102040/log_train-42b687fb09.txt',
            val='https://visionlab-members.s3.wasabisys.com/alvarez/Projects/model_rearing_workshop/models/in1k/alexnet2023_layer_diffnoise_scaled_sqrt1/supervised/20231122_102040/log_val-42b687fb09.txt',
        ),
        "_metrics": {},
        "_docs": """
            ....
        """,
    },
)
transform = alexnet2023_layer_diffnoise_scaled_sqrt1.transforms['val_transform']
transform

In [None]:
import torch 
import torch.nn as nn
from functools import partial
import contextlib
import io
import sys

@contextlib.contextmanager
def suppress_stdout():
    new_stdout = io.StringIO()
    old_stdout = sys.stdout
    sys.stdout = new_stdout
    try:
        yield
    finally:
        sys.stdout = old_stdout
        
class ModelWrapper(nn.Module):
    def __init__(self, model):
        super().__init__() 
        self.model = model
    def forward(self, x):
        embeddings, layer_outputs, layer_logits = self.model(x)
        return embeddings
    
def load_model_eval(weights):
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    
    with suppress_stdout():
        model = load_model_from_weights(weights)
    
    wrapped_model = ModelWrapper(model)
    wrapped_model.model_name = 'alexnet2023_layer_diffnoise_scaled_sqrt1'
    wrapped_model.to(device)
    wrapped_model.eval()
    
    return wrapped_model

def load_model_train(weights):
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    
    with suppress_stdout():
        model = load_model_from_weights(weights)
    
    wrapped_model = ModelWrapper(model)
    wrapped_model.model_name = 'alexnet2023_layer_diffnoise_scaled_sqrt1'
    wrapped_model.to(device)
    wrapped_model.train()
    
    return wrapped_model

In [None]:
# model = load_model_eval()
# model

In [None]:
results_eval = assay_adv_attack.run(load_model_eval, transform, attack=AttackTypes.FGSM)
results_eval.head()

In [None]:
assay_adv_attack.plot_results(results_eval)

In [None]:
results_train = assay_adv_attack.run(load_model_train, transform, attack=AttackTypes.FGSM)
results_train.head()

In [None]:
assay_adv_attack.plot_results(results_train)

In [None]:
subset1 = results_eval[results_eval.image_set=='adversarial']
subset1['mode'] = 'eval'
subset2 = results_train[results_train.image_set=='adversarial']
subset2['mode'] = 'train'

df = pd.concat([subset1, subset2])
assay_adv_attack.plot_results(df, hue="mode")

In [None]:
weights = Weights(
    url='https://visionlab-members.s3.wasabisys.com/alvarez/Projects/model_rearing_workshop/models/in1k/alexnet2023_inp_diffnoise_scaled_sqrt0/supervised/20231122_102051/final_weights-1d52da36f3.pth',
    transforms=get_standard_transforms(), # Add your transforms here
    meta={
        "repo": "https://github.com/harvard-visionlab/alexnets",
        "urls": dict(
            params='https://visionlab-members.s3.wasabisys.com/alvarez/Projects/model_rearing_workshop/models/in1k/alexnet2023_inp_diffnoise_scaled_sqrt0/supervised/20231122_102051/params-1d52da36f3.json',
            train='https://visionlab-members.s3.wasabisys.com/alvarez/Projects/model_rearing_workshop/models/in1k/alexnet2023_inp_diffnoise_scaled_sqrt0/supervised/20231122_102051/log_train-1d52da36f3.txt',
            val='https://visionlab-members.s3.wasabisys.com/alvarez/Projects/model_rearing_workshop/models/in1k/alexnet2023_inp_diffnoise_scaled_sqrt0/supervised/20231122_102051/log_val-1d52da36f3.txt',
        ),
        "_metrics": {},
        "_docs": """
            ....
        """,
    },
)
load_model = partial(load_model_eval, weights)
transform = weights.transforms['val_transform']
transform

In [None]:
results_2 = assay_adv_attack.run(load_model, transform, attack=AttackTypes.FGSM)
results_2.head()

In [None]:
assay_adv_attack.plot_results(results_2)

In [None]:
# weights = Weights(
#     url='https://visionlab-members.s3.wasabisys.com/alvarez/Projects/model_rearing_workshop/models/in1k/alexnet2023_baseline/barlow/20231116_044210/final_weights-0b70b9da61.pth',
#     transforms=get_standard_transforms(), # Add your transforms here
#     meta={
#         "repo": "https://github.com/harvard-visionlab/alexnets",
#         "urls": dict(
#             params='https://visionlab-members.s3.wasabisys.com/alvarez/Projects/model_rearing_workshop/models/in1k/alexnet2023_baseline/barlow/20231116_044210/params-0b70b9da61.json',
#             train='https://visionlab-members.s3.wasabisys.com/alvarez/Projects/model_rearing_workshop/models/in1k/alexnet2023_baseline/barlow/20231116_044210/log_train-0b70b9da61.txt',
#             val='https://visionlab-members.s3.wasabisys.com/alvarez/Projects/model_rearing_workshop/models/in1k/alexnet2023_baseline/barlow/20231116_044210/log_val-0b70b9da61.txt',
#         ),
#         "_metrics": {},
#         "_docs": """
#             ....
#         """,
#     },
# )

weights = Weights(
    url='https://visionlab-members.s3.wasabisys.com/alvarez/Projects/model_rearing_workshop/models/in1k/alexnet2023_baseline/supervised/20231115_062107/final_weights-b0b1f89b7a.pth',
    transforms=get_standard_transforms(), # Add your transforms here
    meta={
        "repo": "https://github.com/harvard-visionlab/alexnets",
        "urls": dict(
            params='https://visionlab-members.s3.wasabisys.com/alvarez/Projects/model_rearing_workshop/models/in1k/alexnet2023_baseline/supervised/20231115_062107/params-b0b1f89b7a.json',
            train='https://visionlab-members.s3.wasabisys.com/alvarez/Projects/model_rearing_workshop/models/in1k/alexnet2023_baseline/supervised/20231115_062107/log_train-b0b1f89b7a.txt',
            val='https://visionlab-members.s3.wasabisys.com/alvarez/Projects/model_rearing_workshop/models/in1k/alexnet2023_baseline/supervised/20231115_062107/log_val-b0b1f89b7a.txt',
        ),
        "_metrics": {},
        "_docs": """
            ....
        """,
    },
)

load_model = partial(load_model_eval, weights)
transform = weights.transforms['val_transform']
transform

In [None]:
results_baseline = assay_adv_attack.run(load_model, transform, attack=AttackTypes.FGSM)
results_baseline.head()

In [None]:
assay_adv_attack.plot_results(results_baseline)

# Quantify Robustness

1. convert pandas dataframe to numpy array
2. compute mean and upper/lower CI for each image_set + epsilon combo
3. Compute robustness scores
- [ ] normalized AUC sum(acc_by_eps / acc_eps_0)
- [ ] normalized acc mean(acc_by_eps/acc_eps_0)
- [ ] compute equal-weighted robust accuracy: 
    weights = [1.0] * len(epsilons)  # Equal weights
    robust_accuracy = clean_accuracy - np.average(adversarial_accuracies, weights=weights)
- [ ] compute weighted robust accuracy
    weights = 1/epsilons[1:] # Inverse proportional weights
    robust_accuracy = clean_accuracy - np.average(adversarial_accuracies, weights=weights)



In [None]:
from deep_analytics.assays.adversarial_attacks.adversarial_attacks import dataframe_to_array

In [None]:
arr = dataframe_to_array(results_baseline[results_baseline.image_set=='adversarial'], 'correct1')

In [None]:
import numpy as np

print(arr.keys())
print(arr['dims'])
D = arr['D']
epsilons = np.array(arr['epsilons'])
D.shape, epsilons

In [None]:
import numpy as np
from functools import partial
from fastprogress import progress_bar 

class AccumMetric:
    def __init__(self, scoring_func, ci_level=0.95):
        self.scores = []
        self.scoring_func = scoring_func
        self.ci_level = ci_level    

    def reset(self):
        self.scores = []

    def stats(self, ci_level=None, axis=None):
        if not self.scores:
            return None
        ci_level = self.ci_level if ci_level is None else ci_level
        axis = 0 if axis is None else axis
        
        # Calculate the mean
        mean_score = np.mean(self.scores, axis=axis)

        # Calculate the lower and upper percentiles for the confidence interval
        lower_percentile = 100 * (1 - self.ci_level) / 2
        upper_percentile = 100 * (1 + self.ci_level) / 2

        lower_ci = np.percentile(self.scores, lower_percentile, axis=axis)
        upper_ci = np.percentile(self.scores, upper_percentile, axis=axis)

        return {
            "mean": mean_score,
            f"{int(self.ci_level * 100)}% CI": (lower_ci, upper_ci),
        }
    
    def __call__(self, data):
        # Calculate the score using the provided scoring function
        score = self.scoring_func(data)
        self.scores.append(score)
        
def estimate_thresh_crossing(xs, ys, threshold):
    crossings = []

    for i in range(len(xs) - 1):
        x1, x2 = xs[i], xs[i + 1]
        y1, y2 = ys[i], ys[i + 1]

        if (y1 < threshold and y2 >= threshold) or (y1 >= threshold and y2 < threshold):
            # Linear interpolation to estimate the crossing point
            slope = (y2 - y1) / (x2 - x1)
            intercept = y1 - slope * x1
            crossing_x = (threshold - intercept) / slope
            crossings.append(crossing_x)

    if crossings:
        # Calculate the average crossing point if multiple crossings occurred
        estimated_epsilon = np.mean(crossings)
    else:
        estimated_epsilon = None

    return estimated_epsilon, crossings

def compute_normalized_acc_by_eps(data, epsilons, dim=-1):
    acc_by_eps = data.mean(axis=dim)
    return acc_by_eps

def compute_normalized_auc(data, epsilons, dim=-1):
    assert epsilons[0]==0, f"oops, expected first epsilon to be zero, got {epsilons[0]}"
    acc_by_eps = data.mean(axis=dim)
    clean_accuracy = acc_by_eps[0]
    normed_acc = acc_by_eps / clean_accuracy
    normalized_auc = np.sum(normed_acc)
    return normalized_auc

def compute_normalized_acc(data, epsilons, dim=-1):
    assert epsilons[0]==0, f"oops, expected first epsilon to be zero, got {epsilons[0]}"
    acc_by_eps = data.mean(axis=dim)
    clean_accuracy = acc_by_eps[0]
    normed_acc = acc_by_eps / clean_accuracy
    normalized_acc = np.mean(normed_acc)
    return normalized_acc

def compute_weighted_adv_acc(data, epsilons, dim=-1, normalize=True):
    assert epsilons[0]==0, f"oops, expected first epsilon to be zero, got {epsilons[0]}"
    acc_by_eps = data.mean(axis=-1)
    clean_accuracy = acc_by_eps[0]
    adv_accuracies = acc_by_eps[1:]
    if normalize:
        adv_accuracies = adv_accuracies / clean_accuracy
    weighted_adv_acc = np.average(adv_accuracies, weights=1/epsilons[1:])
    return weighted_adv_acc

def compute_thresh_half_minmax(data, epsilons, dim=-1, normalize=True):
    assert epsilons[0]==0, f"oops, expected first epsilon to be zero, got {epsilons[0]}"
    acc_by_eps = data.mean(axis=dim)
    clean_accuracy = acc_by_eps[0]
    normed_acc = acc_by_eps / clean_accuracy
    half_max = normed_acc.min() + (normed_acc.max() - normed_acc.min())/2
    thresh_half_minmax, _ = estimate_thresh_crossing(epsilons, normed_acc, half_max)
    
    return thresh_half_minmax

In [None]:
from deep_analytics.utils.bootstrap import bootstrap_multi_dim

In [None]:

samples = bootstrap_multi_dim(D[0,0], dims=(1), n_bootstrap=1000, seed=1234)

metrics = [
    AccumMetric(partial(compute_normalized_acc_by_eps, epsilons=epsilons)),
    AccumMetric(partial(compute_normalized_auc, epsilons=epsilons)),
    AccumMetric(partial(compute_normalized_acc, epsilons=epsilons)),
    AccumMetric(partial(compute_weighted_adv_acc, epsilons=epsilons, normalize=True)),
    AccumMetric(partial(compute_thresh_half_minmax, epsilons=epsilons, normalize=True))
]

for sample in progress_bar(samples):
    for metric in metrics: 
        metric(sample)

In [None]:
metrics = dict([
            ('acc_by_eps', AccumMetric(partial(compute_normalized_acc_by_eps, epsilons=epsilons))),
            ('norm_auc', AccumMetric(partial(compute_normalized_auc, epsilons=epsilons))),
            ('norm_acc', AccumMetric(partial(compute_normalized_acc, epsilons=epsilons))),
            ('weighted_norm_acc', AccumMetric(partial(compute_weighted_adv_acc, epsilons=epsilons, normalize=True))),
            ('thresh_half_minmax', AccumMetric(partial(compute_thresh_half_minmax, epsilons=epsilons, normalize=True)))
        ])
metrics.values()

In [None]:
np.array(metrics[0].scores).shape

In [None]:
metrics[0].stats()

In [None]:
metrics[1].stats()

In [None]:
metrics[2].stats()

In [None]:
metrics[3].stats()

In [None]:
metrics[4].stats()

In [None]:
assert epsilons[0]==0
acc_by_eps = D[0,0].mean(axis=1)
clean_accuracy = acc_by_eps[0]
adv_accuracies = acc_by_eps[1:]
normed_acc = acc_by_eps / clean_accuracy
normalized_auc = np.sum(normed_acc)
normalized_acc = np.mean(normed_acc)
weighted_adv_acc = np.average(adv_accuracies, weights=1/epsilons[1:])
half_max = normed_acc.min() + (normed_acc.max() - normed_acc.min())/2
thresh_half_minmax, _ = estimate_thresh_crossing(epsilons, normed_acc, half_max)

normalized_auc, normalized_acc, weighted_adv_acc, thresh_half_minmax

In [None]:
D.shape

In [None]:
acc_by_eps, epsilons

In [None]:
estimate_epsilon_crossing(epsilons, acc_by_eps, 1/100)

In [None]:
# import torch
# import numpy as np
# from numpy.random import RandomState
# from fastprogress import progress_bar 

# def dataframe_to_array(df, data_column):
#     '''
#         convert dataframe into a numModels x numImageSets x numEpsilons x numItems array
#     '''
#     model_names = list(df.model_name.unique())
#     image_sets = list(df.image_set.unique())
#     epsilons = sorted(list(df.epsilon.unique()))
#     item_names = list(df.filenames.unique())
    
#     D = np.empty((len(model_names),len(image_sets),len(epsilons),len(item_names)))
#     D[:] = np.nan
#     for rownum,row in progress_bar(df.iterrows(), total=len(df)):
#         model_num = model_names.index(row.model_name)
#         imageset_num = image_sets.index(row.image_set)
#         epsilon_num = epsilons.index(row.epsilon)
#         item_num = item_names.index(row.filenames)
#         curr_val = D[model_num, imageset_num, epsilon_num, item_num]
#         assert np.isnan(curr_val), f"Oops, expected current value to be nan, got {curr_val}"    
#         D[model_num, imageset_num, epsilon_num, item_num] = row[data_column]
#     assert np.isnan(D).any() == False, "Oops, expected all values to be filled, found nans"    
    
#     return dict(
#         D=D,
#         dims=['model_name', 'image_set', 'epsilon', 'filename'],
#         model_names=model_names,
#         image_sets=image_sets,
#         epsilons=epsilons,
#         item_names=item_names
#     )

# def bootstrap_single_dim(D, dim, n_bootstrap=1000, seed=123):
#     num_items = D.shape[dim]
    
#     # Initialize the random number generator
#     rng = RandomState(seed)
    
#     # Generate bootstrap samples
#     samples = rng.choice(num_items, size=(n_bootstrap, num_items), replace=True)

#     # Select the samples along the specified dimension
#     bootstrap_samples = np.take(D, samples, axis=dim)
    
#     # Move the bootstrap dimension to the first position
#     bootstrap_samples = np.moveaxis(bootstrap_samples, dim, 0)
    
#     return bootstrap_samples

# def bootstrap_multi_dim_slow_loop_test(D, dims, n_bootstrap=10000, seed=123):
#     rng = RandomState(seed)

#     # Initialize the bootstrap sample array
#     new_shape = list(D.shape) + [n_bootstrap]
#     bootstrap_samples = np.empty(new_shape, dtype=D.dtype)

#     for i in range(n_bootstrap):
#         # Initialize indices for this bootstrap sample
#         indices = [slice(None)] * D.ndim

#         # Replace indices for the bootstrapped dimensions
#         for dim in dims:
#             num_items = D.shape[dim]
#             indices[dim] = rng.choice(num_items, size=num_items, replace=True)

#         # Using advanced indexing to select elements for the bootstrap sample
#         bootstrap_samples[..., i] = D[np.ix_(*indices)]

#     return bootstrap_samples

In [None]:
# list(df.cond_name.unique())

# figure out multi-dimensionsal bootstrap

In [None]:
# import torch
# import numpy as np
# from numpy.random import RandomState

# def generate_test_data(num_subjects=3, num_conds=2, num_items=5):
#     D = np.zeros((num_subjects, num_conds, num_items))

#     # Populate the array with unique values for each subject and each item
#     for subject in range(num_subjects):
#         for cond in range(num_conds):
#             # Offset each condition by 100
#             D[subject, cond, :] = np.arange(subject * 100, subject * 100 + num_items) + (cond * 100)
    
#     return D

In [None]:
# from functools import reduce
# from fastprogress import progress_bar

# def bootstrap_multi_dim(D, dims, n_bootstrap=10000, seed=None, vectorized=False):
#     """
#     Perform bootstrap resampling across specified dimensions of a multi-dimensional array.

#     Args:
#     D (numpy.ndarray): The input array from which to sample.
#     dims (int or tuple of ints): The dimensions over which to perform bootstrapping.
#     n_bootstrap (int): The number of bootstrap samples to generate.
#     seed (int): Random seed for reproducibility.
#     vectorized (bool): Whether to use vectorized indexing (flatten data, use flat_indices)
    
#     Returns:
#     numpy.ndarray: An array of bootstrapped samples. The shape of the array is 
#                    (n_bootstrap, *D.shape), where the first dimension corresponds
#                    to the bootstrap samples, and the remaining dimensions correspond
#                    to the dimensions of the original array.

#     The function generates random indices for the specified dimensions (dims) and 
#     uses the original indices for the other dimensions. These indices are expanded 
#     and scaled by the array's strides to calculate the flat indices, which are then 
#     used to index into a flattened version of the original array. The resulting 
#     samples are reshaped to form an array that retains the structure of the original 
#     array while incorporating the bootstrap dimension.
#     """    
#     if isinstance(dims, int): dims = (dims,)

#     rng = RandomState(seed)

#     # Generate random indices for each specified dimension, repeat original indices for other dimensions
#     indices = [rng.choice(D.shape[dim], size=(n_bootstrap, D.shape[dim]), replace=True) 
#                if dim in dims else np.arange(D.shape[dim])[None,:].repeat(n_bootstrap, 0)
#                for dim in range(D.ndim)]

#     if vectorized:
#         bootstrap_samples = vectorized_indexing(D, indices)
#     else:
#         bootstrap_samples = loop_indexing(D, indices)
    
#     return bootstrap_samples

# def loop_indexing(D, indices):
    
#     n_bootstrap = indices[0].shape[0]
    
#     # Initialize the bootstrap sample array
#     new_shape = [n_bootstrap] + list(D.shape)
#     bootstrap_samples = np.empty(new_shape, dtype=D.dtype)

#     # Iterate over the indices for each bootstrap sample
#     for i, curr_indices in enumerate(progress_bar(zip(*indices), total=n_bootstrap)):
#         # Use advanced indexing to select the sample
#         bootstrap_samples[i] = D[np.ix_(*curr_indices)]
    
#     # Now, bootstrap_samples contains the bootstrapped samples    
#     return bootstrap_samples

# def vectorized_indexing(D, indices):
#     n_bootstrap = indices[0].shape[0]
    
#     # Calculate the strides for each dimension in D
#     strides = np.array(D.strides) // D.itemsize

#     # Flatten the array D
#     D_flat = D.reshape(-1)
    
#     # lambda function to expand dimensions on indices so they broadcast correctly when summed
#     # e.g., if D is 5x2x100, n_bootstrap=1000, then reshape indices dim=0 to be 1000x5x1x1, etc.
#     adjusted_axes = lambda dim: [i+1 for i in range(D.ndim) if i != dim]
    
#     # Expand Dims and Multiply each set of indices with the corresponding stride before summing
#     scaled_indices = [np.ascontiguousarray(np.expand_dims(idx, axis=adjusted_axes(dim)) * stride).astype(np.int32)
#                       for dim, (idx, stride) in enumerate(zip(indices, strides))]

#     # Perform the direct summation using reduce (np.add performs the correct broadcasting)
#     flat_indices = reduce(np.add, scaled_indices)
    
#     # Use the flat indices to access the data values
#     bootstrap_samples_flat = D_flat[flat_indices]
    
#     # reshape 
#     bootstrap_samples = bootstrap_samples_flat.reshape(n_bootstrap, *D.shape)
    
#     return bootstrap_samples



In [None]:
# D = generate_test_data()
# print("Original Data:")
# print(D)
# print(D.shape)

In [None]:
# D = torch.rand((10,2,1260)).numpy()
# D.shape

In [None]:
# import time

# start = time.time()
# bs_samples1 = bootstrap_multi_dim(D, dims=(0,2), seed=123, vectorized=False)
# dur = time.time() - start
# bs_samples1.shape, dur

In [None]:
# import time

# start = time.time()
# bs_samples2 = bootstrap_multi_dim(D, dims=(0,2), seed=123, vectorized=True)
# dur = time.time() - start
# bs_samples2.shape, dur

In [None]:
# np.equal(bs_samples1, bs_samples2).all()