# Jupyter Notebook of test_runner


In [1]:
import numpy as np
from model_loader import load_pretrained_model, load_dataloader
from augmentations import DataAugmentationTesting
from embeddings_similarity_module import EmbeddingSimilarityModule
#from cam_module import ClassActivationMapping
from collections import defaultdict
import logging
import random
import json
from tqdm import tqdm
import pickle
import os
import torch
import torch.nn.functional as F


# Create a logger object
logger = logging.getLogger('my_logger')
logger.setLevel(logging.DEBUG)  # Set the minimum level of messages to log

# Create a file handler that logs debug and higher level messages
file_handler = logging.FileHandler('example.log', mode='w')
file_handler.setLevel(logging.DEBUG)
file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(file_formatter)

# Create a console handler that logs debug and higher level messages
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
console_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console_handler.setFormatter(console_formatter)

# Add both handlers to the logger
logger.addHandler(file_handler)
logger.addHandler(console_handler)

def load_config(config_path):
    with open(config_path, 'r') as config_file:
        return json.load(config_file)

def save_test_results(test_results, config):
    # Fetch the filename from the config dictionary
    filename = "/test_results/"+config["test_name"] + '.pkl'  # Adding .pkl extension to the file name

    # Open the file in binary write mode and save the test_results using pickle
    with open(filename, 'wb') as file:
        pickle.dump(test_results, file)

    print(f"Test results saved to {filename}")

def select_test_idxs(dataloader, num_instances_per_class, all_class_labels):
    # Initialize storage for selected indices for each class
    class_indices = defaultdict(list)
    
    # Initialize counters for each class
    collected = {label: 0 for label in all_class_labels}  # Ensures we track per label in all_class_labels
    
    # Track selected indices to avoid duplicates
    selected_indices = set()

    # Calculate the total number of instances needed
    total_needed = num_instances_per_class * len(all_class_labels)

    # Iterate until the required number of indices per class is collected
    while sum(collected.values()) < total_needed:
        idx = random.randint(0, len(dataloader.dataset) - 1)
        if idx in selected_indices:
            continue
        
        # Fetch the label, assuming it is accessible and in correct format directly
        _, label = dataloader.dataset[idx]
        
        # Check if more instances are needed for this class
        if collected[label] < num_instances_per_class:
            class_indices[label].append(idx)
            selected_indices.add(idx)
            collected[label] += 1

        # Verify if all quotas are met to potentially break the loop
        if all(count == num_instances_per_class for count in collected.values()):
            break

    # Flatten the dictionary into a list of indices
    test_idxs = [idx for indices in class_indices.values() for idx in indices]

    return test_idxs


def print_summary(test_results):
    """
    Print summary statistics for test results including accuracy,
    precision, recall, and counts of TP, TN, FP, FN for multi-class classification.
    """
    # Initialize counters and accumulators
    total_tests = len(test_results)
    num_classes = len(test_results[0]["output_aug"][0])

    correct_predictions = 0
    confusion_matrix = np.zeros((num_classes, num_classes), dtype=int)
    
    for result in test_results:
        # Convert model output to a probability distribution using softmax
        output_prob = F.softmax(torch.tensor(result["output_aug"]), dim=-1).numpy()
        prediction = np.argmax(output_prob)
        true_label = result["og_label"]
        
        # Increment correct predictions
        if prediction == true_label:
            correct_predictions += 1
        
        # Update confusion matrix
        confusion_matrix[true_label, prediction] += 1

    # Calculate accuracy
    accuracy = (correct_predictions / total_tests) * 100 if total_tests > 0 else 0

    # Calculate precision and recall for each class
    precision = np.diag(confusion_matrix) / np.sum(confusion_matrix, axis=0)
    recall = np.diag(confusion_matrix) / np.sum(confusion_matrix, axis=1)

    # Print summary
    print(f"Total number of test results: {total_tests}")
    print(f"Model Accuracy on output_aug: {accuracy:.2f}%")
    for i in range(len(precision)):
        print(f"Class {i} - Precision: {precision[i]:.4f}, Recall: {recall[i]:.4f}")

    # Optionally, print the confusion matrix
    print("Confusion Matrix:")
    print(confusion_matrix)

    

#***************************************************      Main           *************************************************************
if __name__ == "__main__":

    config_path = "config.json"
    config = load_config(config_path)
    model = load_pretrained_model(config)
    logging.info("Model Loaded")

    dataloader = load_dataloader(config)
    logging.info("Dataloader Loaded")

    all_class_labels = [0,1,2,3] # needs to match your dataloader labels
    num_instances_per_class = config["num_instances_per_class"]
    test_index_list = select_test_idxs(dataloader, num_instances_per_class,all_class_labels)
    logging.info("Test list created and Loaded")
    print(test_index_list)

    # Initialize the Data Augmentation Module
    augmentor_sandbox = DataAugmentationTesting(model, dataloader, config)
    
    # # Initialize processing modules
    embedding_similarity_module = EmbeddingSimilarityModule(model, dataloader, config)



