## CNN attack on 5px Analog Linearalized Data using Single CNN

The goal of this notebook is to correctly preprocess the given data as tensors that can be used to train a single simple CNN and evaluate its results.

In [None]:
# Necessary imports
from torch.utils.data import Dataset, DataLoader
import numpy as np
import os
import re

## Common rules
1) **The BIT numbers are arranged in descending order.**
    1) Thus, in a 8-bit ADC, **bit 7** is the **MSB** and **bit 0** is the **LSB**.
2) **The ADC numbers are arranged in descending order.**
    1) Thus, in a 5-pixel case, the ADC with **adc_num 4** stores **bits [39-32]**, and ADC with **adc_num 0** stores **bits [7-0]**.

#### Hyperparameter - File Settings

In [4]:
''' 
Hyperparameters used to define the nature of the power trace files and values used for creating the dataloaders.
Double-check if these values match the format of the power trace files; failure to do so may cause unforeseen errors which are extremely difficult to debug.
    1) adc_num: Number of ADCs to create. Default value is set to 1.
    2) adc_bandwidth: Number of bits each ADC stores. Default value is set to 8.
    3) train_batch: training dataloader batch size
    4) split_digital: BOOLEAN value. Set to 'True' if the dataloaders need the digital output values of individual ADCs.
    5) normalized_digital:BOOLEAN value. Set to 'True' if the file names provide normalized digital values.
'''
adc_num = 5
adc_bitwidth = 8
train_batch = 512
split_digital = True
normalized_digital = True

#### Hyperparameter - Trace File Directories

In [6]:
''' 
Hyperparameter used to desginate trace folders to be loaded.
All power trace files are assumed to be saved in a folder called 'trace_files' within the root directory of the notebook.
Define names of all folders within 'trace_files' to be added to the dataloaders within this hyperparameter.
    1) sub_folder: BOOLEAN value that is used when trace folders are stored in a specific
    folder within the parent directory.
    2) sub_folder_name: name of sub-folder
    ex) If trace folders are stored in /trace_folders/analog_5px, set sub_folder to True
    and sub_folder_name to 'analog_5px'
    3) train_trace_folder_names: Add names of folders used for TRAINING dataset to this array.
    4) test_trace_folder_names: Add names of folders used for TESTING dataset to this array.
    5) trace_type: string that defines the type of trace file to be used. Default is "lin" for linearized.
    6) file_pattern: RegEx of file names to be checked. Number of groups MUST match 'adc_num'
'''
sub_folder = True
sub_folder_name = 'analog_5px'
train_trace_folder_names = ['analog_5px_tt_px']
test_trace_folder_names = ['analog_5px_tt_pm']
trace_type = "lin"
file_pattern = trace_type + "_s\\d+_([0-9]+\\.[0-9]+)_([0-9]+\\.[0-9]+)_([0-9]+\\.[0-9]+)_([0-9]+\\.[0-9]+)_([0-9]+\\.[0-9]+)\\.txt"


### Multipy ALL Power Trace Values by 10^(e_exp)
Our power trace values are extremely small, and thus may potentially impact the CNN's performance negatively. Therefore, when creating tensors, we will multiply **ALL** power values with 10^(e_exp).

The default e_exp value is set to **5**, which is a value acquired by 

### Creating dataloaders

#### Trace Classes
All functions are saved in: **~/py_files/dataloader.py**
1) TraceDataset(Dataset): creates dataset of TRACE files
2) TraceDatasetBW(TraceDataset): takes a TraceDataset, labels them by bit
3) TraceDatasetBuilder: creates final Dataloader with traces and labels

Helper function for creating trace datasets:
1) process_string: Function that processes trace string values to correct format float64.


In [None]:
from py_files.file_preprocess import TraceDataset, TraceDatasetBW, TraceDatasetBuilder

#### Create separate dataloaders - Training and Testing
Create two separate dataloaders from the provided files.

Need some clarification on the specific logic for choosing files(256 per training/testing, mixing both, etc.)

