In [1]:
import torch
import numpy as np
import pandas as pd
import yaml
import matplotlib.pyplot as plt
from pathlib import Path
from tqdm.notebook import tqdm

import sys
from pathlib import Path

project_root = Path.cwd().parent 
sys.path.append(str(project_root))
sys.path.append("../")


from src.utils import get_dataset, select_model
from hydra import initialize, compose
from src.shap_explainer import ShapExplainer


2025-01-22 13:14:53.793604: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2025-01-22 13:14:53.807456: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2025-01-22 13:14:53.811664: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1452] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-01-22 13:14:53.822379: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [8]:
def evaluate_explainability_robustness(model, instance, explain_func, perturbation_scale=0.01, num_perturbations=50):
    """
    Evaluate the robustness of an explainability vector for a given instance and model.

    Parameters:
    - model: torch.nn.Module
        The PyTorch model to be evaluated.
    - instance: torch.Tensor
        The input instance for which the explainability vector is computed (1D tensor).
    - explain_func: function
        A function that takes `model` and `instance` as inputs and returns an explainability vector.
    - perturbation_scale: float
        The standard deviation of Gaussian noise added to the input for perturbations.
    - num_perturbations: int
        The number of perturbed instances to generate.

    Returns:
    - robustness_score: float
        A score indicating the robustness of the explainability vector (lower is better).
    """
    model.model.eval()  # Ensure the model is in evaluation mode
    if not isinstance(instance, torch.Tensor):
        instance = torch.tensor(instance, dtype=torch.float32)
    # Compute the original explainability vector
    original_explain_vector = torch.tensor(explain_func(instance))

    # Initialize a list to store differences in explainability vectors
    explain_diff_magnitudes = []

    for _ in range(num_perturbations):
        # Generate a perturbed instance by adding Gaussian noise
        perturbation = torch.randn_like(instance) * perturbation_scale
        perturbed_instance = instance + perturbation

        # Compute the explainability vector for the perturbed instance
        perturbed_explain_vector = torch.tensor(explain_func(perturbed_instance))
        if len(perturbed_explain_vector.shape) == 1:
            perturbed_explain_vector = perturbed_explain_vector.unsqueeze(0)
        if len(original_explain_vector.shape) == 1:
            original_explain_vector = original_explain_vector.unsqueeze(0)
        # Compute cosine similarity between the original and perturbed explainability vectors
        similarity = torch.nn.functional.cosine_similarity(original_explain_vector, perturbed_explain_vector).numpy()
        
        
        explain_diff_magnitudes.append(similarity)

    # Compute the average magnitude of differences as the robustness score
    robustness_score = np.mean(explain_diff_magnitudes)

    return robustness_score


