# Extracting Traces with New Splits and Models

In [None]:
# Mount Google Drive at /content/drive; the rest of the notebook reads its
# data and model files from paths under that mount point. The google.colab
# module is only importable inside a Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import numpy as np
import os
#/content/drive/MyDrive/split_0.npz

# Location of the test-set archive on the mounted Drive.
npz_file_path = os.path.join('/content/drive/MyDrive', 'test_data.npz')

if os.path.exists(npz_file_path):
    print(f"File exists: {npz_file_path}")

    # Load the archive and report, for every stored array, its shape and
    # first element. Loading is only attempted when the file is present.
    try:
        with np.load(npz_file_path) as data:
            print(f"Keys in the file: {data.files}")

            for key in data.files:
                # Index the archive once per key: every data[key] access
                # reads the array from disk again.
                array = data[key]
                print(f"{key} shape: {array.shape}")
                print(f"First element of {key}: {array[0]}")
    except Exception as e:
        # Keep the notebook running, but say what actually went wrong.
        print(f"Error loading {npz_file_path}: {e}")
else:
    print(f"File does not exist: {npz_file_path}")


File exists: /content/drive/MyDrive/test_data.npz
Keys in the file: ['X_test', 'y_test']
X_test shape: (7841, 50, 50, 3)
First element of X_test: [[[0.14509805 0.14901961 0.14117648]
  [0.14509805 0.14901961 0.14117648]
  [0.16862746 0.16470589 0.15294118]
  ...
  [0.16470589 0.16862746 0.15294118]
  [0.16078432 0.16470589 0.14901961]
  [0.16078432 0.16470589 0.14901961]]

 [[0.14509805 0.14901961 0.14117648]
  [0.14509805 0.14901961 0.14117648]
  [0.16862746 0.16470589 0.15294118]
  ...
  [0.16470589 0.16862746 0.15294118]
  [0.16078432 0.16470589 0.14901961]
  [0.16078432 0.16470589 0.14901961]]

 [[0.14901961 0.14901961 0.14117648]
  [0.14901961 0.14901961 0.14117648]
  [0.15294118 0.15294118 0.14509805]
  ...
  [0.15294118 0.15294118 0.13725491]
  [0.15294118 0.15294118 0.13725491]
  [0.15294118 0.15294118 0.13725491]]

 ...

 [[0.14901961 0.14901961 0.14901961]
  [0.14901961 0.14901961 0.14901961]
  [0.15686275 0.15686275 0.15686275]
  ...
  [0.15686275 0.16862746 0.16862746]
  [0

In [None]:
# ACTIVATION TRACE EXTRACTION
import os
import numpy as np
import tf_keras as k
from multiprocessing import Pool

# --- Configuration ---------------------------------------------------------
BATCH_SIZE = 128
MODEL_DIR = '/content/drive/MyDrive'  # Adjust to your model directory
SPLIT_DIR = '/content/drive/MyDrive'  # Adjust based on your data structure
ACTIVATION_TRACES_DIR = 'activation_traces'
PREDICTIONS_DIR = 'predictions'

# Both output folders must exist before any .npy file is written into them.
for _output_dir in (ACTIVATION_TRACES_DIR, PREDICTIONS_DIR):
    os.makedirs(_output_dir, exist_ok=True)

# Read the test set once; it is reused for every model's 'test' split.
with np.load(os.path.join(SPLIT_DIR, 'test_data.npz')) as data:
    X_test, y_test = data['X_test'], data['y_test']

# File paths for saving activation traces and predictions
def get_acts_filepath(split, iteration=0):
    """Return the .npy path under ACTIVATION_TRACES_DIR for a split/iteration pair."""
    filename = f"layer_split_{split}_acts_iter_{iteration}.npy"
    return os.path.join(ACTIVATION_TRACES_DIR, filename)

def get_preds_filepath(split, iteration=0):
    """Return the .npy path under PREDICTIONS_DIR for a split/iteration pair."""
    filename = f"layer_split_{split}_preds_iter_{iteration}.npy"
    return os.path.join(PREDICTIONS_DIR, filename)

# Extract activation traces with flexibility for dense and convolutional layers
def get_act_traces(model, input_set, layer_names, batch_size, num_classes):
    """Collect the activations of the named layers for every input sample.

    A temporary model sharing ``model``'s input but emitting the requested
    layers' outputs is built and run over ``input_set``. Rank-2 outputs are
    used as they are; rank-4 outputs are reduced to one value per entry of
    the last axis by averaging over axes 1 and 2. The per-layer matrices are
    joined along axis 1 in the order of ``layer_names``.

    Args:
        model: Keras model exposing ``input`` and ``get_layer(name).output``.
        input_set: Inputs forwarded to ``predict``.
        layer_names: Non-empty sequence of layer names to extract.
        batch_size: Batch size forwarded to ``predict``.
        num_classes: Unused. It used to size a multiprocessing pool and is
            kept only so existing callers keep working.

    Returns:
        A 2-D array with one row per sample and the selected layers'
        activations side by side.

    Raises:
        ValueError: If ``layer_names`` is empty or a layer output is neither
            rank 2 nor rank 4.
    """
    layer_names = list(layer_names)
    if not layer_names:
        raise ValueError("layer_names must contain at least one layer name")

    temp_model = k.models.Model(
        inputs=model.input,
        outputs=[model.get_layer(layer_name).output for layer_name in layer_names],
    )
    layer_outputs = temp_model.predict(input_set, batch_size=batch_size, verbose=1)
    # With a single requested layer predict() hands back a bare array rather
    # than a list; normalise so the loop below always sees a sequence.
    if not isinstance(layer_outputs, (list, tuple)):
        layer_outputs = [layer_outputs]

    layer_matrices = []
    for layer_name, layer_output in zip(layer_names, layer_outputs):
        layer_output = np.asarray(layer_output)
        if layer_output.ndim == 4:  # For convolutional layers
            # Vectorised form of applying __aggregate_layer to every sample.
            # The previous multiprocessing.Pool version called join() on a
            # pool that was never closed, which raises ValueError.
            layer_matrix = layer_output.mean(axis=(1, 2))
        elif layer_output.ndim == 2:  # For fully connected (Dense) layers
            layer_matrix = layer_output
        else:
            raise ValueError(f"Unsupported output shape found: {layer_output.ndim} for layer {layer_name}")
        layer_matrices.append(layer_matrix)

    if len(layer_matrices) == 1:
        return layer_matrices[0]
    # One concatenation at the end instead of growing an array per layer.
    return np.concatenate(layer_matrices, axis=1)

# Aggregate convolutional layer outputs (used if extracting from conv layers)
def __aggregate_layer(layer_output):
    """Return, for one sample, the mean of each slice along the last axis as a list.

    get_act_traces now performs this reduction for all samples at once; the
    helper is kept for any other code that still calls it.
    """
    return [np.mean(layer_output[..., j]) for j in range(layer_output.shape[-1])]


def extract_prediction_array(model, input_set, y_true):
    """Build an (N, 3) table of true label, predicted label and confidence.

    Args:
        model: Object whose ``predict(input_set)`` returns an (N, C) array of
            per-class scores.
        input_set: Inputs forwarded to ``model.predict``.
        y_true: Ground-truth labels, either one-hot rows of shape (N, C) or
            integer class indices of shape (N,) or (N, 1).

    Returns:
        Array of shape (N, 3) whose columns are the true class index, the
        arg-max predicted class index, and the maximum predicted score.
    """
    y_true = np.asarray(y_true)
    if y_true.ndim > 1 and y_true.shape[-1] > 1:
        # One-hot rows: the position of the largest entry is the class index.
        true_labels = np.argmax(y_true, axis=1)
    else:
        # Already class indices. argmax would fail on a 1-D array and would
        # silently turn an (N, 1) column into all zeros.
        true_labels = y_true.reshape(-1)

    raw_preds = model.predict(input_set)
    pred_labels = np.argmax(raw_preds, axis=1)
    pred_confs = np.max(raw_preds, axis=1)
    return np.vstack((true_labels, pred_labels, pred_confs)).T


# Process and save activation traces and predictions
def process_and_save(model, split, iteration):
    """Extract and save activation traces and predictions for one model/split.

    For ``split == 'test'`` the module-level ``X_test``/``y_test`` arrays are
    used; for any other split the arrays ``X_<split>``/``y_<split>`` are read
    from ``split_<iteration>.npz`` in ``SPLIT_DIR``. Results are written with
    ``np.save`` to the paths given by ``get_acts_filepath`` and
    ``get_preds_filepath``.

    Args:
        model: Model passed to ``get_act_traces`` and
            ``extract_prediction_array``.
        split: Name of the data split, e.g. 'train' or 'test'.
        iteration: Index of the model/split; selects both the split file and
            the layer whose activations are traced.

    Raises:
        ValueError: If no layer is configured for ``iteration``.
    """
    # Layer traced for each model index. NOTE(review): the names depend on
    # how the saved models were built - confirm against model.summary().
    layer_names_by_iteration = {
        0: ['dense_1'],
        1: ['dense_3'],
        2: ['dense_5'],
    }
    # Validate before any data is loaded so a bad index fails fast with a
    # clear message instead of an UnboundLocalError further down.
    if iteration not in layer_names_by_iteration:
        raise ValueError(
            f"No activation layer configured for iteration {iteration!r}; "
            f"expected one of {sorted(layer_names_by_iteration)}"
        )
    layer_names = layer_names_by_iteration[iteration]

    if split == 'test':
        X_data = X_test
        y_data = y_test
    else:
        split_file_path = os.path.join(SPLIT_DIR, f'split_{iteration}.npz')
        with np.load(split_file_path) as data:
            X_data = data[f'X_{split}']
            y_data = data[f'y_{split}']

    activations = get_act_traces(model, input_set=X_data, layer_names=layer_names, batch_size=BATCH_SIZE, num_classes=43)
    predictions = extract_prediction_array(model, input_set=X_data, y_true=y_data)
    np.save(get_acts_filepath(split, iteration=iteration), activations)
    np.save(get_preds_filepath(split, iteration=iteration), predictions)

# Iterate through all saved models and process activation traces and predictions
# Imported once here instead of on every pass through the loop.
from tf_keras.layers import Softmax

for i in range(3):  # Now processing only 3 models/splits
    # Reset Keras' global state so graph state from a previously loaded
    # model does not pile up while several large models are processed.
    k.backend.clear_session()

    model_path = os.path.join(MODEL_DIR, f'vgg16model_split_{i}.keras')
    print(f"Loading model from {model_path}")
    model = k.models.load_model(model_path)
    print(f"Model {i} loaded successfully")

    # Move the softmax out of the final Dense layer into a layer of its own:
    # first drop the Dense layer's activation, then append a standalone
    # Softmax so the model's final output is still softmax-normalised.
    model.layers[-1].activation = None  # Deactivate the softmax in the Dense layer
    model.add(Softmax(name='softmax_output'))

    # Print model summary (optional)
    model.summary()

    for split in ['train', 'test']:  # Process both train and test splits
        process_and_save(model, split, iteration=i)
        print(f'Processed and saved activations and predictions for model {i} and split {split}')

print('Activation trace and prediction extraction complete.')


Loading model from /content/drive/MyDrive/vgg16model_split_0.keras
Model 0 loaded successfully
Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 vgg16 (Functional)          (None, 1, 1, 512)         14714688  
                                                                 
 flatten (Flatten)           (None, 512)               0         
                                                                 
 dense (Dense)               (None, 512)               262656    
                                                                 
 dropout (Dropout)           (None, 512)               0         
                                                                 
 dense_1 (Dense)             (None, 43)                22059     
                                                                 
 softmax_output (Softmax)    (None, 43)                0         
                           

In [None]:
# DEBUG MDSA CALC
import os
import numpy as np
import pandas as pd
import scipy as sp
import math
from tqdm import tqdm
import tf_keras as k
import os

# Constants
SURPRISE_TYPES = ['MDSA']  # surprise metrics to compute; get_surprise_scores only accepts 'MDSA'
ACTIVATION_TRACES_DIR = 'activation_traces'  # input: saved activation-trace .npy files
PREDICTIONS_DIR = 'predictions'  # input: saved prediction .npy files
MDSA_SCORES_DIR = 'mdsa_scores'  # output: one score file per split and model index
# Create the output folder up front so np.save does not fail on a missing directory.
os.makedirs(MDSA_SCORES_DIR, exist_ok=True)

def calculate_mdsa(train_acts, train_labels, train_class_count, target_acts, target_preds):
    """Compute the Mahalanobis Distance-based Surprise Adequacy (MDSA) score.

    For each class ``c`` in ``range(train_class_count)`` the mean vector and
    the (pseudo-)inverse covariance of the training activations labelled
    ``c`` are estimated. Each target sample is then scored by its Mahalanobis
    distance to the statistics of the class given for it in ``target_preds``.

    Args:
        train_acts: 2-D array (samples x neurons) of training activations.
        train_labels: 1-D array of class indices aligned with ``train_acts``.
        train_class_count: Number of classes; class indices are expected to
            lie in ``[0, train_class_count)``.
        target_acts: 2-D array (samples x neurons) of activations to score.
        target_preds: 1-D array with one class index per target sample;
            values are cast to ``int``.

    Returns:
        List of floats, one score per target sample. A score is NaN when the
        referenced class has fewer than two training samples.

    Raises:
        IndexError: If a target class index is outside
            ``[0, train_class_count)``.
    """
    # Work in float64: covariance inversion is sensitive to rounding.
    train_acts = np.asarray(train_acts, dtype=np.float64)
    train_labels = np.asarray(train_labels)
    target_acts = np.asarray(target_acts, dtype=np.float64)
    target_classes = np.asarray(target_preds).astype(int)
    neuron_count = train_acts.shape[1]

    # Step 1: Calculate mean / inverted covariance matrix for each class
    means_vectors, inv_cov_list = [], []
    for c in range(train_class_count):
        class_acts = train_acts[train_labels == c]
        if len(class_acts) < 2:
            # A covariance cannot be estimated; mark the class so that its
            # scores come out as NaN instead of crashing the whole run.
            print(f"Warning: class {c} has {len(class_acts)} training sample(s); its MDSA scores will be NaN.")
            means_vectors.append(np.full(neuron_count, np.nan))
            inv_cov_list.append(np.full((neuron_count, neuron_count), np.nan))
            continue

        means_vectors.append(class_acts.mean(axis=0))
        # atleast_2d keeps the single-neuron case (0-d covariance) invertible.
        cov = np.atleast_2d(np.cov(class_acts, rowvar=False))
        # The pseudo-inverse stays defined for singular covariance matrices,
        # where a plain inverse raises or returns meaningless values.
        inv_cov_list.append(np.linalg.pinv(cov))

    # Step 2: Calculate MDSA scores
    print("Calculating Mahalanobis Distance-based Surprise Adequacy scores...")
    if len(target_acts) == 0:
        return []

    out_of_range = (target_classes < 0) | (target_classes >= train_class_count)
    if out_of_range.any():
        raise IndexError(
            f"target_preds contains class indices outside [0, {train_class_count}): "
            f"{np.unique(target_classes[out_of_range]).tolist()}"
        )

    # Score all samples of one class in a single matrix product rather than
    # looping over samples one at a time.
    squared = np.empty(len(target_acts), dtype=np.float64)
    for c in np.unique(target_classes):
        rows = target_classes == c
        centered = target_acts[rows] - means_vectors[c]
        squared[rows] = np.sum((centered @ inv_cov_list[c]) * centered, axis=1)

    # Rounding can push a distance that should be zero slightly below zero;
    # report it and clamp, since the square root of a negative is undefined.
    negative = squared < 0
    if negative.any():
        print("Warning: Negative MDSA value detected.")
        print(f"mdsa_value: {squared[negative].min()}")
        squared[negative] = 0.0

    return np.sqrt(squared).tolist()

def get_surprise_scores(surprise_type, train_acts, train_labels, train_class_count, target_acts, target_preds):
    """Dispatch to the scoring routine for ``surprise_type``.

    Only 'MDSA' is recognised; it is forwarded to ``calculate_mdsa`` with the
    remaining arguments unchanged. Any other value raises ``Exception``.
    """
    # Guard clause: reject anything that is not a known surprise type.
    if surprise_type != 'MDSA':
        raise Exception("Unknown surprise type: " + str(surprise_type))

    return calculate_mdsa(
        train_acts,
        train_labels,
        train_class_count,
        target_acts,
        target_preds,
    )


def extract_parallel_surprise_scores(i: int):
    """Compute and save surprise scores for model ``i`` on the train and test splits.

    Reads the saved activation traces and prediction tables for model ``i``,
    uses the training split as the reference distribution, and writes one
    ``.npy`` score file per split and surprise type into ``MDSA_SCORES_DIR``.
    Despite the name, the splits are processed one after another.

    Args:
        i: Index of the model whose saved traces and predictions are used.
    """
    train_acts = np.load(os.path.join(ACTIVATION_TRACES_DIR, f'layer_split_train_acts_iter_{i}.npy'))
    train_preds = np.load(os.path.join(PREDICTIONS_DIR, f'layer_split_train_preds_iter_{i}.npy'))

    # Access the ground truth labels and predicted labels from columns
    train_labels = train_preds[:, 0]  # Ground truth labels are in the first column
    # Labels are used directly as indices into the per-class statistics, so
    # the count has to span 0..max(label). Counting distinct labels would be
    # too small whenever a class is absent from the training split.
    train_class_count = int(train_labels.max()) + 1 if train_labels.size else 0

    # Process both training and test splits for surprise scores
    for data_split in ['train', 'test']:
        if data_split == 'train':
            # Same files as the reference data loaded above; do not read them twice.
            target_acts, target_preds_data = train_acts, train_preds
        else:
            target_acts = np.load(os.path.join(ACTIVATION_TRACES_DIR, f'layer_split_{data_split}_acts_iter_{i}.npy'))
            target_preds_data = np.load(os.path.join(PREDICTIONS_DIR, f'layer_split_{data_split}_preds_iter_{i}.npy'))
        target_preds = target_preds_data[:, 1]  # Predicted labels are in the second column

        for surprise_type in SURPRISE_TYPES:
            scores = get_surprise_scores(surprise_type, train_acts, train_labels, train_class_count, target_acts, target_preds)
            # Name the file after the surprise type so that a second type
            # would not overwrite the first; 'MDSA' keeps its existing path.
            output_filepath = os.path.join(
                MDSA_SCORES_DIR,
                f'layer_split_{data_split}_{surprise_type.lower()}_scores_iter_{i}.npy',
            )
            np.save(output_filepath, scores)
            print(surprise_type, "output saved to file:", output_filepath)


# Run MDSA calculation for all models
# Each call handles one model index: it reads that model's saved traces and
# predictions and writes its score files, so the calls are independent.
for i in range(3):  # Adjust the range if necessary for the number of splits you have
    extract_parallel_surprise_scores(i)

# Reached only if every model index above was processed without an exception.
print('MDSA score extraction complete.')


Calculating Mahalanobis Distance-based Surprise Adequacy scores...


100%|██████████| 25094/25094 [00:00<00:00, 132503.79it/s]


MDSA output saved to file: mdsa_scores/layer_split_train_mdsa_scores_iter_0.npy
Calculating Mahalanobis Distance-based Surprise Adequacy scores...


100%|██████████| 7841/7841 [00:00<00:00, 118360.53it/s]

MDSA output saved to file: mdsa_scores/layer_split_test_mdsa_scores_iter_0.npy





Calculating Mahalanobis Distance-based Surprise Adequacy scores...


100%|██████████| 25094/25094 [00:00<00:00, 131285.52it/s]

MDSA output saved to file: mdsa_scores/layer_split_train_mdsa_scores_iter_1.npy





Calculating Mahalanobis Distance-based Surprise Adequacy scores...


100%|██████████| 7841/7841 [00:00<00:00, 132730.39it/s]

MDSA output saved to file: mdsa_scores/layer_split_test_mdsa_scores_iter_1.npy





Calculating Mahalanobis Distance-based Surprise Adequacy scores...


100%|██████████| 25094/25094 [00:00<00:00, 132134.33it/s]

MDSA output saved to file: mdsa_scores/layer_split_train_mdsa_scores_iter_2.npy





Calculating Mahalanobis Distance-based Surprise Adequacy scores...


100%|██████████| 7841/7841 [00:00<00:00, 129262.21it/s]

MDSA output saved to file: mdsa_scores/layer_split_test_mdsa_scores_iter_2.npy
MDSA score extraction complete.





In [None]:
import numpy as np


# Which saved score file to inspect.
split = 'train'
iteration = 0

mdsa_scores_path = f'mdsa_scores/layer_split_{split}_mdsa_scores_iter_{iteration}.npy'
mdsa_scores = np.load(mdsa_scores_path)

# Peek at the leading values and confirm there is one score per sample.
print("First 10 MDSA scores:", mdsa_scores[:10])
print("Shape of MDSA scores:", mdsa_scores.shape)

# Summary statistics (optional): compute first, then report.
mean_score = np.mean(mdsa_scores)
std_score = np.std(mdsa_scores)
min_score = np.min(mdsa_scores)
max_score = np.max(mdsa_scores)

print(f"Mean: {mean_score}")
print(f"Standard Deviation: {std_score}")
print(f"Min: {min_score}")
print(f"Max: {max_score}")


First 10 MDSA scores: [ 4.18336998  6.89801051  4.63658183  5.88717019  3.9576132   5.11801757
 22.0546794   5.67678852  5.72169485  4.02842221]
Shape of MDSA scores: (25094,)
Mean: 6.820249849919589
Standard Deviation: 16.078938470579008
Min: 2.1260977216137364
Max: 916.3105087920226
