# MLP Training for core affinity

This notebook goes over the steps to train the MLP network when the POC is not pinned to one specific core. To accommodate this, we train two different models. The first predicts whether the POC is running on the core for which we optimized our probe position, whereas the second model will predict the actual encoded bit. Note that in our case, we optimized the probe position for both cores 0 and 1.

Besides training two models, the remaining parameters stay the same.

In [1]:
import h5py
import numpy as np
import os
import random

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '1'   # Disable tensorflow logs
import tensorflow as tf

from tqdm.notebook import tqdm
from scipy.fft import fft

# Classes and methods definitions

### Keras generator

The traces may not all fit into memory, which is why we need a generator.

Adapted from https://github.com/angulartist/Keras-HDF5-ImageDataGenerator  
Copyright (c) 2023, Jesse De Meulemeester  
Copyright (c) 2017, HDF5 ImageDataGenerator All rights reserved.

In [2]:
class HDF5Generator(tf.keras.utils.Sequence):
    """Keras ``Sequence`` serving batches of traces from an open HDF5 file.

    Batches are read from the file on demand, so the traces do not all have
    to fit into memory. Samples are read from ``f[classes_key][x_key]`` and
    labels from ``f[classes_key][y_key]``.

    Arguments
    ---------
    f : h5py.File
        Open HDF5 file to read from. The generator never closes it.
    x_key : str
        Name of the samples dataset inside ``f[classes_key]``.
    y_key : str
        Name of the labels dataset inside ``f[classes_key]``.
    classes_key : str
        Key of the HDF5 object that holds both datasets.
    batch_size : int
        Number of traces per batch.
        Default is 32.
    shuffle : bool
        Shuffle the trace order at the end of each epoch.
        Default is True.
    indices : np.ndarray
        Indices of the traces to use. None will use all available traces.
        Default is None.
    """
    def __init__(
        self,
        f: h5py.File,
        x_key: str,
        y_key: str,
        classes_key: str,
        batch_size: int = 32,
        shuffle: bool = True,
        indices: np.ndarray = None
    ):
        # Let the Keras base class set up its own state (required by newer
        # Keras versions, harmless on older ones).
        super().__init__()

        self.f: h5py.File = f
        self.x_key: str = x_key
        self.y_key: str = y_key
        self.classes_key: str = classes_key
        self.batch_size: int = batch_size
        self.shuffle: bool = shuffle

        if indices is None:
            self._indices = np.arange(self.__get_dataset_shape(self.x_key, 0))
        else:
            # Work on a private copy: the indices are shuffled in place at
            # the end of every epoch and the caller's array must not change.
            self._indices = np.array(indices)

    def __repr__(self):
        """Representation of the class."""
        return f"{self.__class__.__name__}({self.__dict__!r})"

    def __get_dataset_shape(self, dataset: str, index: int):
        """Get the size of one dimension of a dataset.

        Arguments
        ---------
        dataset : str
            The dataset key inside ``f[classes_key]``.
        index : int
            The dimension to query.

        Returns
        -------
        int
            The size of the dataset along that dimension.
        """
        return self.f[self.classes_key][dataset].shape[index]

    def __get_dataset_items(
        self,
        indices: np.ndarray,
    ):
        """Read the samples and labels at the given indices.

        Arguments
        ---------
        indices : ndarray
            The indices of the current batch.

        Returns
        -------
        a tuple of ndarrays
            The batch of samples and the matching batch of labels.
        """
        group = self.f[self.classes_key]
        return (group[self.x_key][indices], group[self.y_key][indices])

    @property
    def num_items(self):
        """Grab the total number of examples from the dataset.

        This is the size of the samples dataset in the file, regardless of
        the ``indices`` the generator was restricted to.

        Returns
        -------
        int
            The total number of examples.
        """
        return self.f[self.classes_key][self.x_key].shape[0]

    @property
    def classes(self):
        """Grab "human" classes from the dataset.

        NOTE(review): this slices the object stored under ``classes_key``,
        while every other accessor treats that object as a container of the
        ``x_key``/``y_key`` datasets -- confirm slicing it is supported for
        the files in use.

        Returns
        -------
        list
            A list of the raw classes.
        """
        return self.f[self.classes_key][:]

    def __len__(self):
        """Denotes the number of batches per epoch.

        Only full batches are counted; a trailing partial batch is dropped.

        Returns
        -------
        int
            The number of batches per epoch.
        """
        return len(self._indices) // self.batch_size

    def __next_batch(self,
                     indices: np.ndarray):
        """Generates a batch of train/val data for the given indices.

        Arguments
        ---------
        indices : ndarray
            The trace indices that make up the batch.

        Returns
        -------
        tuple of ndarrays
            A tuple containing a batch of samples
            and their associated labels.
        """
        # Grab samples (tensors, labels) from the HDF5 source file.
        return self.__get_dataset_items(indices)

    def __getitem__(
            self,
            index: int):
        """Generates a batch of data for the given index.

        Arguments
        ---------
        index : int
            The index for the current batch.

        Returns
        -------
        tuple of ndarrays
            A tuple containing a batch of samples
            and their associated labels.
        """
        first = index * self.batch_size
        # Sorted because h5py only accepts index lists in increasing order;
        # the order of traces inside a batch is irrelevant for training.
        indices = np.sort(self._indices[first:first + self.batch_size])

        return self.__next_batch(indices)

    def __shuffle_indices(self):
        """If the shuffle parameter is set to True,
         the trace indices are shuffled (in-place).
        """
        if self.shuffle:
            np.random.shuffle(self._indices)

    def on_epoch_end(self):
        """Hook called by Keras at the end of each epoch; reshuffles the
         trace order when shuffling is enabled.
        """
        self.__shuffle_indices()