In [9]:
def compute_mean_robustness(experiment_path, seeds, perturbation_scale=0.01, num_perturbations=50, n_samples= 200):
    """
    Compute the mean robustness score for a set of experiments.

    Parameters:
    - experiment_path: str
        The path to the directory containing the experiment results.
    - seeds: list
        A list of seeds for which to compute the mean robustness score.
    - perturbation_scale: float
        The standard deviation of Gaussian noise added to the input for perturbations.
    - num_perturbations: int
        The number of perturbed instances to generate.

    Returns:
    - mean_robustness: float
        The mean robustness score across all seeds.
    """
    robustness_scores = []

    for seed in seeds:
        print('Processing seed:', seed)
        # Load the experiment results
        seed_experiment_path = Path(f"{experiment_path}_seed_{seed}")
        seed_experiment_config_path = Path(f"{seed_experiment_path}/experiment_config.yaml")
        model_path = f"{seed_experiment_path}/model.pth"
            
        with initialize(config_path=str(seed_experiment_path), version_base=None):
            cfg = compose(config_name=seed_experiment_config_path.name)
        cfg.dataset.dataset_path = "../" + cfg.dataset.dataset_path
        dataset = get_dataset(cfg)
        X = dataset['X_test']
        y = dataset['y_test']
        explanation = dataset['explanation_test']

        # keep only data with label != 0
        X = X[y != 0]
        explanation = explanation[y != 0]
        y = y[y != 0]

        # Select 200 random instances from the dataset
        random_indices = np.random.choice(len(X), n_samples, replace=False)
        instances = X[random_indices]
        explanations = explanation[random_indices]
        labels = y[random_indices]
        model = select_model(cfg.model, device="cuda:0" if torch.cuda.is_available() else "cpu")
        model.load_model(model_path, X)
        seed_shap_robustness_scores = []
        seed_ours_robustness_scores = []
        seed_gradient_robustness_scores = []
        explainer = ShapExplainer(model, X)
        for instance in tqdm(instances):
            shap_robustness = evaluate_explainability_robustness(model, instance, explainer.explain_instance, perturbation_scale=perturbation_scale, num_perturbations=num_perturbations)
            ours_robustness = evaluate_explainability_robustness(model, instance, model.instance_explanation, perturbation_scale=perturbation_scale, num_perturbations=num_perturbations)
            gradient_robustness = evaluate_explainability_robustness(model, instance, model.gradient_explanation, perturbation_scale=perturbation_scale, num_perturbations=num_perturbations)
            
            seed_shap_robustness_scores.append(shap_robustness)
            seed_ours_robustness_scores.append(ours_robustness)
            seed_gradient_robustness_scores.append(gradient_robustness)

        shap_robustness_scores = np.array(seed_shap_robustness_scores)
        ours_robustness_scores = np.array(seed_ours_robustness_scores)
        gradient_robustness_scores = np.array(seed_gradient_robustness_scores)

        shap_robustness = np.mean(shap_robustness_scores)
        ours_robustness = np.mean(ours_robustness_scores)
        gradient_robustness = np.mean(gradient_robustness_scores)
    
        robustness_scores.append({
            "seed": seed,
            "shap": shap_robustness,
            "ours": ours_robustness,
            "gradient": gradient_robustness
        })
    robustness_scores = pd.DataFrame(robustness_scores)
    return robustness_scores

In [10]:
compute_mean_robustness(experiment_path= "../results/all_db_all_training/DTEC_DSIL_deterministic_exponential_s0_T400_bins7/A_synthetic_f10_s5000_c4_r0.0166_0.0166_0.0166", seeds=[0,1,2,3,4], perturbation_scale=0.01, num_perturbations=5, n_samples= 50)

Processing seed: 0
{'Samples': 5000, 'Features': 10, 'Anomalies': 249, 'Anomalies Ratio(%)': 4.98}


  self.model.load_state_dict(torch.load(path))


  0%|          | 0/50 [00:00<?, ?it/s]

  torch.tensor(x[:, i]),
  original_explain_vector = torch.tensor(explain_func(instance))
  perturbed_explain_vector = torch.tensor(explain_func(perturbed_instance))
  torch.tensor(x[:, i]),


Processing seed: 1
{'Samples': 5000, 'Features': 10, 'Anomalies': 249, 'Anomalies Ratio(%)': 4.98}


  self.model.load_state_dict(torch.load(path))


  0%|          | 0/50 [00:00<?, ?it/s]

  torch.tensor(x[:, i]),
  original_explain_vector = torch.tensor(explain_func(instance))
  perturbed_explain_vector = torch.tensor(explain_func(perturbed_instance))


Processing seed: 2
{'Samples': 5000, 'Features': 10, 'Anomalies': 249, 'Anomalies Ratio(%)': 4.98}


  self.model.load_state_dict(torch.load(path))


  0%|          | 0/50 [00:00<?, ?it/s]

  torch.tensor(x[:, i]),
  original_explain_vector = torch.tensor(explain_func(instance))
  perturbed_explain_vector = torch.tensor(explain_func(perturbed_instance))


Processing seed: 3
{'Samples': 5000, 'Features': 10, 'Anomalies': 249, 'Anomalies Ratio(%)': 4.98}


  self.model.load_state_dict(torch.load(path))


  0%|          | 0/50 [00:00<?, ?it/s]

  torch.tensor(x[:, i]),
  original_explain_vector = torch.tensor(explain_func(instance))
  perturbed_explain_vector = torch.tensor(explain_func(perturbed_instance))


