# Dynamic CNN attack using Skywater non-linearized data

The goal of this notebook is to correctly preprocess the given data as tensors, train and test CNN models per bit in order to find the best architecture per bit.

In [1]:
# dataloader.py
# Necessary imports
import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np
import os
import re

## Creating dataloaders
Used files from Kareem's GitHub repo. I may have made mistakes in sampling the data, so feel free to change anything that has been configured incorrectly.

### Helper functions - Dataloaders

In [2]:
# dataloader.py
# Returns list of files with given format 
def get_files(directory, format, digital_index=0):

    format = re.compile(format)
    files = os.listdir(directory)

    #file_dict = {}
    file_list = [] # fname, fpath, label

    for fname in files:
        if match := format.match(fname):
            fpath = os.path.join(directory, fname)

            dvalue = int(match.groups()[digital_index])
            
            file_list.append((fname, fpath, dvalue))

            #if dvalue in file_dict:
            #    file_dict[dvalue].append(fpath)
            #else:
            #    file_dict[dvalue] = [fpath]

    return file_list #file_dict, file_path

# Creates dataset with given traces
class TraceDataset(Dataset):
    cached_traces = {}
    trace_list    = []

    def __init__(self, file_list, cache=True):
        self.file_list = file_list
        self.cache     = cache

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, index):
        fname, fpath, label = self.file_list[index]
        label = self.process_label(label)

        if self.cache and fname in self.cached_traces:
            return self.cached_traces[fname], label
        else:
            return self.load_trace(fname, fpath), label

    def get_info(self, index):
        return self.file_list[index]

    def load_trace(self, fname, fpath):
        with open(fpath, 'r') as file:
            header = file.readline()
            #time_arr = []
            valu_arr = []
            # Fixed error of float32 incorrectly translating values
            for line in file.readlines():
                time, value = line.strip().split()
                match = re.search(r"(?<=e-)\d+", value)
                if match:
                    strip_val = value[0:8]
                    # instead of manual string manipulation via index, used split for safety
                    if(match.group() != "04"):
                        integer_part, fractional_part = strip_val.split('.')
                        new_integer_part = integer_part + fractional_part[:4 - int(match.group())]
                        new_fractional_part = fractional_part[4 - int(match.group()):] 
                        strip_val = new_integer_part + '.' + new_fractional_part
                #time_arr.append(np.float32(time))
                valu_arr.append(np.float32(strip_val))

        trace = np.array(valu_arr, dtype=np.float32)

        if self.cache: 
            self.cached_traces[fname] = trace
            self.trace_list.append(trace)

        return trace
    
    def process_label(self, label): return label

    def cache_all(self):
        assert self.cache == True

        print("Caching all traces")
        for fname, fpath, label in self.file_list:
            self.load_trace(fname, fpath)
        print("DONE Caching all traces")

class TraceDatasetBW(TraceDataset):
    def __init__(self, file_list, bit_select, cache=True):
        self.bit_mask = 1 << bit_select
        super().__init__(file_list, cache=cache)

    def process_label(self, label):
        return 1 if label & self.bit_mask else 0

class TraceDatasetBuilder:
    def __init__(self, adc_bitwidth=8, cache=True):
        self.file_list        = []
        self.cache = cache
        self.adc_bits = adc_bitwidth

        self.dataset = None
        self.dataloader = None
        self.datasets = []
        self.dataloaders = []

    def add_files(self, directory, format, label_group):
        ''' Builds list of powertrace files
        Inputs:
            directory   : folder to search for files
            format      : regular expression to match filenames
            label_index : group index for digital output label corresponding to trace
        Outputs:
            list        : [(file_name, file_path, label) ... ]
        '''
        format = re.compile(format)
        fnames = os.listdir(directory)

        for fname in fnames:
            if match := format.match(fname):
                fpath = os.path.join(directory, fname)
                dvalue = int(match.groups()[label_group])

                self.file_list.append((fname, fpath, dvalue))

    def build(self):
        self.dataset = TraceDataset(self.file_list, cache=self.cache)
        for b in range(self.adc_bits):
            self.datasets.append(TraceDatasetBW(self.file_list, b, cache=self.cache))

        if self.cache:
            self.dataset.cache_all()

    def build_dataloaders(self, **kwargs): # batch_size=256, shuffle=True
        self.dataloader = DataLoader(self.dataset, **kwargs)
        self.dataloaders = [DataLoader(dataset, **kwargs) for dataset in self.datasets]