Baseline_CNN1D(
  (norm): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
  (conv1): Conv1d(2, 64, kernel_size=(7,), stride=(1,))
  (relu1): ReLU()
  (maxpool1): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv1d(64, 128, kernel_size=(3,), stride=(1,))
  (relu2): ReLU()
  (maxpool2): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (feature_extractor): ModuleList(
    (0): Conv1d(2, 64, kernel_size=(7,), stride=(1,))
    (1): ReLU()
    (2): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv1d(64, 128, kernel_size=(3,), stride=(1,))
    (4): ReLU()
    (5): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (fc1): Linear(in_features=16000, out_features=256, bias=True)
  (relu3): ReLU()
  (fc2): Linear(in_features=256, out_features=4, bias=True)
  (logSoftmax): LogSoftmax(dim=1)
) cpu
cpu
(array([[ 0.83196074, -0.65662307,  0.32264539, ..., -0.6

In [2]:

# Prepare to collect results
test_results = []

# Run tests on the specific range of indexes
for index in tqdm(test_index_list, desc="Processing indexes"):

    # #TODO add if statement here to check config for if awgn_aug_test
    # #Run AWGN Aug Test *************************************
    # # Augment the sample
    # augmentor_sandbox.replace_with_noise(index)

    # # Get the augmented sample
    # awgn_aug_sample, aug_label = augmentor_sandbox.awgn_aug_data.__getitem__(index)
    # og_sample, og_label = augmentor_sandbox.get_original_sample(index)

    # # Run model inference on augmented sample and original
    # output_aug = augmentor_sandbox.run_model_on_augmented_sample(awgn_aug_sample)
    # output_og = augmentor_sandbox.run_model_on_augmented_sample(og_sample)

    # # Compute embedding similarity
    # es_features_aug, es_label_aug = embedding_similarity_module.extract_features(awgn_aug_sample, aug_label)
    # es_features_og, es_label_og = embedding_similarity_module.extract_features(og_sample, og_label)
    # es_result = 0 #TODO compute distance metric for similarity
    # # embedding_similarity_module.plot_tsne(es_features, es_label)
    # # # # Compute class activation mapping
    # # cam_result = cam.compute(augmented_sample, model_output)
    # cam_result = None
    # # Store results
    # test_results.append({
    #     "test": "awgn",
    #     "index": index,
    #     "og_label": og_label,
    #     "aug_label": aug_label,
    #     "output_og": output_og,
    #     "output_aug": output_aug,
    #     "embedding_similarity": es_result,
    #     "embedding_features_og":es_features_og,
    #     "embedding_features_aug":es_features_aug,
    #     "cam_result": cam_result
    # })
    # #**************** end awgn aug datatest *****************************

    #TODO add if statement here to check config for if cfo_aug_test
    #Run CFO Aug Test *************************************
    # Augment the sample (creates new augmented cfo dataset as well)
    augmentor_sandbox.add_frequency_offset(index)

    # Get the augmented sample
    cfo_aug_sample, aug_label = augmentor_sandbox.cfo_aug_data.__getitem__(index)
    og_sample, og_label = augmentor_sandbox.get_original_sample(index)

    # Run model inference on augmented sample and original
    output_aug = augmentor_sandbox.run_model_on_augmented_sample(cfo_aug_sample)
    output_og = augmentor_sandbox.run_model_on_augmented_sample(og_sample)
    

    # Compute embedding similarity
    es_features_aug, es_label_aug = embedding_similarity_module.extract_features(cfo_aug_sample, aug_label)
    es_features_og, es_label_og = embedding_similarity_module.extract_features(og_sample, og_label)
    es_result = 0 #TODO compute distance metric for similarity
    # embedding_similarity_module.plot_tsne(es_features, es_label)

    # # Compute class activation mapping
    # cam_result = cam.compute(augmented_sample, model_output)
    cam_result = None
    # Store results
    test_results.append({
        "test": "cfo",
        "index": index,
        "og_label": og_label,
        "aug_label": aug_label,
        "output_og": output_og,
        "output_aug": output_aug,
        "embedding_similarity": es_result,
        "embedding_features_og":es_features_og,
        "embedding_features_aug":es_features_aug,
        "cam_result": cam_result
    })
    #**************** end CFO aug datatest *****************************


    
    

# # Optionally, save and summarize results
save_test_results(test_results, config)
print_summary(test_results)


Processing indexes: 100%|██████████| 4000/4000 [01:06<00:00, 59.96it/s]


PermissionError: [Errno 13] Permission denied: '/test_resultsawgn_tprime_debug.pkl'