There are also some prerequisits for the dataloaders:
1) all files should be the same length. If there are 2610 values in one file, all files should have 2610.
2) each dataloader should have at least one trace mapped per digital value. I am not completely sure on the impact of selecting less than 256 traces per dataloader, but based on previous attempts on using partial data, I am guessing it is necessary.
3) a good ratio between training and testing is 7:3. We will need more data to make this possible.

In [None]:
# Create dataloaders
pwd = os.getcwd()
trace_folder_dir = os.path.join(pwd, 'trace_files')
if sub_folder == True:
    trace_folder_dir = os.path.join(trace_folder_dir, sub_folder_name)

train_builder = TraceDatasetBuilder(adc_bitwidth=8, cache=True)
test_builder = TraceDatasetBuilder(adc_bitwidth=8, cache=True)

# Create TRAINING dataloader
try:
    for folder_name in train_trace_folder_names:
        train_builder.add_files(os.path.join(trace_folder_dir, folder_name), file_pattern, 0)
except FileNotFoundError:
    print(f"The folder does not exist: {folder_name}")
    print(f"Check the following settings and try again:")
    print(f"\tsub_folder: {sub_folder}")
    print(f"\tsub_folder_name: {sub_folder_name}")
    print(f"\ttrain_trace_folder_names: {train_trace_folder_names}")
    print(f"\ttest_trace_folder_names: {test_trace_folder_names}")
    print(f"\tfile_pattern: {file_pattern}")
    raise
train_builder.build()
train_builder.build_dataloaders(batch_size=train_batch, shuffle=False)
print(train_builder.dataloaders[0])
print(len(train_builder.dataloaders))
print(len(train_builder.dataloaders[0]))

# Create TESTING dataloader
try:
    for folder_name in test_trace_folder_names:
        test_builder.add_files(os.path.join(trace_folder_dir, folder_name), file_pattern, 0)
except FileNotFoundError:
    print(f"The folder does not exist: {folder_name}")
    print(f"Check the following settings and try again:")
    print(f"\tsub_folder: {sub_folder}")
    print(f"\tsub_folder_name: {sub_folder_name}")
    print(f"\ttrain_trace_folder_names: {train_trace_folder_names}")
    print(f"\ttest_trace_folder_names: {test_trace_folder_names}")
    print(f"\tfile_pattern: {file_pattern}")
    raise
test_builder.build()
test_builder.build_dataloaders(batch_size=train_batch, shuffle=False)

### Setup for ResNet
Section where the initial imports and variables for ResNet is set up.

In [None]:
# ResNet18 code
# Necessary imports
import os
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, TensorDataset, random_split
from torchvision.models import resnet18, ResNet18_Weights
import numpy as np
import datetime


### Optimizing CUDA version
Section where we desginate the most updated CUDA version for current GPU.

Implementation in progress, but is a lot of unnecessary work; will do if CPU training speeds are unformidable.

## Training CNNs - ResNet18
Using pretrained ResNet18, we train each CNN until all of them reaches an accuracy of 1.0.

I'm not sure if aiming for an accuracy of 1.0 is beneficial, as it is just overfitting the model to the training data. A more realistic value may be 0.99, but will set it to 1.0 for the current simulated environment.

The training function automatically reduces the learning rate used in Adam based on target accuracy. Currently testing different values and decrease rates. 

### Hyperparameters

1) def_lr: Default learning rate. Change this value to test different learning rate values.
2) freeze_layers: If set to 'True', freezes all layers of ResNet with no additional training to those layers.

In [None]:
# Main hyperparameters
def_lr = 1e-4
freeze_layers = False

### Helper Functions