## Creating the training and evaluation datasets

We now transform the input traces into the actual parts we will use. For each trace, we keep the 1000 lowest frequency components when taking the FFT over the optimal window to predict the bit, and the 2000 lowest frequency components of the FFT over the whole trace to detect the core. We store these components for each trace in a new file which we will use during training.

In [None]:
# The directory containing all training data.
# Note that in this case, we collected data on the four different cores.
# The data from different cores will now be used to train the MLP models.
# Every other path in this notebook (per-core trace folders, generated feature
# files, model checkpoints) is built relative to this directory.
directory_data = "../traces/4-mlp-data/2-reducing-assumptions/1-core-affinity/"

In [None]:
# The random string used in the POC SpectrEM implementations
# (the 5-byte prefix "data|" followed by 32 random bytes).
data = b"data|\x01\x36\x9b\x78\xc9\x2c\x3d\x32\xfa\x83\x50\xaf\x39\xaf\x69\x2d\x58\xd7\x38\x6a\xc1\x63\x15\xc7\x3c\x4d\x96\x61\xe1\x88\xbd\xed"
# View the bytes as a uint8 array so individual bytes can be indexed and masked.
# get_bit_sp reads this module-level name at call time, so `data` must not be
# rebound to anything else before get_bit_sp is used.
data = np.frombuffer(data, dtype=np.uint8)
def get_bit_sp(index):
    """Look up one bit of the SpectrEM secret string

    Bits are numbered LSB-first within each byte of the module-level `data` array.

    @param index The position of the bit to look up
    @return True when that bit is set, False otherwise (as a NumPy boolean)"""
    byte_value = data[index // 8]
    bit_mask = 1 << (index % 8)
    return (byte_value & bit_mask).astype(bool)

def get_bit_md(inputs):
    """Extract the secret bit from a batch of MeltEMdown inputs

    @param inputs Two-dimensional array with one row of inputs per trace
    @return The third column (index 2) of `inputs`, one value per row"""
    bit_column = 2
    return inputs[:, bit_column]

### Creating the datasets to detect the core

In [None]:
# Build the "detect core" feature files, one per split. Each row holds the
# magnitudes of the 2000 lowest FFT bins of a whole trace, together with a
# one-hot label: column 1 for traces recorded on core 0 or 1 (the cores the
# probe position was optimized for), column 0 for any other core.
for split in ("training-data", "validation-data"):
    # The loop variable is deliberately not called `data`: get_bit_sp reads
    # the module-level `data` array, which must not be overwritten here.

    # First pass: pick the source files and count their traces, so the output
    # datasets can be created with the exact number of rows.
    trace_files = {}
    n_traces = 0
    for core in range(4):
        core_dir = f"{directory_data}/core-{core}/{split}/"
        # Sorted so the same files are picked on every run; os.listdir
        # returns entries in arbitrary order.
        names = sorted(name for name in os.listdir(core_dir) if name.endswith(".hdf5"))
        trace_files[core] = [os.path.join(core_dir, name) for name in names[:4]]  # 4 batches for each core

        for path in trace_files[core]:
            with h5py.File(path, 'r', libver='latest') as f_traces:
                n_traces += f_traces['data']['traces'].shape[0]

    # Second pass: compute the features and labels. Mode 'x' refuses to
    # overwrite an existing output file.
    with h5py.File(f"{directory_data}/mlp-detectcore-{split}.hdf5", 'x', libver='latest') as f:
        expg = f.create_group("data")
        freqs_dset = expg.create_dataset("traces", (n_traces, 2000), dtype='float32')
        expected_dset = expg.create_dataset("expected", (n_traces, 2), dtype='uint8')
        traces_i = 0

        for core, paths in trace_files.items():
            # We are interested in the traces where the POC is running on
            # either core 0 or core 1, as we optimized the probe positions
            # for these cores.
            label_column = 1 if core <= 1 else 0

            for path in tqdm(paths):
                with h5py.File(path, 'r', libver='latest') as f_traces:
                    traces = f_traces['data']['traces']

                    for i in range(traces.shape[0]):
                        # Note: instead of considering a specific window, this MLP network will take
                        # as inputs the FFT over the entire trace (the first 2000 components).
                        freqs_dset[traces_i, :] = np.abs(fft(traces[i, :]))[:2000]
                        expected_dset[traces_i, label_column] = 1

                        traces_i += 1

### Creating the datasets to predict the bit

In [None]:
# Build the "predict bit" feature files, one per split. Each row holds the
# magnitudes of the 1000 lowest FFT bins of a 5000-sample window of a trace,
# together with a one-hot label for the encoded bit.
for split in ("training-data", "validation-data"):
    # The loop variable is deliberately not called `data`: get_bit_sp reads
    # the module-level `data` array, which must still be the secret bytes
    # when the labels are computed below.

    # First pass: list the source files and count their traces, so the output
    # datasets can be created with the exact number of rows (no unused,
    # all-zero rows at the end).
    trace_files = []
    n_traces = 0
    for core in range(2):  # To predict the bit, we only need to consider cores 0 and 1
        core_dir = f"{directory_data}/core-{core}/{split}/"
        for name in sorted(os.listdir(core_dir)):
            if not name.endswith(".hdf5"):
                continue
            path = os.path.join(core_dir, name)
            with h5py.File(path, 'r', libver='latest') as f_traces:
                n_traces += f_traces['data']['traces'].shape[0]
            trace_files.append(path)

    # Second pass: compute the features and labels. Mode 'x' refuses to
    # overwrite an existing output file.
    with h5py.File(f"{directory_data}/mlp-predictbit-{split}.hdf5", 'x', libver='latest') as f:
        expg = f.create_group("data")
        freqs_dset = expg.create_dataset("traces", (n_traces, 1000), dtype='float32')
        expected_dset = expg.create_dataset("expected", (n_traces, 2), dtype='uint8')
        traces_i = 0

        for path in tqdm(trace_files):
            with h5py.File(path, 'r', libver='latest') as f_traces:
                traces = f_traces['data']['traces']
                inputs = f_traces['data']['inputs']

                # Each trace file stores the start of its optimal window as an attribute.
                window_start = f_traces.attrs["optimal_window_start"]
                window = slice(window_start, window_start+5000)

                for i in range(traces.shape[0]):
                    freqs_dset[traces_i, :] = np.abs(fft(traces[i, window]))[:1000]

                    # NOTE(review): the label is derived from the trace's position
                    # in its file (i). The secret is 37 bytes (296 bits), so
                    # get_bit_sp raises IndexError for i >= 296. The per-trace
                    # `inputs` dataset read above is currently unused and is
                    # presumably the intended source of the bit index -- confirm.
                    # int(): h5py needs a plain integer as column index, not a NumPy bool.
                    expected_dset[traces_i, int(get_bit_sp(i))] = 1

                    traces_i += 1

## Creating the neural network -- detect core

We now define the MLP network that we will use for evaluation to detect the core on which the POC is running.

In [3]:
# Locate the detect-core feature files and open both read-only. The handles
# are kept open because the batch generators read from them during training.
fname_train = f"{directory_data}/mlp-detectcore-training-data.hdf5"
fname_val = f"{directory_data}/mlp-detectcore-validation-data.hdf5"

f_train, f_val = (
    h5py.File(path, 'r', libver='latest') for path in (fname_train, fname_val)
)

In [None]:
# Derive the epoch geometry from the number of traces stored in each file.
# Integer division: a trailing partial batch is not counted as a step.
N_TRAIN, N_VAL = (handle["data"]["traces"].shape[0] for handle in (f_train, f_val))
BATCH_SIZE = 32
STEPS_PER_EPOCH, VALIDATION_STEPS = (count // BATCH_SIZE for count in (N_TRAIN, N_VAL))

print(
    f"Summary:",
    f"  Total number of traces: {N_TRAIN + N_VAL}",
    f"    Of which {N_TRAIN} are used to train the network",
    f"    Of which {N_VAL} are used to validate the network",
    f"  Batch size: {BATCH_SIZE}",
    f"    Resulting in {STEPS_PER_EPOCH} steps per epoch for training",
    f"    Resulting in {VALIDATION_STEPS} steps per epoch for validation",
    sep="\n",
)

In [5]:
# Inverse-time decay of the learning rate: starts at 0.0001 and, with a decay
# rate of 1, is halved after 10 epochs' worth of optimizer steps.
lr_schedule = tf.keras.optimizers.schedules.InverseTimeDecay(
    initial_learning_rate=0.0001,
    decay_steps=STEPS_PER_EPOCH * 10,
    decay_rate=1,
    staircase=False,
)

optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)

In [6]:
# Batch generators over the open feature files: samples come from
# f["data"]["traces"], labels from f["data"]["expected"]; all traces are used.
training_generator = HDF5Generator(
    f=f_train,
    x_key="traces",
    y_key="expected",
    classes_key="data",
    batch_size=BATCH_SIZE,
    indices=None,
)
validation_generator = HDF5Generator(
    f=f_val,
    x_key="traces",
    y_key="expected",
    classes_key="data",
    batch_size=BATCH_SIZE,
    indices=None,
)

In [7]:
# MLP that decides whether a trace was recorded on one of the optimized cores.
# Input: the 2000 FFT magnitudes stored per trace; output: a 2-way softmax
# matching the one-hot "expected" labels. The hidden layers are
# L2-regularized.
model = tf.keras.Sequential([
    # The shape is passed as an explicit tuple, the documented form; a bare
    # integer is not accepted by every Keras version.
    # NOTE(review): the feature files store float32 while the input is declared
    # float64 -- kept as is, confirm this is intended.
    tf.keras.layers.Input(shape=(2000,), dtype="float64"),
    tf.keras.layers.Dense(500, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.001)),
    tf.keras.layers.Dense(200, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.001)),
    tf.keras.layers.Dense(100, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.001)),
    tf.keras.layers.Dense(2, activation='softmax')
])