## Creating dataloaders
This section:
1) Creates dataloaders for both the training and testing datasets.
2) Validates and checks the contents of each dataset.
3) Common variables, such as "input_size" are instantiated.

In [3]:
# Hyperparameters for dataloaders
adc_bits = 8

In [4]:
# Create dataloaders
pwd = os.getcwd()
# print(pwd)
# proj_dir = os.path.dirname(os.path.dirname(pwd))
# print(proj_dir)
# data_dir = os.path.join(pwd, 'analog', 'outfiles')

builder  = TraceDatasetBuilder(adc_bitwidth=adc_bits, cache=True)
builder.add_files(os.path.join(pwd, 'sky_Dec_18_2151'), "sky_d(\\d+)_.*\\.txt", 0)
builder.build()
builder.build_dataloaders(batch_size=256, shuffle=True)

# Initialize dataloaders
dataloaders = builder.dataloaders

try:
    print(f"\nTotal number of datasets in dataloaders: {len(dataloaders)}")
except ModuleNotFoundError:
    print("\tERROR: Dataloader not found. Recheck your implementation and try again.")
    raise
except Exception as e:
    print("\tERROR: Dataloader invalid. Recheck your implementation and try again.")
    raise

if not dataloaders or len(dataloaders) != adc_bits:
    print(f"\tERROR: Dataloader size is incorrect: {len(dataloaders) if dataloaders else 0}. It should be: {adc_bits}.")
    print("\tRecheck your implementation and try again.")
    raise ValueError("Invalid dataloader size")

# Extract input_size and validate initial dataloader
input_size, label_num = None, None
for input, labels in dataloaders[0]:
    input_size = input.size(1)
    label_num = len(labels)
    print(f"Input size: {input_size}")
    print(f"Number of labels: {label_num}\n")

    if input.size(0) != label_num:
        print("\tERROR: Number of traces per input is different from number of labels.")
        print(f"\tNumber of traces per input: {input.size(0)}")
        print(f"\tNumber of labels: {label_num}\n")
        raise ValueError("Mismatch between input size and number of labels")

# Validate all dataloaders
for idx, dataloader in enumerate(dataloaders):
    for input, labels in dataloader:
        if input.size(0) != label_num:
            print(f"\tERROR: Number of traces per input is incorrect in dataloader {idx}.")
            print(f"\tExpected: {label_num}, but got: {input.size(0)}.")
            raise ValueError("Invalid number of traces per input")

        if input.size(1) != input_size:
            print(f"\tERROR: Size of traces per input is incorrect in dataloader {idx}.")
            print(f"\tExpected: {input_size}, but got: {input.size(1)}.")
            raise ValueError("Invalid trace size")

        if len(labels) != label_num:
            print(f"\tERROR: Number of labels is incorrect in dataloader {idx}.")
            print(f"\tExpected: {label_num}, but got: {len(labels)}.")
            raise ValueError("Invalid label count")

    print(f"Validation for dataloader {idx} complete.")

print("\nAll dataloaders are valid. Proceeding to training...")

Caching all traces
DONE Caching all traces

Total number of datasets in dataloaders: 8
Input size: 3001
Number of labels: 256

Validation for dataloader 0 complete.
Validation for dataloader 1 complete.
Validation for dataloader 2 complete.
Validation for dataloader 3 complete.
Validation for dataloader 4 complete.
Validation for dataloader 5 complete.
Validation for dataloader 6 complete.
Validation for dataloader 7 complete.