In [None]:
''' 
Function that initializes the CNN and its core components.
Inputs:
    1) learning_rate: learning rate, current default=def_lr
Returns:
    1) cnn: model used(ResNet18)
    2) criterion: loss function, current default=CrossEntropyLoss
    3) optimizer: optimizer, current default=Adam
'''
def cnn_init(learning_rate):
    # Model: ResNet18, using ResNet18_Weights.DEFAULT for up-to-date values
    cnn = resnet18(weights=ResNet18_Weights.DEFAULT)
    if freeze_layers:
        # Freeze all layers
        for param in cnn.parameters():
            param.requires_grad = False
    cnn.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
    # Resulting output is value between 0 and 1
    cnn.fc = nn.Linear(cnn.fc.in_features, 2)
    # Loss function: not specified in paper, using Cross Entropy Loss
    criterion = nn.CrossEntropyLoss()

    # Learning rate: default set to def_lr, adjust accordingly
    # Optimizer: not specified in paper, using Adam
    optimizer = optim.Adam(cnn.parameters(), lr=learning_rate)

    for module in cnn.modules():
        if isinstance(module, nn.BatchNorm2d):
            module.track_running_stats = False

    return cnn, criterion, optimizer
 

''' 
Function that initializes parameters used in training.
Inputs:
    None
Returns:
    TRAINING PARAMETERS
    1) num_epochs: number of maximum epochs per training
    2) max_grad_norm: gradient clipping threshold
'''
def param_init():
    # training parameters
    num_epochs = 1000
    max_grad_norm = 1.0
    
    return num_epochs, max_grad_norm

''' 
Function that loads variables from saved .pth file. Checks if the file is valid.
This function simply receives the checkpoint file and checks if it is valid.
Inputs:
    1) checkpoint_path: string of .pth file name 
    2) device: selected device to run training
Returns:
    1) pth_variables: dictionary that contains received .pth variables.
    List of variables are saved in 'load_values' array.
'''
def load_pth_file(checkpoint_path, device, load_values):
    pth_variables = {}
    # Load .pth file
    try:
        checkpoint = torch.load(checkpoint_path, map_location=device, weights_only=True)
        for values in load_values:
            pth_variables[values] = checkpoint.get(values, 0)
        return pth_variables
    except FileNotFoundError:
        print(f"No checkpoint found at {checkpoint_path}. Starting from scratch.")
        raise
    except KeyError as e:
        missing_keys = {
            'cnn_state_dict': "CNN state dictionary",
            'optimizer_state_dict': "optimizer state dictionary",
            'epoch': "epoch",
            'reached_acc': "recorded accuracy of current epoch"
        }
        key = e.args[0]
        if key in missing_keys:
            if key in ['cnn_state_dict', 'optimizer_state_dict', 'epoch', 'reached_acc']:
                print(f"Critical error: Missing \"{key}\" that stores \"{missing_keys[key]}\". Checkpoint file is invalid.")
                raise

        print(f"Unknown parameter \"{key}\" in checkpoint. Recheck file and try again.")
        raise

In [None]:
cnns = []
train_dataloaders = train_builder.dataloaders 
timestamp   = datetime.datetime.now().strftime('%Y%m%d_%H%M')

print(f"Installed CUDA version: {torch.version.cuda}\n")