Processing seed: 4
{'Samples': 5000, 'Features': 10, 'Anomalies': 249, 'Anomalies Ratio(%)': 4.98}


  self.model.load_state_dict(torch.load(path))


  0%|          | 0/50 [00:00<?, ?it/s]

  torch.tensor(x[:, i]),
  original_explain_vector = torch.tensor(explain_func(instance))
  perturbed_explain_vector = torch.tensor(explain_func(perturbed_instance))


Unnamed: 0,seed,shap,ours,gradient
0,0,0.992,0.978009,0.998978
1,1,0.9928,0.977579,0.998953
2,2,0.9928,0.977434,0.999463
3,3,0.992,0.975731,0.999455
4,4,0.9992,0.975502,0.999015


In [5]:
robustness_results = _

In [2]:

def dcg_score_matrix_p(importance_scores, relevance_matrix, p):
    """
    Compute the DCG scores at a given cutoff rank p.
    """
    importance_scores = np.array(importance_scores)
    relevance_matrix = np.array(relevance_matrix)
    importance_scores = importance_scores.squeeze()
    relevance_matrix = relevance_matrix.squeeze()
    assert (
        importance_scores.shape == relevance_matrix.shape
    ), "importance_scores and relevance_matrix must have the same shape"

    # Sort relevance based on importance scores
    if len(importance_scores.shape) == 1:
        importance_scores = importance_scores.reshape(1, -1)
        relevance_matrix = relevance_matrix.reshape(1, -1)

    sorted_indices = np.argsort(importance_scores, axis=1)[:, ::-1]
    sorted_relevance = np.take_along_axis(
        relevance_matrix, sorted_indices, axis=1
    )

    # Consider only the top p items
    sorted_relevance_p = sorted_relevance[:, :p]
    ranks = np.arange(1, p + 1)  # Ranks from 1 to p

    # Compute DCG scores
    dcg_scores = np.sum(sorted_relevance_p / np.log2(ranks + 1), axis=1)

    return dcg_scores


def idcg_score_matrix_p(relevance_matrix, p):
    """
    Compute the IDCG scores at a given cutoff rank p.
    """
    if len(relevance_matrix.shape) == 1:
        relevance_matrix = relevance_matrix.reshape(1, -1)
    relevance_matrix = np.array(relevance_matrix)
    sorted_relevance = np.sort(relevance_matrix, axis=1)[:, ::-1]

    # Consider only the top p items
    sorted_relevance_p = sorted_relevance[:, :p]
    ranks = np.arange(1, p + 1)  # Ranks from 1 to p

    # Compute IDCG scores
    idcg_scores = np.sum(sorted_relevance_p / np.log2(ranks + 1), axis=1)

    return idcg_scores


def nDCG_(importance_scores, relevance_matrix, p):
    """
    Compute the nDCG scores at a given cutoff rank p.
    """
    dcg_scores_p = dcg_score_matrix_p(importance_scores, relevance_matrix, p)
    idcg_scores_p = idcg_score_matrix_p(relevance_matrix, p)

    # Compute normalized DCG
    ndcg_scores_p = np.zeros_like(dcg_scores_p)
    for i in range(len(dcg_scores_p)):
        if idcg_scores_p[i] == 0:
            ndcg_scores_p[i] = 0
        else:
            ndcg_scores_p[i] = dcg_scores_p[i] / idcg_scores_p[i]

    return ndcg_scores_p


def nDCG_p(importance_scores, relevance_matrix, k= 'auto'):
    nDCG_scores = []
    if len(importance_scores.shape) == 1:
        importance_scores = importance_scores.reshape(1, -1)
    if len(relevance_matrix.shape) == 1:
        relevance_matrix = relevance_matrix.reshape(1, -1)
    for i in range(importance_scores.shape[0]):
        if k == "auto":
            k_ = int(np.sum(relevance_matrix[i]))
        else:
            k_ = k
        if k_ == 0 or int(np.sum(relevance_matrix[i])) == 0:
            continue
        nDCG_scores.append(nDCG_(importance_scores[i], relevance_matrix[i], p=k_))
    return np.array(nDCG_scores)