# The cross-entropy is also tracked as a metric so that early stopping can
# monitor 'val_binary_crossentropy'.
model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['binary_crossentropy', 'accuracy'])

In [None]:
# Print the layer-by-layer overview of the detect-core network.
model.summary()

In [9]:
# A checkpoint is written after every epoch (save_best_only=False); the file
# name carries the epoch number and the validation accuracy.
filepath_checkpoint = f"{directory_data}/mlp_checkpoints-detectcore/saved-model-{{epoch:03d}}-{{val_accuracy:.5f}}.hdf5"
checkpoint = tf.keras.callbacks.ModelCheckpoint(
    filepath_checkpoint,
    monitor='val_accuracy',
    verbose=1,
    save_best_only=False,
    mode='max',
)

# Stop once the validation cross-entropy has not improved for 20 epochs and
# put the best weights seen so far back into the model.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_binary_crossentropy',
    patience=20,
    restore_best_weights=True,
)

callbacks_list = [checkpoint, early_stopping]

We now start the actual training.
Note that we used early stopping with patience 20. To get the best model, retrieve the model 20 epochs before the last model from the checkpoints.

In [None]:
# Train the detect-core network. The epoch limit of 1000 is only an upper
# bound: the early-stopping callback ends training once the validation loss
# stops improving.
history = model.fit(
    x = training_generator,
    validation_data=validation_generator,
    steps_per_epoch = STEPS_PER_EPOCH,
    validation_steps = VALIDATION_STEPS,
    epochs=1000,
    callbacks=callbacks_list)