# Create CNN per dataset
for i in range(adc_num-1, -1, -1):
    for j in range(adc_bitwidth-1, -1, -1):
        print(f"Starting training for \"bit_{i*8+j}\"...")

        # initialize CNN
        learning_rate = def_lr
        cnn, criterion, optimizer = cnn_init(learning_rate)
        # Print LR
        print(f"Learning rate:{learning_rate}")
        # initialize parameters
        num_epochs, max_grad_norm = param_init()

        # Set device to cuda if available
        if torch.cuda.is_available():
            print("\tGPU found, running training on GPU...")
            device = torch.device("cuda")
            cnn = cnn.to(device)
        else:
            print("\tNo GPU found, running training on CPU...")
            print("\tRecheck CUDA version and if your GPU supports it.")
            device = torch.device("cpu")

        # Append CNN to cnns array
        cnns.append(cnn)

        # Create checkpoint to save progress
        checkpoint_path = f"resnet18_checkpoint_adc_{i}_bit_{j}.pth"

        # Attempt to load saved .pth file
        # start_epoch = LOCAL variable that specifies starting epoch number
        # prev_acc = LOCAL variable that specifies accuracy achieved by loaded .pth file
        load_values = ['cnn_state_dict', 'optimizer_state_dict', 'epoch', 'reached_acc']
        try:
            pth_vars = load_pth_file(checkpoint_path, device, load_values)
        except FileNotFoundError:
            start_epoch = 0
            prev_acc = 0
        else:
            # Manually create variables per value
            checkpoint = torch.load(checkpoint_path, map_location=device)
            cnn.load_state_dict(checkpoint['cnn_state_dict'])
            optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
            start_epoch = checkpoint['epoch']
            print(f"Checkpoint loaded. Resuming from epoch {start_epoch}")
            prev_acc = checkpoint['reached_acc']
        
            print(f"Checkpoint loaded. Resuming from epoch {start_epoch}.")
            print(f"\tPrevious reached accuracy: {prev_acc}.")

        # Skip training if target accuracy is reached
        if prev_acc == 1:
            print(f"\tSkipping training: accuracy of 1 already achieved.\n")
            continue

        # Start training
        for e in range(start_epoch, num_epochs):
            correct = 0
            total_traces = 0
            cnn.train()
            for inputs, labels in train_dataloaders[i*8+j]:
                inputs, labels = inputs.to(device), labels.to(device)
                # Add dimensions for channels and width
                inputs = inputs.unsqueeze(1).unsqueeze(-1)          
                optimizer.zero_grad()
                output = cnn(inputs)
                # Check for NaN in outputs
                if torch.isnan(output).any():
                    print("NaN detected in cnn outputs.")
                    break

                loss = criterion(output, labels)
                # Check for NaN in loss
                if torch.isnan(loss):
                    print("NaN detected in loss. Stopping training.")
                    break
                # print(f"Loss: {loss.item()}")
                loss.backward()

                # Gradient clipping
                torch.nn.utils.clip_grad_norm_(cnn.parameters(), max_grad_norm)
                optimizer.step()
                _, predicted = torch.max(output, 1)
                correct += (predicted == labels).sum()

                total_traces += len(labels)
    
            accuracy = correct / total_traces

            # Change rate of update for printing accuracy accordingly
            if (e + 1) % 1 == 0:
                print(f'TRAINING: cnn[{i*8+j}] (adc {i}, bit{j}), Epoch {e+1}, Loss: {loss.item()}')
                print(f'TRAINING: cnn[{i*8+j}] (adc {i}, bit{j}), Epoch {e+1}, Accuracy: {accuracy}')

            # Save checkpoint
            torch.save({
                'cnn_state_dict': cnn.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'epoch': e + 1,
                'reached_acc': accuracy
            }, checkpoint_path)
            print(f"Checkpoint saved for epoch {e + 1}")

            prev_acc = accuracy

            if accuracy >= 0.925:
                print(f"Reached accuracy of 1.0. Stopping training for \"cnn[{i*8+j}] (adc {i}, bit{j})\".\n")
                break

### Testing the CNNs

Using the resulting CNNs saved in the .pth files, we test each CNNs using the provided power traces.

Currently the testing data is a subset of the training data. In the future, if we can generate more data, we will be able to use separate datasets.



#### Simple Test Accuracy

Prints single float value for testing accuracy.

In [None]:
# Create TESTING dataloaders
test_dataloaders = test_builder.dataloaders
track_running_stats=False
# Run evaluation using testing data
for i in range(adc_num-1, -1, -1):
    for j in range(adc_bitwidth-1, -1, -1):
        cnn = cnns[i*8+j]
        try:
            checkpoint_path = f"resnet18_checkpoint_adc_{i}_bit_{j}.pth"
            checkpoint = torch.load(checkpoint_path, map_location=device)
            print(checkpoint.keys())
            cnn.load_state_dict(checkpoint['cnn_state_dict'])
            optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
            start_epoch = checkpoint['epoch']
            print(f"Checkpoint loaded. Resuming from epoch {start_epoch}")
            reached_target_acc = checkpoint['reached_acc']
            print(f"Previous reached accuracy: {reached_target_acc}")
            '''
            if reached_target_acc != 1:
                print(f"cnn_{i}\" did not reach accuracy of 1, skipping evaluation.\n\n")
                continue
            '''
        except FileNotFoundError:
            print(f"No checkpoint found for \"cnn_{i}\". Skipping evaluation.")
        
        correct = 0
        total = 0
        with torch.no_grad():
            # Use TEST dataloader
            for inputs, labels in test_dataloaders[i*8+j]:
                inputs = inputs.unsqueeze(1).unsqueeze(-1)
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = cnn(inputs)
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        print(f"Accuracy: {100 * correct / total:.2f}%\n")