All dataloaders are valid. Proceeding to training...


### Setup for CNN
Section where the initial imports and variables for our native CNN.

In [5]:
# CNN code
# Necessary imports
import os
import sys
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, TensorDataset, random_split
import numpy as np
import datetime


### Optimizing CUDA version
Section where we desginate the most updated CUDA version for current GPU.

Implementation in progress, but is a lot of unnecessary work; will do if CPU training speeds are unformidable.

## Training CNNs - Native CNN
Using a native CNN, we train each CNN until all of them reaches an accuracy of 1.0.

I'm not sure if aiming for an accuracy of 1.0 is beneficial, as it is just overfitting the model to the training data. A more realistic value may be 0.99, but will set it to 1.0 for the current simulated environment.

The training function automatically reduces the learning rate used in Adam based on target accuracy. Currently testing different values and decrease rates. 

### Hyperparameters

1) def_lr: Default learning rate. Change this value to test different learning rate values.
2) max_epoch: Limit of maximum training epochs; training ends if it reaches this amount of epochs.

In [6]:
# Main hyperparameters
def_lr = 1e-4
max_epoch = 100000

### Looking for optimal CNN parameters

Paramerize components of CNN and find best overall hyperparameters.

In [7]:
# Imports
import json
import os
import sys
import time
import hashlib
import datetime
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import matplotlib.pyplot as plt
import re

from copy import copy

import argparse

#### CNN Hyperparameter format

#### 1. Convolutional Layer (Conv1d)
##### Format:
```
conv_[# of conv layer]_[# of out_channels]_[kernel_size #]_[stride #]_[padding #]_[dilation #]
```
##### Example:
```python
self.conv1 = nn.Conv1d(in_channels=1, out_channels=8, kernel_size=5, stride=1, padding=2, dilation=1)
# Naming: conv_1_1_8_5_1_2_1
```
> **Note:** Padding mode = `'zeros'`

#### 2. Max Pooling Layer (MaxPool1d)
##### Format:
```
maxpool_[# of pool layer]_[kernel_size #]_[stride #]_[padding #]_[dilation #]
```
##### Example:
```python
self.pool1 = nn.MaxPool1d(kernel_size=3, stride=3, padding=0, dilation=1)
# Naming: maxpool_1_3_3_0_1
```

---

#### 3. Fully Connected Layer (Linear)
##### Format:
```
linear_[# of fc layer]_[in_features #]_[out_features #]
```
##### Example:
```python
self.fc1 = nn.Linear(500, 500)
# Naming: linear_1_500_500
```

In [8]:
# Hyperparameter update testing area
# Change values within this section to attempt training on CNN
# IN FUTURE: Update functions to use different datasets, will test ONLY on non-linerized data

# default for both conv and maxpool: padding=0, dilation=1
# conv_[# of conv layer]_[# of out_channels]_[kernel_size #]_[stride #]_[padding #]_[dilation #]
# maxpool_[# of pool layer]_[kernel_size #]_[stride #]_[padding #]_[dilation #]
# linear_[# of fc layer]_[in_features #]_[out_features #]
cnn_format = {
    "conv_params": ["conv_1_8_5_1_2_1",
                    "conv_2_8_5_1_2_1",
                    "conv_3_8_5_1_2_1",
                    "conv_4_8_5_1_2_1",
                    "conv_5_8_5_1_2_1",
                    "conv_6_8_5_1_2_1",
                    "conv_1_16_5_1_2_1",
                    "conv_2_16_5_1_2_1",
                    "conv_3_16_5_1_2_1",
                    "conv_4_16_5_1_2_1",
                    "conv_5_16_5_1_2_1",
                    "conv_6_16_5_1_2_1",
                    ],
    "maxpool_params" : ["maxpool_3_3_0_1",
                        "maxpool_5_5_0_1",
                    ],
    "linear_params" : ["linear_1_500_500",
                       "linear_2_500_500",
                       "linear_3_500_500",
                       "linear_4_500_500",
                    ],
    }