In [20]:
def explanation_accuracy(ground_truth, explanation, k="auto"):
    if explanation.shape != ground_truth.shape:
        raise ValueError(
            "The explanation and ground truth must have the same shape."
        )
    if len(explanation.shape) == 1:
        explanation = explanation.reshape(1, -1)
    if len(ground_truth.shape) == 1:
        ground_truth = ground_truth.reshape(1, -1)
    if type(explanation) is torch.Tensor:
        explanation = explanation.cpu().detach().numpy()
    if type(ground_truth) is torch.Tensor:
        ground_truth = ground_truth.cpu().detach().numpy()
    accuracy = []
    for row in range(ground_truth.shape[0]):
        if k == "auto":
            k_ = int(np.sum(ground_truth[row]))
        else:
            k_ = k
        if k_ == 0 or int(np.sum(ground_truth[row])) == 0:
            continue
        sorted_indices = np.argsort(explanation[row])[::-1]
        instance_explanation = np.zeros_like(explanation[row])
        instance_explanation[sorted_indices[:k_]] = 1

        instance_accuracy = (
            np.sum(ground_truth[row] * instance_explanation) / k_
        )
        accuracy.append(instance_accuracy)
    return np.mean(accuracy)

In [48]:
def explain_instance_set(model, instances, ground_truths):
    """
    Compute the explainability vectors for a set of instances.

    Parameters:
    - model: torch.nn.Module
        The PyTorch model to be evaluated.
    - instances: torch.Tensor
        The input instances for which the explainability vectors are computed (2D tensor).
    - explain_func: function
        A function that takes `model` and `instance` as inputs and returns an explainability vector.

    Returns:
    - explain_vectors: torch.Tensor
        The explainability vectors for the input instances.
    """
    model.model.eval()  # Ensure the model is in evaluation mode
    nDCGs={"shap": [], "max_diffusion": [], "mean_diffusion": [], "gradient": []}
    accuracy={"shap": [], "max_diffusion": [], "mean_diffusion": [], "gradient": []}
    shap_explainer = ShapExplainer(model, instances)
    shap_explanation = shap_explainer.explain_instance(instances).squeeze()
    max_diffusion_explanation = model.instance_explanation(instances, step=10, agg="max")
    mean_diffusion_explanation = model.instance_explanation(instances, step=10, agg="mean")
    gradient_explanation = model.gradient_explanation(instances)

    # Compute nDCG and accuracy for each method
    for i in range(ground_truths.shape[0]):
        shap_ndcg = nDCG_p(shap_explanation[i,:], ground_truths[i,:])
        max_diffusion_ndcg = nDCG_p(max_diffusion_explanation[i,:], ground_truths[i,:])
        mean_diffusion_ndcg = nDCG_p(mean_diffusion_explanation[i,:], ground_truths[i,:])
        gradient_ndcg = nDCG_p(gradient_explanation[i,:], ground_truths[i,:])
        shap_accuracy = explanation_accuracy(ground_truths[i,:], shap_explanation[i,:])
        max_diffusion_accuracy = explanation_accuracy(ground_truths[i,:], max_diffusion_explanation[i,:])
        mean_diffusion_accuracy = explanation_accuracy(ground_truths[i,:], mean_diffusion_explanation[i,:])
        gradient_accuracy = explanation_accuracy(ground_truths[i,:], gradient_explanation[i,:])

        nDCGs["shap"].append(shap_ndcg[0][0])
        nDCGs["max_diffusion"].append(max_diffusion_ndcg[0][0])
        nDCGs["mean_diffusion"].append(mean_diffusion_ndcg[0][0])
        nDCGs["gradient"].append(gradient_ndcg[0][0])
        accuracy["shap"].append(shap_accuracy)
        accuracy["max_diffusion"].append(max_diffusion_accuracy)
        accuracy["mean_diffusion"].append(mean_diffusion_accuracy)
        accuracy["gradient"].append(gradient_accuracy)

    return nDCGs, accuracy