#### Classification report

Creates a classification report on all 8 bits.



In [None]:
# Necessary imports
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, RocCurveDisplay
from sklearn.preprocessing import LabelBinarizer
import matplotlib.pyplot as plt
from itertools import cycle

In [None]:
all_preds = []
all_labels = []
all_logits = []  # To store raw logits

for i in range(adc_num-1, -1, -1):
    for j in range(adc_bitwidth-1, -1, -1):
        cnn = cnns[i*8+j]
        print(f"{i*8+j}")
        cnn.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
        # Resulting output is either 0 or 1
        cnn.fc = nn.Linear(cnn.fc.in_features, 2)
        # Loss function: not specified in paper, using Cross Entropy Loss
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(cnn.parameters(), lr=5e-3)

        if torch.cuda.is_available():
            device = torch.device("cuda")
            cnn = cnn.to(device)
        else:
            device = torch.device("cpu")

        checkpoint_path = f"resnet18_checkpoint_adc_{i}_bit_{j}.pth"
        try:
            checkpoint = torch.load(checkpoint_path, map_location=device)
            cnn.load_state_dict(checkpoint['cnn_state_dict'])
            optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
            start_epoch = checkpoint['epoch']
            # print(f"Checkpoint loaded. Resuming from epoch {start_epoch}")
            reached_target_acc = checkpoint['reached_acc']
            # print(f"Previous reached accuracy: {reached_target_acc}")
            '''
            if reached_target_acc != 1:
                print(f"cnn_{i}\" did not reach accuracy of 1, skipping evaluation.\n\n")
                continue
            '''
        except FileNotFoundError:
            print(f"No checkpoint found for \"cnn_{i}\". Skipping evaluation.")
            continue

        # Evaluate the model
        with torch.no_grad():
            # Use TEST dataloader
            for inputs, labels in test_dataloaders[i*8+j]:
                inputs = inputs.unsqueeze(1).unsqueeze(-1)
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = cnn(inputs)  # Raw logits
                _, preds = torch.max(outputs, 1)

                adc_logits = outputs.cpu().numpy()  # Store raw logits
                adc_preds = preds.cpu().numpy()
                adc_labels = labels.cpu().numpy()

        all_logits.append(adc_logits)
        all_preds.append(adc_preds)
        all_labels.append(adc_labels)
    
        # Compute accuracy for adc_i
        accuracy = accuracy_score(adc_labels, adc_preds)
        print(f"Test Accuracy for adc_{i}: {accuracy:.4f}")

        # Generate classification report for adc_i
        print(f"\nClassification Report for adc_{i}:")
        print(classification_report(adc_labels, adc_preds))

        # Generate confusion matrix for adc_i
        print(f"\nConfusion Matrix for adc_{i}:")
        print(confusion_matrix(adc_labels, adc_preds))

all_logits = np.concatenate(all_logits)
all_preds = np.concatenate(all_preds)
all_labels = np.concatenate(all_labels)

print("\nFINAL COMBINED RESULTS FOR ALL ADCS")

# Compute accuracy for all adcs
accuracy = accuracy_score(all_labels, all_preds)
print(f"Test Accuracy: {accuracy:.4f}")

# Generate classification report for all adcs
print("\nClassification Report:")
print(classification_report(all_labels, all_preds))

# Generate confusion matrix for all adcs
print("\nConfusion Matrix:")
print(confusion_matrix(all_labels, all_preds))