In [9]:
'''
# Parsing area, constructs CNNs using hyperparameters
cnn_pattern = r"conv_(\d+)_(\d+)_(\d+)_(\d+)_(\d+)"
pool_pattern = r"maxpool_(\d+)_(\d+)"
linear_pattern = r"linear_(\d+)_(\d+)_(\d+)"

def extract_conv_params(conv_string):
    match = re.match(cnn_pattern, conv_string)
    if match:
        return {
            "conv_layer_num": int(match.group(1)),
            "out_channels": int(match.group(2)),
            "kernel_size": int(match.group(3)),
            "stride": int(match.group(4)),
            "padding": int(match.group(5)),
        }
    return None

def extract_pool_params(pool_string):
    match = re.match(pool_pattern, pool_string)
    if match:
        return {
            "kernel_size": int(match.group(2)),
            "stride": int(match.group(3))
        }
    return None

def extract_linear_params(linear_string):
    match = re.match(linear_pattern, linear_string)
    if match:
        return {
            "linear_layer_num": int(match.group(1)),
            "in_features": int(match.group(2)),
            "out_features": int(match.group(3))
        }
    return None
'''

'\n# Parsing area, constructs CNNs using hyperparameters\ncnn_pattern = r"conv_(\\d+)_(\\d+)_(\\d+)_(\\d+)_(\\d+)"\npool_pattern = r"maxpool_(\\d+)_(\\d+)"\nlinear_pattern = r"linear_(\\d+)_(\\d+)_(\\d+)"\n\ndef extract_conv_params(conv_string):\n    match = re.match(cnn_pattern, conv_string)\n    if match:\n        return {\n            "conv_layer_num": int(match.group(1)),\n            "out_channels": int(match.group(2)),\n            "kernel_size": int(match.group(3)),\n            "stride": int(match.group(4)),\n            "padding": int(match.group(5)),\n        }\n    return None\n\ndef extract_pool_params(pool_string):\n    match = re.match(pool_pattern, pool_string)\n    if match:\n        return {\n            "kernel_size": int(match.group(2)),\n            "stride": int(match.group(3))\n        }\n    return None\n\ndef extract_linear_params(linear_string):\n    match = re.match(linear_pattern, linear_string)\n    if match:\n        return {\n            "linear_layer_num"

In [10]:
class DynamicCNN(nn.Module):
    def __init__(self, conv_params, maxpool_params, linear_params, input_length=3001):
        super(DynamicCNN, self).__init__()
        self.layers = nn.ModuleList()
        in_channels = 1  # Initial input channel
        final_size = input_length

        # Parse convolutional layer parameters
        try:
            conv_layer_num, out_channels, conv_kernel_size, conv_stride, conv_padding, conv_dilation = map(int, conv_params.split("_")[1:])
        except ValueError:
            raise ValueError("Error: conv_params format is incorrect.")

        # Parse max-pooling layer parameters
        try:
            pool_kernel_size, pool_stride, pool_padding, pool_dilation = map(int, maxpool_params.split("_")[1:])
        except ValueError:
            raise ValueError("Error: maxpool_params format is incorrect.")

        # Convolutional layers
        for _ in range(conv_layer_num):
            self.layers.append(nn.Conv1d(in_channels, out_channels, conv_kernel_size, conv_stride, conv_padding, conv_dilation))
            final_size = (final_size + 2 * conv_padding - conv_dilation * (conv_kernel_size - 1) - 1) // conv_stride + 1
            self.layers.append(nn.ReLU())
            self.layers.append(nn.MaxPool1d(pool_kernel_size, pool_stride, pool_padding, pool_dilation))
            final_size = (final_size + 2 * pool_padding - pool_dilation * (pool_kernel_size - 1) - 1) // pool_stride + 1
            in_channels = out_channels  # Update input channels for the next layer

        # Fully connected layers
        self.fc_layers = nn.ModuleList()
        try:
            fc_layer_num, in_features, out_features = map(int, linear_params.split("_")[1:])
        except ValueError:
            print("Error: linear_params format is incorrect.")
            raise
        # First FC has in features: flat_size * out_channels
        self.fc_layers.append(nn.Linear(out_channels * final_size, out_features))
        self.fc_layers.append(nn.ReLU())
        for i in range(1,int(fc_layer_num)):
            self.fc_layers.append(nn.Linear(in_features, out_features))
            self.fc_layers.append(nn.ReLU())

        # Output layer
        self.output_layer = nn.Linear(out_features, 2)  # Binary classification

    def forward(self, x):
        try:
            x = x.unsqueeze(1)  # Add channel dimension

            # Pass through convolutional layers
            for layer in self.layers:
                x = layer(x)

            # Flatten dynamically
            x = x.view(x.size(0), -1)  # Convert to (batch_size, flattened_features)

            # Fully connected layers
            for layer in self.fc_layers:
                x = layer(x)

            x = torch.softmax(self.output_layer(x), dim=1)
            return x
        except RuntimeError:
            print("Invalid form, skipping now...")
            x = torch.tensor([1]).float()
            x[x == 1] = float('nan')
            return x

