This notebook trains models on the dataset generated by saving_datasets.py. We used it to train the SCAAML baseline models on traces with sample lengths of 5k and 20k.

In [None]:
%cd /content/drive/MyDrive/SCA-Datasets/scaaml_data_npz/

/content/drive/MyDrive/SCA-Datasets/scaaml_data_npz


In [None]:
import numpy as np


In [None]:
# Open the .npz archive from the current working directory; the arrays it
# holds are read out by key in the next cell.
# NOTE(review): the file name suggests attack point "sub_bytes_in", byte 0 —
# confirm against saving_datasets.py.
data = np.load("sub_bytes_in0.npz")

In [None]:
# Pull the train/test splits out of the archive. These module-level arrays
# are the ones train_model() reads when fitting and validating.
x_train = data["x_train"]
y_train = data["y_train"]
x_test = data["x_test"]
y_test = data["y_test"]

In [None]:
x_train.shape

(65536, 5000, 1)

In [None]:
# Keep only the first 5000 samples of every trace (axis 1); 5000 matches the
# "max_trace_len" value in the training config. The x_train shape printed
# above is already (65536, 5000, 1), so for this file the slice changes
# nothing; it only matters when the stored traces are longer.
x_train = x_train[:, :5000]
x_test = x_test[:, :5000]

In [None]:
y_train.shape

(65536, 256)

In [None]:
x_test.shape

(4096, 5000, 1)

In [None]:
y_test.shape

(4096, 256)