## Creating the neural network -- predict bit

We now define the MLP network that we will use to predict the encoded bit.

In [None]:
# Locate the predict-bit feature files and open both read-only. From here on
# f_train / f_val refer to these files instead of the detect-core ones.
fname_train = f"{directory_data}/mlp-predictbit-training-data.hdf5"
fname_val = f"{directory_data}/mlp-predictbit-validation-data.hdf5"

f_train, f_val = (
    h5py.File(path, 'r', libver='latest') for path in (fname_train, fname_val)
)

In [None]:
# Recompute the epoch geometry for the predict-bit datasets.
# Integer division: a trailing partial batch is not counted as a step.
N_TRAIN, N_VAL = (handle["data"]["traces"].shape[0] for handle in (f_train, f_val))
BATCH_SIZE = 32
STEPS_PER_EPOCH, VALIDATION_STEPS = (count // BATCH_SIZE for count in (N_TRAIN, N_VAL))

print(
    f"Summary:",
    f"  Total number of traces: {N_TRAIN + N_VAL}",
    f"    Of which {N_TRAIN} are used to train the network",
    f"    Of which {N_VAL} are used to validate the network",
    f"  Batch size: {BATCH_SIZE}",
    f"    Resulting in {STEPS_PER_EPOCH} steps per epoch for training",
    f"    Resulting in {VALIDATION_STEPS} steps per epoch for validation",
    sep="\n",
)

In [None]:
# Fresh schedule and optimizer for the second model: the learning rate starts
# at 0.0001 and, with a decay rate of 1, is halved after 10 epochs' worth of
# optimizer steps.
lr_schedule = tf.keras.optimizers.schedules.InverseTimeDecay(
    initial_learning_rate=0.0001,
    decay_steps=STEPS_PER_EPOCH * 10,
    decay_rate=1,
    staircase=False,
)

optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)