### CNN model initialization
Function that initializes the CNN architecture.

### Hyperparameters

In [11]:
max_allowed_epoch = {
        "7": 100,
        "6": 200,
        "5": 400,
        "4": 800,
        "3": 1600,
        "2": 3200,
        "1": 6400,
        "0": 12800
    }

### Helper Functions

In [12]:
''' 
Function that initializes the CNN and its core components.
Inputs:
    1) cnn_num: cnn bit number
Returns:
    1) cnn: model used(ResNet18)
    2) criterion: loss function, current default=CrossEntropyLoss
    3) learning_rate: learning rate, current default=def_lr
    4) optimizer: optimizer, current default=Adam
'''
def dynamic_cnn_init(cnn_param_dict, learning_rate):
    tot_cnns = 0
    test_cnns = []
    test_optims = []
    cnn_strings = []
    for conv_param in cnn_param_dict["conv_params"]:
        for maxpool_param in cnn_param_dict["maxpool_params"]:
            for linear_param in cnn_param_dict["linear_params"]:
                new_cnn = DynamicCNN(conv_param, maxpool_param, linear_param)
                # Optimizer: not specified in paper, using Adam
                new_optim = optim.Adam(new_cnn.parameters(), lr=learning_rate)
                test_cnns.append(new_cnn)
                test_optims.append(new_optim)
                tot_cnns += 1
                cnn_strings.append(f"{conv_param}_{maxpool_param}_{linear_param}")
    # Loss function: not specified in paper, using Cross Entropy Loss
    criterion = nn.CrossEntropyLoss()
    # Learning rate: default set to def_lr, adjust accordingly
    learning_rate = def_lr
    return test_cnns, test_optims, criterion, learning_rate, tot_cnns, cnn_strings

''' 
Function that initializes parameters used in training.
Inputs:
    None
Returns:
    TRAINING PARAMETERS
    1) num_epochs: number of maximum epochs per training
    2) max_grad_norm: gradient clipping threshold
'''
def param_init():
    # training parameters
    max_grad_norm = 1.0

    return max_grad_norm