In [None]:
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Intro model."""

from typing import Any, Dict, Tuple

from tensorflow import Tensor
from tensorflow.keras import layers
from tensorflow.keras import Model
from tensorflow.keras.optimizers import Adam

#from scaaml.utils import display_config
def display_config(config_name: str, config: Dict[str, str]) -> None:
    """Print a configuration mapping to the terminal in alternating colors.

    Args:
        config_name (str): label printed in square brackets as the header.
        config (dict): mapping whose entries are printed as ``key:value``,
            one per line.
    """
    cprint(f"[{config_name}]", "magenta")
    # Entries are counted from 1: odd entries are cyan, even ones yellow.
    for position, (key, value) in enumerate(config.items(), start=1):
        line_color = "cyan" if position % 2 else "yellow"
        cprint(f"{key}:{value}", line_color)
#from scaaml.utils import get_num_gpu
def get_num_gpu() -> int:
    """Return the number of physical GPU devices TensorFlow lists."""
    gpu_devices = tf.config.list_physical_devices("GPU")
    return len(gpu_devices)

# pylint: disable=too-many-positional-arguments
def block(x: Tensor,
          filters: int,
          kernel_size: int = 3,
          strides: int = 1,
          conv_shortcut: bool = False,
          activation: str = "relu") -> Tensor:
    """Residual bottleneck block with pre-activation.

    From: https://arxiv.org/pdf/1603.05027.pdf

    The main path is a 1x1 convolution, a ``kernel_size`` convolution and a
    final 1x1 convolution that outputs ``4 * filters`` channels. Its result
    is added to a shortcut branch taken from the pre-activated input.

    Args:
        x: input tensor.

        filters (int): filters of the bottleneck layer. The block outputs
        ``4 * filters`` channels.

        kernel_size (int, optional): kernel size of the bottleneck layer.
        Defaults to 3.

        strides (int, optional): stride of the ``kernel_size`` convolution
        and of the shortcut branch. Defaults to 1.

        conv_shortcut (bool, optional): Use convolution shortcut if True,
        otherwise identity shortcut. Defaults to False.

        activation (str, optional): activation function. Defaults to "relu".

    Returns:
        Output tensor for the residual block.
    """

    # Pre-activation: normalize and activate before any convolution.
    x = layers.BatchNormalization()(x)
    x = layers.Activation(activation)(x)

    # Shortcut branch, built from the pre-activated tensor.
    if conv_shortcut:
        # 1x1 projection to the block's output width (4 * filters).
        shortcut = layers.Conv1D(4 * filters, 1, strides=strides)(x)
    else:
        if strides > 1:
            # Pool size 1 with a stride only subsamples, so the shortcut
            # length matches the strided main path.
            shortcut = layers.MaxPooling1D(1, strides=strides)(x)
        else:
            # Identity shortcut: the input must already have 4 * filters
            # channels for the Add below to be valid.
            shortcut = x

    # Bottleneck step 1: 1x1 convolution down to `filters` channels.
    x = layers.Conv1D(filters, 1, use_bias=False, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation(activation)(x)

    # Bottleneck step 2: the only strided convolution on the main path.
    x = layers.Conv1D(filters,
                      kernel_size,
                      strides=strides,
                      use_bias=False,
                      padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation(activation)(x)

    # Bottleneck step 3: 1x1 convolution up to 4 * filters channels, then
    # merge with the shortcut.
    x = layers.Conv1D(4 * filters, 1)(x)
    x = layers.Add()([shortcut, x])
    return x


# pylint: disable=too-many-positional-arguments
def stack(x: Tensor,
          filters: int,
          blocks: int,
          kernel_size: int = 3,
          strides: int = 2,
          activation: str = "relu") -> Tensor:
    """A set of stacked residual blocks.

    The stack is one block with a convolution shortcut, then
    ``max(blocks - 2, 0)`` blocks with the default shortcut, then one final
    block that applies ``strides``.

    Args:
        x: input tensor.

        filters (int): filters of the bottleneck layer.

        blocks (int): number of conv blocks to stack.

        kernel_size (int, optional): kernel size of the bottleneck layer.
        Defaults to 3.

        strides (int, optional): stride used in the last block.
        Defaults to 2.

        activation (str, optional): activation function. Defaults to "relu".

    Returns:
        tensor: Output tensor for the stacked blocks.
    """
    # First block projects the shortcut so the channel count becomes
    # 4 * filters; the following blocks can then use the plain shortcut.
    x = block(x,
              filters,
              kernel_size=kernel_size,
              activation=activation,
              conv_shortcut=True)
    # range(2, blocks) leaves room for the first and last blocks.
    for _ in range(2, blocks):
        x = block(x, filters, kernel_size=kernel_size, activation=activation)
    # NOTE(review): kernel_size is not forwarded here, so the last block
    # always uses block()'s default of 3 — confirm this is intentional.
    x = block(x, filters, strides=strides, activation=activation)
    return x


from typing import Any, Dict, Tuple
from tensorflow.keras.models import Model


# pylint: disable=C0103
def Resnet1D(input_shape: Tuple[int, ...], attack_point: str,
             mdl_cfg: Dict[str, Any], optim_cfg: Dict[str,
                                                      Any]) -> Model:
    """Build and compile the 1D residual network.

    The network is a max-pooling stem, four residual stacks whose filter
    count doubles at every stack, global average pooling, one dense block
    and a 256-way softmax output.

    Args:
        input_shape: shape of a single input, without the batch dimension.
        attack_point (str): accepted for interface compatibility; unused.
        mdl_cfg (dict): model parameters ("initial_pool_size",
            "initial_filters", "block_kernel_size", "activation",
            "dense_dropout", "blocks_stack1" .. "blocks_stack4").
        optim_cfg (dict): optimizer parameters ("lr" and "multi_gpu_lr").

    Returns:
        The compiled Keras model.
    """
    del attack_point  # unused

    stem_pool = mdl_cfg["initial_pool_size"]
    width = mdl_cfg["initial_filters"]
    conv_kernel = mdl_cfg["block_kernel_size"]
    act_name = mdl_cfg["activation"]
    dropout_rate = mdl_cfg["dense_dropout"]
    stack_depths = [
        mdl_cfg[name]
        for name in ("blocks_stack1", "blocks_stack2", "blocks_stack3",
                     "blocks_stack4")
    ]

    inputs = layers.Input(shape=input_shape)

    # stem
    features = layers.MaxPool1D(pool_size=stem_pool)(inputs)

    # trunk: the filter count doubles before each residual stack
    for depth in stack_depths:
        width *= 2
        features = stack(features,
                         width,
                         depth,
                         kernel_size=conv_kernel,
                         activation=act_name)

    # head model: a single dense block
    features = layers.GlobalAveragePooling1D()(features)
    features = layers.Dropout(dropout_rate)(features)
    features = layers.Dense(256)(features)
    features = layers.BatchNormalization()(features)
    features = layers.Activation(act_name)(features)

    outputs = layers.Dense(256, activation="softmax")(features)

    model = Model(inputs=inputs, outputs=outputs)
    model.summary()

    # A separate learning rate is used when more than one GPU is visible.
    lr_key = "multi_gpu_lr" if get_num_gpu() > 1 else "lr"
    learning_rate = optim_cfg[lr_key]

    model.compile(loss=["categorical_crossentropy"],
                  metrics=["acc"],
                  optimizer=Adam(learning_rate))
    return model


def get_model(input_shape: Tuple[int, ...], attack_point: str,
              config: Dict[str, Any]) -> Model:
    """Display the model and optimizer settings, then build the model.

    Args:
        input_shape: shape of a single input, forwarded to Resnet1D.
        attack_point (str): forwarded to Resnet1D.
        config (dict): scald config; its "model_parameters" and
            "optimizer_parameters" entries are used.

    Returns:
        The model returned by Resnet1D.
    """
    model_params = config["model_parameters"]
    optimizer_params = config["optimizer_parameters"]

    sections = (("model", model_params), ("optimizer", optimizer_params))
    for title, section in sections:
        display_config(title, section)

    return Resnet1D(input_shape, attack_point, model_params,
                    optimizer_params)

In [None]:
def get_model_stub(attack_point: str, attack_byte: int,
                   config: Dict[str, str]) -> str:
    """Build the name used to identify a trained model.

    Args:
        attack_point (str): attack point the model targets.
        attack_byte (int): byte the model targets.
        config (dict): config providing "device", "algorithm", "model",
            "version" and "max_trace_len".

    Returns:
        A string of the form
        ``<device>-<algorithm>-<model>-v<version>-ap_<attack_point>-``
        ``byte_<attack_byte>-len_<max_trace_len>``.
    """
    device, algorithm, model, version, max_trace_len = (
        config[field]
        for field in ("device", "algorithm", "model", "version",
                      "max_trace_len"))
    parts = [
        f"{device}",
        f"{algorithm}",
        f"{model}",
        f"v{version}",
        f"ap_{attack_point}",
        f"byte_{attack_byte}",
        f"len_{max_trace_len}",
    ]
    return "-".join(parts)

In [None]:
import tensorflow as tf
from tensorflow.keras.callbacks import ModelCheckpoint, TensorBoard


In [None]:
def train_model(config):
    """Train one model per (attack byte, attack point) pair in ``config``.

    Each model is built with Resnet1D, fitted on the module-level
    ``x_train`` / ``y_train`` arrays and validated on ``x_test`` /
    ``y_test``. The checkpoint with the lowest validation loss is written to
    ``models/<stub>.keras`` and TensorBoard logs go to ``logs/<stub>``,
    where ``<stub>`` comes from get_model_stub.

    NOTE(review): the training arrays are module-level globals and do not
    depend on the loop's attack byte or attack point, so every iteration
    trains on the same data and only the output name changes. Load the
    matching dataset before calling this with more than one pair.

    Args:
        config (dict): training configuration. Reads "algorithm",
            "batch_size", "epochs", "attack_bytes", "attack_points",
            "model_parameters" and "optimizer_parameters", plus the keys
            get_model_stub needs.
    """
    algorithm = config["algorithm"]
    batch_size = config["batch_size"]

    for attack_byte in config["attack_bytes"]:
        for attack_point in config["attack_points"]:

            # infers shape
            input_shape = x_train.shape[1:]

            # reset graph and load a new model: drop Keras' global state so
            # models from earlier iterations do not accumulate in memory.
            tf.keras.backend.clear_session()

            # display config
            cprint(f"[{algorithm}]", "magenta")
            cprint(">Attack params", "green")
            cprint(f"|-attack_point:{attack_point}", "cyan")
            cprint(f"|-attack_byte:{attack_byte}", "yellow")
            cprint(f"|-input_shape:{input_shape}", "cyan")

            # Single-device training. For multi-GPU, build the model inside
            # a tf.distribute.MirroredStrategy().scope() block instead.
            mdl_cfg = config["model_parameters"]
            optim_cfg = config["optimizer_parameters"]
            model = Resnet1D(input_shape, attack_point, mdl_cfg, optim_cfg)

            # model recording setup
            stub = get_model_stub(attack_point, attack_byte, config)
            cb = [
                ModelCheckpoint(monitor="val_loss",
                                filepath=f"models/{stub}.keras",
                                save_best_only=True),
                TensorBoard(log_dir="logs/" + stub, update_freq="batch"),
            ]

            # batch_size was previously read from the config but never
            # handed to fit(), so Keras silently used its own default.
            model.fit(x_train,
                      y_train,
                      validation_data=(x_test, y_test),
                      verbose=1,
                      batch_size=batch_size,
                      epochs=config["epochs"],
                      callbacks=cb)

# Training configuration as a JSON document. The entry point below parses it
# with json.loads and passes the resulting dict to train_model().
config = """{
    "model": "cnn",
    "device": "stm32f0",
    "algorithm": "tinyaes",
    "version": "10",
    "attack_points": [
        "sub_bytes_in"

    ],
    "attack_bytes": [
        "0"
    ],
    "max_trace_len": 5000,
    "num_shards": 256,
    "num_traces_per_shard": 256,
    "batch_size": 32,
    "epochs": 30,
    "optimizer_parameters": {
        "lr": 0.001,
        "multi_gpu_lr": 0.001
    },
    "model_parameters": {
        "activation": "relu",
        "initial_filters": 8,
        "initial_pool_size": 4,
        "block_kernel_size": 3,
        "blocks_stack1": 3,
        "blocks_stack2": 4,
        "blocks_stack3": 4,
        "blocks_stack4": 3,
        "dense_dropout": 0.1
    }
}"""

if __name__ == "__main__":
    # Imported here so this cell brings `json` into scope on its own.
    import json

    # The command-line "--config" handling of the original script is not
    # used in this notebook; the inline `config` JSON string is parsed
    # instead.
    train_model(json.loads(config))

[tinyaes]
>Attack params
|-attack_point:sub_bytes_in
|-attack_byte:0
|-input_shape:(5000, 1)


Epoch 1/30
[1m2048/2048[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m80s[0m 18ms/step - acc: 0.0297 - loss: 4.8063 - val_acc: 0.0667 - val_loss: 4.3126
Epoch 2/30
[1m2048/2048[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m34s[0m 17ms/step - acc: 0.1658 - loss: 2.7931 - val_acc: 0.1997 - val_loss: 2.6991
Epoch 3/30
[1m2048/2048[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m34s[0m 17ms/step - acc: 0.2809 - loss: 2.1956 - val_acc: 0.3228 - val_loss: 2.0294
Epoch 4/30
[1m2048/2048[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m34s[0m 17ms/step - acc: 0.3231 - loss: 2.0032 - val_acc: 0.3286 - val_loss: 1.9543
Epoch 5/30
[1m2048/2048[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m34s[0m 16ms/step - acc: 0.3441 - loss: 1.9128 - val_acc: 0.3176 - val_loss: 2.0502
Epoch 6/30
[1m2048/2048[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m34s[0m 17ms/step - acc: 0.3575 - loss: 1.8461 - val_acc: 0.2539 - val_loss: 2.6938
Epoch 7/30
[1m2048/2048[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m

In [None]:
#evaluating the model
def load_attack_shard(
        fname: str,
        attack_byte: int,
        attack_point: str,
        max_trace_length: int,
        num_traces: int = 256,
        full_key: bool = False) -> Tuple[bytearray, bytearray, Tensor, Tensor]:
    """Load one shard of attack data for a given key byte.

    Args:
        fname (str): path of the ``.npz`` shard, passed to ``np.load``.
        attack_byte (int): index of the targeted byte; converted with
            ``int()``, so a numeric string is accepted.
        attack_point (str): which labels to load: "key", "sub_bytes_in"
            or "sub_bytes_out".
        max_trace_length (int): number of leading samples kept per trace.
        num_traces (int, optional): number of leading traces kept.
            Defaults to 256.
        full_key (bool, optional): unused. Defaults to False.

    Returns:
        tuple: ``(keys, pts, x, y)`` where ``keys`` and ``pts`` are the key
        and plaintext values of the selected byte for the kept traces,
        ``x`` is a float32 tensor with the truncated traces and ``y`` is a
        uint8 tensor with the attack point values one-hot encoded over 256
        classes.

    Raises:
        ValueError: if ``attack_point`` is not one of the supported names.
    """
    del full_key  # unused

    # np.load on an .npz returns a lazy archive that keeps the file open;
    # the context manager closes it once the needed arrays are read.
    with np.load(fname) as shard:
        attack_byte = int(attack_byte)

        # Read the key array once; it is used both as a return value and as
        # the label source when attacking the key directly.
        byte_keys = shard["keys"][attack_byte]
        k = byte_keys[:num_traces]
        pts = shard["pts"][attack_byte][:num_traces]

        # load y
        if attack_point == "key":
            y = byte_keys
        elif attack_point in ("sub_bytes_in", "sub_bytes_out"):
            y = shard[attack_point][attack_byte]
        else:
            raise ValueError(f"Unknown attack point {attack_point}.")

        # load x
        x = shard["traces"][:num_traces, :max_trace_length, :]

    y = to_categorical(y[:num_traces], 256)
    y = tf.convert_to_tensor(y, dtype="uint8")
    x = tf.convert_to_tensor(x, dtype="float32")
    return k, pts, x, y


In [None]:
# Attack point and key byte evaluated in the cells below; they match the
# "ap_sub_bytes_in" and "byte_0" parts of the checkpoint name loaded here.
ATTACK_POINT ="sub_bytes_in"
ATTACK_BYTE = 0
# The path follows the "models/<stub>.keras" pattern that train_model() uses
# for its ModelCheckpoint callback, with <stub> built by get_model_stub().
model = tf.keras.models.load_model("models/stm32f0-tinyaes-cnn-v10-ap_sub_bytes_in-byte_0-len_5000.keras")

In [None]:
# TODO: this assignment has no right-hand side and does not parse. Set it to
# the collection of attack shard .npz paths that the recovery loop iterates
# over and passes to load_attack_shard().
shard_paths =

In [None]:
# Key-recovery evaluation: for every shard, predict the attack point for the
# first NUM_TRACES traces, accumulate the per-key scores trace by trace, and
# record the rank of the true key byte after each additional trace.
# NOTE(review): defaultdict, metrics, tqdm, TRACE_LEN, ap_preds_to_key_preds
# and from_categorical are not defined in the visible part of this notebook;
# they must be imported or defined before this cell runs.
NUM_TRACES = 10  # maximum number of traces to use to recover a given key byte. 10 is already overkill
# Maps trace index (0-based) -> list with one rank of the true key per shard.
correct_prediction_rank = defaultdict(list)
y_pred = []
y_true = []
model_metrics = {"acc": metrics.Accuracy()}
for shard in tqdm(shard_paths, desc='Recovering bytes', unit='shards'):
    keys, pts, x, y = load_attack_shard(shard, ATTACK_BYTE, ATTACK_POINT, TRACE_LEN, num_traces=NUM_TRACES)

    # prediction
    predictions = model.predict(x)

    # computing byte prediction from intermediate predictions
    # (presumably maps attack-point probabilities to key-byte probabilities
    # using the plaintexts — confirm against the helper's definition)
    key_preds = ap_preds_to_key_preds(predictions, pts, ATTACK_POINT)

    # presumably converts one-hot / probability rows to class indices —
    # confirm against the helper's definition
    c_preds = from_categorical(predictions)
    c_y = from_categorical(y)
    # metric tracking
    for metric in model_metrics.values():
        metric.update_state(c_y, c_preds)
    # for the confusion matrix
    y_pred.extend(c_preds)
    y_true.extend(c_y)

    # accumulating probabilities and checking correct guess position.
    # if all goes well it will be at position 0 (highest probability)
    # see below on how to use for the real attack


    key = keys[0] # all the same in the same shard - not used in real attack
    # Running sum of log10 scores, one slot per candidate key byte value.
    vals = np.zeros((256))
    for trace_count, kp in enumerate(key_preds):
        # The small constant keeps log10 finite when a score is exactly 0.
        vals = vals  + np.log10(kp + 1e-22)
        # argsort is ascending; reversing it orders candidates from highest
        # to lowest accumulated score ([-256:] keeps all 256 entries).
        guess_ranks = (np.argsort(vals, )[-256:][::-1])
        # Position of the true key in that ordering; 0 means top guess.
        byte_rank = list(guess_ranks).index(key)
        correct_prediction_rank[trace_count].append(byte_rank)