In [58]:
dsil_exp_path = "../results/all_db_all_training/DTEC_DSIL_deterministic_0.5_s0_T400_bins7/A_synthetic_f10_s5000_c4_r0.0166_0.0166_0.0166_seed_3"
unsup_exp_path = "../results/all_db_all_training/DTEC_unsupervised_None_s0_T400_bins7/A_synthetic_f10_s5000_c4_r0.0166_0.0166_0.0166_seed_3"

In [59]:
def load_model_and_dataset_from_path(experiment_path, n_samples=200):
    cfg_experiment_path = Path(f"{experiment_path}/experiment_config.yaml")
    model_path = f"{experiment_path}/model.pth"
        
    with initialize(config_path=str(experiment_path), version_base=None):
        cfg = compose(config_name=cfg_experiment_path.name)
    cfg.dataset.dataset_path = "../" + cfg.dataset.dataset_path
    dataset = get_dataset(cfg)
    X = dataset['X_test']
    y = dataset['y_test']
    explanation = dataset['explanation_test']

    # keep only data with label != 0
    X = X[y != 0]
    explanation = explanation[y != 0]
    y = y[y != 0]

    # Select 200 random instances from the dataset
    random_indices = np.random.choice(len(X), n_samples, replace=False)
    instances = X[random_indices]
    explanations = explanation[random_indices]
    labels = y[random_indices]
    model = select_model(cfg.model, device="cuda:0" if torch.cuda.is_available() else "cpu")
    model.load_model(model_path, X)
    return model, instances, explanations, labels

In [60]:
dsil_model, dsil_instances, dsil_explanations, dsil_labels = load_model_and_dataset_from_path(dsil_exp_path)
unsup_model, unsup_instances, unsup_explanations, unsup_labels = load_model_and_dataset_from_path(unsup_exp_path)


{'Samples': 5000, 'Features': 10, 'Anomalies': 249, 'Anomalies Ratio(%)': 4.98}


  self.model.load_state_dict(torch.load(path))


{'Samples': 5000, 'Features': 10, 'Anomalies': 249, 'Anomalies Ratio(%)': 4.98}


  self.model.load_state_dict(torch.load(path))


In [61]:
dsil_explanability = explain_instance_set(dsil_model, dsil_instances, dsil_explanations)
unsup_explanability = explain_instance_set(unsup_model, dsil_instances, dsil_explanations)

In [62]:
dsil_ndcg, dsil_accuracy = dsil_explanability
unsup_ndcg, unsup_accuracy = unsup_explanability

In [63]:
# Compute mean for every key
dsil_accuracy_mean = {key: np.mean(value) for key, value in dsil_accuracy.items()}
unsup_accuracy_mean = {key: np.mean(value) for key, value in unsup_accuracy.items()}
dsil_ndcg_mean = {key: np.mean(value) for key, value in dsil_ndcg.items()}
unsup_ndcg_mean = {key: np.mean(value) for key, value in unsup_ndcg.items()}

In [64]:
dsil_accuracy_mean, unsup_accuracy_mean, dsil_ndcg_mean, unsup_ndcg_mean

({'shap': 0.665,
  'max_diffusion': 0.5833333333333333,
  'mean_diffusion': 0.6575,
  'gradient': 0.6},
 {'shap': 0.64,
  'max_diffusion': 0.5883333333333334,
  'mean_diffusion': 0.6266666666666666,
  'gradient': 0.6025},
 {'shap': 0.6891769500834437,
  'max_diffusion': 0.6108671933466101,
  'mean_diffusion': 0.6981945384400114,
  'gradient': 0.6281440035760606},
 {'shap': 0.6763007900199153,
  'max_diffusion': 0.6100011463888363,
  'mean_diffusion': 0.6599049766287269,
  'gradient': 0.6300504719814615})