''' 
Function that loads variables from saved .pth file. Checks if the file is valid.
This function simply receives the checkpoint file and checks if it is valid.
Inputs:
    1) checkpoint_path: string of .pth file name 
    2) device: selected device to run training
Returns:
    1) pth_variables: dictionary that contains received .pth variables.
    List of variables are saved in 'load_values' array.
'''
def load_pth_file(checkpoint_path, device, load_values):
    pth_variables = {}
    # Load .pth file
    try:
        checkpoint = torch.load(checkpoint_path, map_location=device, weights_only=True)
        for values in load_values:
            pth_variables[values] = checkpoint.get(values, 0)
        return pth_variables
    except FileNotFoundError:
        print(f"No checkpoint found at {checkpoint_path}. Starting from scratch.")
        raise
    except KeyError as e:
        missing_keys = {
            'cnn_state_dict': "CNN state dictionary",
            'optimizer_state_dict': "optimizer state dictionary",
            'epoch': "epoch",
            'reached_acc': "recorded accuracy of current epoch"
        }
        key = e.args[0]
        if key in missing_keys:
            if key in ['cnn_state_dict', 'optimizer_state_dict', 'reached_acc']:
                print(f"Critical error: Missing \"{key}\" that stores \"{missing_keys[key]}\". Checkpoint file is invalid.")
                raise
            else:
                print(f"Warning: Missing \"{key}\" that stores \"{missing_keys[key]}\". Default values will be used.")
                if key == 'epoch':
                    epoch = 0

        print(f"Unknown parameter \"{key}\" in checkpoint. Recheck file and try again.")
        raise

def train_cnns(cnn_num, test_num, test_cnn, test_optim, cnn_string, test_criterion, max_epoch):
    dataloaders = builder.dataloaders 
    print(f"Starting training for \"cnn_{cnn_num}, {cnn_string}\"...")

    # initialize CNN
    cnn, criterion, optimizer = test_cnn, test_criterion, test_optim
    # initialize parameters
    max_grad_norm = param_init()

    # Create checkpoint to save progress
    checkpoint_path = f"cp_{cnn_num}_{cnn_string}.pth"    
    
    # Set device to cuda if available
    if torch.cuda.is_available():
        device = torch.device("cuda")
        cnn = cnn.to(device)
    else:
        device = torch.device("cpu")
        cnn = cnn.to(device)

    max_acc = 0
    max_acc_epoch = 0

    # Attempt to load saved .pth file
    # start_epoch = LOCAL variable that specifies starting epoch number
    # prev_acc = LOCAL variable that specifies accuracy achieved by loaded .pth file
    load_values = ['cnn_state_dict', 'optimizer_state_dict', 'epoch', 'reached_acc']
    try:
        pth_vars = load_pth_file(checkpoint_path, device, load_values)
    except FileNotFoundError:
        start_epoch = 0
        prev_acc = 0
    else:
        # Manually create variables per value
        cnn.load_state_dict(pth_vars['cnn_state_dict'])
        optimizer.load_state_dict(pth_vars['optimizer_state_dict'])
        start_epoch = pth_vars['epoch']
        prev_acc = pth_vars['reached_acc']
        max_acc = prev_acc
    
        print(f"Checkpoint loaded. Resuming from epoch {start_epoch}.")
        print(f"\tPrevious reached accuracy: {prev_acc}.")
        # Skip training if target accuracy is reached
        if prev_acc == 1:
            print(f"\tSkipping training: accuracy of 1 already achieved.\n")
            return start_epoch, prev_acc
        # Skip training if maximum epoch is met
        if start_epoch >= max_epoch:
            print(f"\tSkipping training: maximum allowed epoch of {max_epoch} already exhausted.\n")
            return start_epoch, prev_acc

    # Start training
    for e in range(start_epoch, max_epoch):
        try:
            correct = 0
            cnn.train()

            for inputs, labels in dataloaders[cnn_num]:
                inputs, labels = inputs.to(device), labels.to(device)
                # Add dimensions for channels and width
                # inputs = inputs.unsqueeze(1).unsqueeze(-1)
                optimizer.zero_grad()
                output = cnn(inputs)
                # Check for NaN in outputs
                if torch.isnan(output).any():
                    print("NaN detected in cnn outputs.\n")
                    return 0, 0

                loss = criterion(output, labels)
                # Check for NaN in loss
                if torch.isnan(loss):
                    print("NaN detected in loss. Stopping training.\n")
                    return 0, 0
                # print(f"Loss: {loss.item()}")
                loss.backward()

                # Gradient clipping
                torch.nn.utils.clip_grad_norm_(cnn.parameters(), max_grad_norm)
                optimizer.step()
                _, predicted = torch.max(output, 1)
                correct += (predicted == labels).sum()
            
            accuracy = correct / 256
            if accuracy > max_acc:
                max_acc = accuracy
                max_acc_epoch = e + 1

            # Change rate of update for printing accuracy accordingly
            if (e + 1) % 50 == 0:
                print(f'\tTRAINING: cnn[{cnn_num}], {cnn_string}], Epoch {e+1}, Loss: {loss.item()}')
                print(f'\tTRAINING: cnn[{cnn_num}], {cnn_string}], Epoch {e+1}, Accuracy: {accuracy}')
                print(f'\tMax accuracy: {max_acc}')
                print(f'\tMax accuracy epoch: {max_acc_epoch}')

                # Save checkpoint
                torch.save({
                    'cnn_state_dict': cnn.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict(),
                    'epoch': e + 1,
                    'reached_acc': accuracy
                }, checkpoint_path)
                print(f"Checkpoint saved for epoch {e + 1}")

            prev_acc = accuracy

            if accuracy == 1:
                print(f"Reached accuracy of 1. Stopping training for \"cnn_{cnn_num}_{test_num}\".")
                print(f"Achieved epoch: {max_acc_epoch}\n")
                # Save checkpoint
                torch.save({
                    'cnn_state_dict': cnn.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict(),
                    'epoch': e + 1,
                    'reached_acc': accuracy
                }, checkpoint_path)
                return e + 1, 1
            if e + 1 == max_epoch:
                print(f"\tMaximum allowed epoch of {max_epoch} reached, aborting training.\n")
                # Save checkpoint
                torch.save({
                    'cnn_state_dict': cnn.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict(),
                    'epoch': e + 1,
                    'reached_acc': accuracy
                }, checkpoint_path)
                return e + 1, accuracy
        except KeyboardInterrupt:
            # Save checkpoint
            torch.save({
                'cnn_state_dict': cnn.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'epoch': e + 1,
                'reached_acc': accuracy
            }, checkpoint_path)
            print(f"Training interrupted. Pausing training...")
            print(f"Checkpoint saved for last epoch {e + 1}")
            print(f'\tFinal accuracy: {accuracy}')
            return e + 1, accuracy