In [None]:
# Batch generators over the predict-bit feature files: samples come from
# f["data"]["traces"], labels from f["data"]["expected"]; all traces are used.
training_generator = HDF5Generator(
    f=f_train,
    x_key="traces",
    y_key="expected",
    classes_key="data",
    batch_size=BATCH_SIZE,
    indices=None,
)
validation_generator = HDF5Generator(
    f=f_val,
    x_key="traces",
    y_key="expected",
    classes_key="data",
    batch_size=BATCH_SIZE,
    indices=None,
)

In [None]:
# MLP that predicts the encoded bit. Input: the 1000 FFT magnitudes stored per
# trace; output: a 2-way softmax matching the one-hot "expected" labels. The
# hidden layers are L2-regularized.
model = tf.keras.Sequential([
    # The shape is passed as an explicit tuple, the documented form; a bare
    # integer is not accepted by every Keras version.
    # NOTE(review): the feature files store float32 while the input is declared
    # float64 -- kept as is, confirm this is intended.
    tf.keras.layers.Input(shape=(1000,), dtype="float64"),
    tf.keras.layers.Dense(500, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.001)),
    tf.keras.layers.Dense(200, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.001)),
    tf.keras.layers.Dense(100, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.001)),
    tf.keras.layers.Dense(2, activation='softmax')
])

# The cross-entropy is also tracked as a metric so that early stopping can
# monitor 'val_binary_crossentropy'.
model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['binary_crossentropy', 'accuracy'])

In [None]:
# Print the layer-by-layer overview of the predict-bit network.
model.summary()

In [None]:
# A checkpoint is written after every epoch (save_best_only=False); the file
# name carries the epoch number and the validation accuracy.
filepath_checkpoint = f"{directory_data}/mlp_checkpoints-predictbit/saved-model-{{epoch:03d}}-{{val_accuracy:.5f}}.hdf5"
checkpoint = tf.keras.callbacks.ModelCheckpoint(
    filepath_checkpoint,
    monitor='val_accuracy',
    verbose=1,
    save_best_only=False,
    mode='max',
)

# Stop once the validation cross-entropy has not improved for 20 epochs and
# put the best weights seen so far back into the model.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_binary_crossentropy',
    patience=20,
    restore_best_weights=True,
)

callbacks_list = [checkpoint, early_stopping]

We now start the actual training.
Note that we used early stopping with patience 20. To get the best model, retrieve the model 20 epochs before the last model from the checkpoints.

In [None]:
# Train the predict-bit network. The epoch limit of 1000 is only an upper
# bound: the early-stopping callback ends training once the validation loss
# stops improving.
history = model.fit(
    x = training_generator,
    validation_data=validation_generator,
    steps_per_epoch = STEPS_PER_EPOCH,
    validation_steps = VALIDATION_STEPS,
    epochs=1000,
    callbacks=callbacks_list)