This notebook trains the triplet network (the feature extractor). During the fine-tuning phase, this feature extractor extracts features from the power traces, and those features are used to train a machine learning classifier (a k-NN classifier in this study).

To train the triplet model, edit the settings under "Parameters of the triplet model" in this notebook. Specifically, the changes go in the dictionary data_params. Each dictionary parameter is described below:

- input_path: Path to the file containing the dataset used to train the triplet model.
- target_byte: Index of the key byte to attack.
- start_idx: starting index of the attack window.
- end_idx: last index of the attack window.
- testType: Selects whether the loaded dataset uses the same key as the training dataset or a different one. It is always "samekey" when training the triplet model.
- n: Number of samples per class utilized for training the triplet network.
- triplet_model_path: Path to save the trained triplet model.
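
As a rough illustration of how start_idx and end_idx bound the attack window, the sketch below crops a toy trace in plain Python. It assumes the window is the half-open range [start_idx, end_idx), which agrees with the 1000-sample traces reported later in this notebook for the values 1200 and 2200. The loader in modelTrainingUtility may treat the bounds differently, and `crop_attack_window` is a hypothetical helper, not part of the utilities.

```python
def crop_attack_window(trace, start_idx, end_idx):
    """Return the samples of one power trace inside [start_idx, end_idx).

    Hypothetical helper for illustration only; the half-open convention
    is an assumption, not taken from modelTrainingUtility.
    """
    if not 0 <= start_idx < end_idx <= len(trace):
        raise ValueError("attack window must lie inside the trace")
    return trace[start_idx:end_idx]


# Toy trace with 5000 samples; the values stand in for power measurements.
toy_trace = list(range(5000))
window = crop_attack_window(toy_trace, 1200, 2200)
print(len(window))  # 1000 samples remain after cropping
```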

In [1]:
import os, sys

sys.path.append('../utilities/')

from modelTrainingUtility import *
from modelZoo import triplet_cnn
import numpy as np  # used directly below; do not rely on the star import to provide it
import pandas as pd
import random

import tensorflow as tf
import tensorflow.keras.backend as K
from tensorflow.keras.layers import Input, Lambda, Dot
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import RMSprop
from tensorflow.keras.callbacks import CSVLogger

%load_ext autotime

#### Parameters of the triplet model

In [2]:
# parameters for loading the dataset for training the model
data_params = {
    "input_path": '../../TripletPower-data/stm32f-unmasked/PC2_CB2_TDS3_K0_U_200k/train_same_key.npz',  # path to load the dataset
    "target_byte": 2,  # byte for which ranking is to be performed
    "start_idx": 1200,
    "end_idx": 2200,
    "testType": "samekey",
    "n": 100,  # number of samples to be selected from each class for training feature extractor
    "triplet_model_path": '../models/feature-extractor-model/stm32f-unmasked/' # path to save the trained triplet model (or feature extractor)
}

In [3]:
def build_pos_pairs_for_id(classid):  # classid --> e.g. 0
    traces = classes_to_ids[classid]
    # pos_pairs is actually the combination C(10,2)
    # e.g. if we have 10 example [0,1,2,...,9]
    # and want to create a pair [a, b], where (a, b) are different and order does not matter
    # e.g. [(0, 1), (0, 2), (0, 3), (0, 4), (0, 5), (0, 6), (0, 7), (0, 8), (0, 9),
    # (1, 2), (1, 3), (1, 4), (1, 5), (1, 6), (1, 7), (1, 8), (1, 9)...]
    # C(10, 2) = 45
    pos_pairs = [(traces[i], traces[j]) for i in range(len(traces)) for j in range(i + 1, len(traces))]
    random.shuffle(pos_pairs)
    return pos_pairs
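
The pair construction above is the set of 2-element combinations of the traces in one class. The sketch below reproduces it with `itertools.combinations` and checks the count. `positive_pairs` is a hypothetical stand-in for `build_pos_pairs_for_id` that omits the shuffle. With n = 100 traces per class and 256 classes, the total matches the anchor and positive shapes printed later in this notebook.

```python
from itertools import combinations
from math import comb


def positive_pairs(trace_ids):
    """All unordered (anchor, positive) pairs within one class (no shuffling)."""
    return list(combinations(trace_ids, 2))


# 10 traces in a class give C(10, 2) = 45 pairs.
pairs = positive_pairs(range(10))
print(len(pairs))

# With the notebook's settings: 256 classes, 100 traces per class.
n_per_class, nb_classes = 100, 256
total_pairs = nb_classes * comb(n_per_class, 2)
print(total_pairs)  # 1267200
```

The number of pairs grows quadratically with n, so doubling the samples per class roughly quadruples the number of training triplets per epoch.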

In [4]:
def build_positive_pairs(class_id_range):
    # class_id_range = range(0, num_classes)
    listX1 = []
    listX2 = []
    for class_id in class_id_range:
        pos = build_pos_pairs_for_id(class_id)
        # -- pos [(1, 9), (0, 9), (3, 9), (4, 8), (1, 4),...] --> (anchor example, positive example)
        for pair in pos:
            listX1 += [pair[0]]  # identity
            listX2 += [pair[1]]  # positive example
    # Shuffle the pairs across classes. np.random.permutation(k) returns the
    # indices 0..k-1 in random order, e.g. np.random.permutation(5) --> [1, 0, 4, 3, 2].
    # The same permutation is applied to both lists, so every
    # (anchor, positive) pair stays aligned after shuffling.
    perm = np.random.permutation(len(listX1))
    return np.array(listX1)[perm], np.array(listX2)[perm]

In [5]:
# Build a loss which doesn't take y_true into account,
# since we'll be passing only zeros as targets
def identity_loss(y_true, y_pred):
    return K.mean(y_pred - 0 * y_true)


# The triplet loss
def cosine_triplet_loss(X):
    _alpha = alpha_value
    positive_sim, negative_sim = X

    losses = K.maximum(0.0, negative_sim - positive_sim + _alpha)
    return K.mean(losses)
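
To make the loss concrete, here is a small worked example in plain Python rather than Keras. It applies the same formula, max(0, negative_sim - positive_sim + alpha) averaged over the batch, to three made-up similarity pairs. `cosine_triplet_loss_value` and the numbers are illustrative only.

```python
def cosine_triplet_loss_value(pos_sims, neg_sims, alpha):
    """Mean hinge loss over (positive similarity, negative similarity) pairs."""
    losses = [max(0.0, n - p + alpha) for p, n in zip(pos_sims, neg_sims)]
    return sum(losses) / len(losses)


pos = [0.9, 0.5, 0.7]    # anchor-positive cosine similarities (made up)
neg = [0.2, 0.6, 0.65]   # anchor-negative cosine similarities (made up)

# Per-triplet terms: 0.0 (already separated by more than the margin),
# 0.2 (negative is closer than the positive), 0.05 (inside the margin).
loss = cosine_triplet_loss_value(pos, neg, alpha=0.1)
print(round(loss, 4))
```

Only the second and third triplets contribute, which is why mining negatives that violate the margin matters: easy triplets produce zero gradient.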

In [6]:
# ------------------- Hard Triplet Mining -----------
# Naive way to compute all pairwise cosine similarities between the power traces.

def build_similarities(conv, all_imgs):
    embs = conv.predict(all_imgs)
    embs = embs / np.linalg.norm(embs, axis=-1, keepdims=True)
    all_sims = np.dot(embs, embs.T)
    return all_sims

def intersect(a, b):
    return list(set(a) & set(b))

def build_negatives(anc_idxs, pos_idxs, similarities, neg_imgs_idx, num_retries=50):
    # If no similarities were computed, return a random negative
    if similarities is None:
        return random.sample(neg_imgs_idx,len(anc_idxs))
    final_neg = []
    # for each positive pair
    for (anc_idx, pos_idx) in zip(anc_idxs, pos_idxs):
        anchor_class = id_to_classid[anc_idx]
        # positive similarity
        sim = similarities[anc_idx, pos_idx]
        # find all candidates whose similarity to the anchor, plus the margin,
        # exceeds the positive similarity (semi-hard and hard negatives)
        possible_ids = np.where((similarities[anc_idx] + alpha_value) > sim)[0]
        possible_ids = intersect(neg_imgs_idx, possible_ids)
        appended = False
        for iteration in range(num_retries):
            if len(possible_ids) == 0:
                break
            idx_neg = random.choice(possible_ids)
            if id_to_classid[idx_neg] != anchor_class:
                final_neg.append(idx_neg)
                appended = True
                break
        if not appended:
            final_neg.append(random.choice(neg_imgs_idx))
    return final_neg
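
The selection rule in `build_negatives` can be traced on a toy example. The sketch below uses plain Python and four made-up 2-D embeddings; `cosine_similarity` and `candidate_negatives` are hypothetical helpers. One simplification is assumed: same-class traces are filtered out up front, whereas the function above draws a random candidate and retries when it belongs to the anchor's class.

```python
import math


def cosine_similarity(u, v):
    """Cosine similarity of two vectors given as lists of floats."""
    dot = sum(a * b for a, b in zip(u, v))
    return dot / (math.sqrt(sum(a * a for a in u)) * math.sqrt(sum(b * b for b in v)))


def candidate_negatives(sim_row, pos_sim, alpha, labels, anchor_label):
    """Indices from other classes with sim(anchor, i) + alpha > sim(anchor, positive)."""
    return [i for i, s in enumerate(sim_row)
            if s + alpha > pos_sim and labels[i] != anchor_label]


embeddings = [[1.0, 0.0], [0.6, 0.8], [0.8, 0.6], [0.0, 1.0]]  # made-up embeddings
labels = [0, 0, 1, 1]
anchor, positive = 0, 1

sim_row = [cosine_similarity(embeddings[anchor], e) for e in embeddings]
candidates = candidate_negatives(sim_row, sim_row[positive], 0.1,
                                 labels, labels[anchor])
print(candidates)  # [2]: trace 2 is more similar to the anchor than the positive is
```

Trace 3 is excluded because it is already far from the anchor, so it would contribute zero loss.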

In [7]:
class SemiHardTripletGenerator():
    def __init__(self, Xa_train, Xp_train, batch_size, all_traces, neg_traces_idx, conv):
        self.batch_size = batch_size

        self.traces = all_traces
        self.Xa = Xa_train
        self.Xp = Xp_train
        self.cur_train_index = 0
        self.num_samples = Xa_train.shape[0]
        self.neg_traces_idx = neg_traces_idx
        self.all_anchors = list(set(Xa_train))
        self.mapping_pos = {v: k for k, v in enumerate(self.all_anchors)}
        self.mapping_neg = {k: v for k, v in enumerate(self.neg_traces_idx)}
        if conv:
            self.similarities = build_similarities(conv, self.traces)
        else:
            self.similarities = None

    def next_train(self):
        while 1:
            self.cur_train_index += self.batch_size
            if self.cur_train_index >= self.num_samples:
                self.cur_train_index = 0

            # fill one batch
            traces_a = self.Xa[self.cur_train_index:self.cur_train_index + self.batch_size]
            traces_p = self.Xp[self.cur_train_index:self.cur_train_index + self.batch_size]
            traces_n = build_negatives(traces_a, traces_p, self.similarities, self.neg_traces_idx)

            yield ([self.traces[traces_a],
                    self.traces[traces_p],
                    self.traces[traces_n]],
                   np.zeros(shape=(traces_a.shape[0]))
                   )
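
The wrap-around batching in the generator can be seen in isolation with a few lines of plain Python. This is a simplified sketch, not the class above: `batch_slices` is a hypothetical helper, and unlike `next_train` it yields the batch starting at index 0 before advancing the cursor.

```python
def batch_slices(num_samples, batch_size):
    """Yield (start, stop) index pairs forever, wrapping to 0 at the end of the data."""
    start = 0
    while True:
        yield start, min(start + batch_size, num_samples)
        start += batch_size
        if start >= num_samples:
            start = 0


gen = batch_slices(num_samples=250, batch_size=100)
first_four = [next(gen) for _ in range(4)]
print(first_four)  # [(0, 100), (100, 200), (200, 250), (0, 100)]
```

Because the generator never terminates, the training loop has to bound each epoch with `steps_per_epoch`, as done later in this notebook.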

In [8]:
# load the power traces on which base model is to be trained (Pre-training phase)
X_profiling, y_profiling, key = load_training_data_2(data_params)
# shape of the dataset
print('shape of X_profiling: ', X_profiling.shape)
print('shape of y_profiling: ', y_profiling.shape)
print('key used in pre-training phase: ', key)

# number of unique classes in the dataset
nb_classes = len(np.unique(y_profiling))
print('number of classes in the dataset: ', nb_classes)

# getting the subset of the dataset
x, y, all_data_df = create_df(X_profiling, y_profiling, data_params['n'])

# reshaping the dataset for training
all_traces = x.reshape((x.shape[0], x.shape[1], 1))
print('reshaped of the dataset for training: ', all_traces.shape)

loading data for training model ...
processing data for key byte 2
training data loaded successfully!
shape of X_profiling:  (200000, 1000)
shape of y_profiling:  (200000,)
key used in pre-training phase:  [ 43 126  21  22  40 174 210 166 171 247  21 136   9 207  79  60]
number of classes in the dataset:  256
shape of the power traces to be used for training:  (25600, 1000)
reshaped of the dataset for training:  (25600, 1000, 1)
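
The subset size in the output above follows from taking n = 100 traces for each of the 256 classes. The implementation of `create_df` is not shown in this notebook, so the sketch below is only one plausible way to pick n samples per class (the first n encountered); `take_n_per_class` is a hypothetical helper, and the real utility may sample differently.

```python
from collections import defaultdict


def take_n_per_class(labels, n):
    """Map each label to the indices of the first n samples carrying it."""
    picked = defaultdict(list)
    for idx, label in enumerate(labels):
        if len(picked[label]) < n:
            picked[label].append(idx)
    return dict(picked)


toy_labels = [0, 1, 0, 1, 0, 2, 2, 2, 1]
picked = take_n_per_class(toy_labels, n=2)
print(picked)  # {0: [0, 2], 1: [1, 3], 2: [5, 6]}

# With the notebook's settings the subset holds 256 * 100 traces.
subset_size = 256 * 100
print(subset_size)  # 25600
```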


In [9]:
# create groups for all the classes
all_data_group = all_data_df.groupby(['label'])
# build mapping between classes and power traces
classes_to_ids = all_data_group.groups
# classes_to_ids --> {0: [0, 1, ..., 9], 1: [10, 11, ..., 19], ...}  (illustrative, 10 traces per class)
# print(classes_to_ids)
id_to_classid = {v: c for c, traces in classes_to_ids.items() for v in traces}
# id_to_classid --> {0: 0, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0, 8: 0, 9: 0, 10: 1, ...}
# print(id_to_classid)

# generating anchor and positive pairs
Xa_train, Xp_train = build_positive_pairs(range(0, nb_classes))

# Gather the ids of all power traces that are used for training.
# This is just the union of the two sets: set(A) | set(B)
all_traces_train_idx = list(set(Xa_train) | set(Xp_train))
print("shape of X_train Anchor: ", Xa_train.shape)
print("shape of X_train Positive: ", Xp_train.shape)

# parameters for training the triplet network
alpha = 0.1 # margin
batch_size_value = 100
emb_size = 512
number_epoch = 10  # default was 10
opt = RMSprop(learning_rate=0.00001)  # `lr` is a deprecated alias of `learning_rate`

description = 'Triplet_Model' + '_emb_size_' + str(emb_size) + '_epochs_' + str(number_epoch) + '_target_byte_' + str(data_params["target_byte"]) + '_nsamples_' + str(data_params["n"])
print("with parameters, Alpha: %s, Batch_size: %s, Embedded_size: %s, Epoch_num: %s, N: %s" %
      (alpha, batch_size_value, emb_size, number_epoch, data_params["n"]))

alpha_value = float(alpha)
print(description)

# path to save triplet model and its statistics
model_dir_path = data_params["triplet_model_path"]
# create the output directory if it does not exist yet
os.makedirs(model_dir_path, exist_ok=True)

model_dir = model_dir_path

K.clear_session()

shared_conv2 = triplet_cnn(input_shape=(all_traces.shape[1], 1), emb_size=emb_size)

anchor = Input((all_traces.shape[1], 1), name='anchor')
positive = Input((all_traces.shape[1], 1), name='positive')
negative = Input((all_traces.shape[1], 1), name='negative')

a = shared_conv2(anchor)
p = shared_conv2(positive)
n = shared_conv2(negative)

# The Dot layer in Keras now supports built-in Cosine similarity using the normalize = True parameter.
# From the Keras Docs:
# keras.layers.Dot(axes, normalize=True)
# normalize: Whether to L2-normalize samples along the dot product axis before taking the dot product.
#  If set to True, then the output of the dot product is the cosine proximity between the two samples.
pos_sim = Dot(axes=-1, normalize=True)([a, p])
neg_sim = Dot(axes=-1, normalize=True)([a, n])

# customized loss
loss = Lambda(cosine_triplet_loss, output_shape=(1,))([pos_sim, neg_sim])

model_triplet = Model(
    inputs=[anchor, positive, negative],
    outputs=loss)

model_triplet.summary()  # summary() prints the table itself and returns None

# compiling the triplet model
model_triplet.compile(loss=identity_loss, optimizer=opt)
# batch size
batch_size = batch_size_value
# At first epoch we don't generate hard triplets
gen_hard = SemiHardTripletGenerator(Xa_train, Xp_train, batch_size, all_traces, all_traces_train_idx, None)
nb_epochs = number_epoch
csv_logger = CSVLogger(os.path.join(model_dir, 'Training_Log_%s.csv' % description), append=True, separator=';')

shape of X_train Anchor:  (1267200,)
shape of X_train Positive:  (1267200,)
with parameters, Alpha: 0.1, Batch_size: 100, Embedded_size: 512, Epoch_num: 10, N: 100
Triplet_Model_emb_size_512_epochs_10_target_byte_2_nsamples_100
Model: "functional_3"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
anchor (InputLayer)             [(None, 1000, 1)]    0                                            
__________________________________________________________________________________________________
positive (InputLayer)           [(None, 1000, 1)]    0                                            
__________________________________________________________________________________________________
negative (InputLayer)           [(None, 1000, 1)]    0                                            
_________________________________________________________
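
The cell above inverts `classes_to_ids` into `id_to_classid` so that the negative mining step can look up the class of any trace index. A minimal sketch of that inversion on a toy mapping follows. `invert_mapping` is a hypothetical helper, and the duplicate check is an addition for illustration that the notebook does not perform.

```python
def invert_mapping(classes_to_ids):
    """Turn {class: [trace ids]} into {trace id: class}, rejecting duplicate ids."""
    id_to_class = {}
    for class_id, trace_ids in classes_to_ids.items():
        for trace_id in trace_ids:
            if trace_id in id_to_class:
                raise ValueError("trace %r appears in more than one class" % trace_id)
            id_to_class[trace_id] = class_id
    return id_to_class


toy_classes_to_ids = {0: [0, 1, 2], 1: [3, 4], 2: [5]}
toy_id_to_classid = invert_mapping(toy_classes_to_ids)
print(toy_id_to_classid)  # {0: 0, 1: 0, 2: 0, 3: 1, 4: 1, 5: 2}
```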

In [10]:
print('Training a feature extractor ...')
for epoch in range(nb_epochs):
    print("built new hard generator for epoch " + str(epoch))
    # Model.fit accepts Python generators; fit_generator is deprecated in TF 2.x
    model_triplet.fit(gen_hard.next_train(),
                      steps_per_epoch=Xa_train.shape[0] // batch_size,
                      epochs=1, verbose=1, callbacks=[csv_logger])
    gen_hard = SemiHardTripletGenerator(Xa_train, Xp_train, batch_size, all_traces, all_traces_train_idx,
                                        shared_conv2)
    # To train without semi-hard mining (random negatives), pass None instead of the model:
    # gen_hard = SemiHardTripletGenerator(Xa_train, Xp_train, batch_size, all_traces, all_traces_train_idx, None)
    
shared_conv2.save(os.path.join(model_dir, '%s.hdf5' % description))
print('feature extractor trained and saved successfully at %s' % model_dir)

Training a feature extractor ...
built new hard generator for epoch 0
Instructions for updating:
Please use Model.fit, which supports generators.
built new hard generator for epoch 1
built new hard generator for epoch 2
built new hard generator for epoch 3
built new hard generator for epoch 4
built new hard generator for epoch 5
built new hard generator for epoch 6
built new hard generator for epoch 7
built new hard generator for epoch 8
built new hard generator for epoch 9
feature extractor trained and saved successfully at ../models/feature-extractor-model/stm32f-unmasked/