In [13]:
# Basic CUDA checkup
# Set device to cuda if available
print(f"Installed CUDA version: {torch.version.cuda}")
if torch.cuda.is_available():
    print("\tGPU found, running training on GPU...")
else:
    print("\tNo GPU found, running training on CPU...")
    print("\tRecheck CUDA version and if your GPU supports it.")

Installed CUDA version: None
	No GPU found, running training on CPU...
	Recheck CUDA version and if your GPU supports it.


In [None]:
root_directory = os.getcwd()

combined_result = {}
min_epoch = float('inf')
test_cnns, test_optims, criterion, learning_rate, tot_cnns, cnn_strings = dynamic_cnn_init(cnn_format, def_lr)

for cnn_num in range(7, -1, -1):
    best_string = []
    bit_cnn_result = {}
    max_acc = 0
    min_epoch = float('inf')

    # Create directory to save pth files
    cnn_pt_directory = os.path.join(root_directory, f'cnn{cnn_num}')
    os.makedirs(cnn_pt_directory, exist_ok=True)
    os.chdir(cnn_pt_directory)

    while max_acc != 1:
        if min_epoch == max_allowed_epoch[str(cnn_num)]:
            print(f"\tCurrent max epoch: {max_allowed_epoch[str(cnn_num)]} insufficient. Increasing epochs...")
            max_allowed_epoch[str(cnn_num)] *= 2
            min_epoch *= 2
            print(f"\tIncreased max epochs to: {max_allowed_epoch[str(cnn_num)]}...")
        for i in range(tot_cnns):
            if min_epoch < max_allowed_epoch[str(cnn_num)]:
                epoch, accuracy = train_cnns(cnn_num, i, test_cnns[i], test_optims[i], cnn_strings[i], criterion, min_epoch)
            else:
                epoch, accuracy = train_cnns(cnn_num, i, test_cnns[i], test_optims[i], cnn_strings[i], criterion, max_allowed_epoch[str(cnn_num)])
            bit_cnn_result[cnn_strings[i]] = {
                "epoch": epoch,
                "accuracy": accuracy
            }

            if accuracy > max_acc or (accuracy == max_acc and epoch <= min_epoch):
                if epoch == min_epoch:
                    best_string.append(cnn_strings[i])
                else:
                    best_string = [cnn_strings[i]]
                max_acc = accuracy
                min_epoch = epoch

        print(f"Max accuracy: {max_acc}")
        print(f"Minimum epochs: {min_epoch}")
        print(f"Best settings: {best_string}")
            
    
    combined_result[str(cnn_num)] = {
            "max_accuracy": max_acc,
            "min_epochs": min_epoch,
            "best_settings": best_string,
        }



Starting training for "cnn_7, conv_1_8_5_1_2_1_maxpool_3_3_0_1_linear_1_500_500"...
Checkpoint loaded. Resuming from epoch 100.
	Previous reached accuracy: 0.53125.
	Skipping training: maximum allowed epoch of 100 already exhausted.

Starting training for "cnn_7, conv_1_8_5_1_2_1_maxpool_3_3_0_1_linear_2_500_500"...
Checkpoint loaded. Resuming from epoch 100.
	Previous reached accuracy: 0.96875.
	Skipping training: maximum allowed epoch of 100 already exhausted.

Starting training for "cnn_7, conv_1_8_5_1_2_1_maxpool_3_3_0_1_linear_3_500_500"...
Checkpoint loaded. Resuming from epoch 100.
	Previous reached accuracy: 0.98046875.
	Skipping training: maximum allowed epoch of 100 already exhausted.

Starting training for "cnn_7, conv_1_8_5_1_2_1_maxpool_3_3_0_1_linear_4_500_500"...
Checkpoint loaded. Resuming from epoch 30.
	Previous reached accuracy: 1.0.
	Skipping training: accuracy of 1 already achieved.

Starting training for "cnn_7, conv_1_8_5_1_2_1_maxpool_5_5_0_1_linear_1_500_500"..

### Testing the CNNs

Using the resulting CNNs saved in the .pth files, we test each CNNs using the provided power traces.

Currently the testing data is a subset of the training data. In the future, if we can generate more data, we will be able to use separate datasets.



In [None]:
'''
# Run evaluation using testing data
# CURRENTLY USING TRAINING DATA TO TEST DATA: use separate dataset in the future
for i in range(7,-1,-1):
    try:
        checkpoint = torch.load(checkpoint_path, map_location=device, weights_only=True)
        cnn.load_state_dict(checkpoint['cnn_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        start_epoch = checkpoint['epoch']
        print(f"Checkpoint loaded. Resuming from epoch {start_epoch}")
        reached_target_acc = checkpoint['reached_acc']
        print(f"Previous reached accuracy: {reached_target_acc}")
        if reached_target_acc != 1:
            print(f"cnn_{i}\" did not reach accuracy of 1, skipping evaluation.\n\n")
            continue
    except FileNotFoundError:
        print(f"No checkpoint found for \"cnn_{i}\". Starting from scratch.")

    cnns[i].eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in dataloaders[i]:
            inputs, labels = inputs.to(device), labels.to(device)
            # Add dimensions for channels and width
            inputs = inputs.unsqueeze(1).unsqueeze(-1)
            outputs = cnns[i](inputs)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    print(f"Accuracy: {100 * correct / total:.2f}%")
'''