# Setup

In [None]:
import re
import os
import numpy as np
import tensorflow as tf
import tensorflow_addons as tfa
from tensorflow.keras.callbacks import ModelCheckpoint
# Pass the --xla_gpu_cuda_data_dir flag to XLA through the XLA_FLAGS
# environment variable, using /opt/conda as the CUDA data directory.
# NOTE(review): this assignment runs after `import tensorflow`; presumably XLA
# reads the flag lazily on first use — confirm, or move it above the import.
os.environ['XLA_FLAGS'] = '--xla_gpu_cuda_data_dir=/opt/conda'

# Training

#### Define Training Folders

In [None]:
# Directories that hold the training PNGs; each one is scanned by
# build_image_paths.
folders = [
    "images4_png_4-6/4-6",
    "images4_png_1",
    "images4_png_2",
    "images4_png_3",
]

#### Defining Functions

In [None]:
def build_image_paths(folders):
    """Collect the paths of all ``.png`` files located directly in ``folders``.

    Args:
        folders: Iterable of directory paths. Each directory is listed with
            ``os.listdir`` (no recursion into sub-directories).

    Returns:
        A sorted list of ``os.path.join(folder, filename)`` strings, one for
        every directory entry whose name ends with ``.png``.
    """
    collected = []
    for directory in folders:
        for entry in os.listdir(directory):
            # Suffix match is case-sensitive, so e.g. "x.PNG" is skipped.
            if entry.endswith('.png'):
                collected.append(os.path.join(directory, entry))
    # Sort the joined paths so the result is deterministic.
    collected.sort()
    return collected

def preprocess_image(image_path):
    """Load a PNG file as a single-channel ``uint8`` tensor of size 481x600.

    Args:
        image_path: Path of the PNG file to read.

    Returns:
        A ``tf.uint8`` tensor of shape ``(481, 600, 1)``.
    """
    # Same height/width as the input_shape declared in create_model.
    target_size = [481, 600]
    raw_bytes = tf.io.read_file(image_path)
    # channels=1 forces a grayscale decode.
    grayscale = tf.image.decode_png(raw_bytes, channels=1)
    # tf.image.resize yields float32 values, hence the cast back to uint8.
    resized = tf.image.resize(grayscale, target_size)
    return tf.cast(resized, tf.uint8)

def augment(image, label, margin):
    """Randomly mirror an image horizontally and/or vertically.

    The two flip directions are decided independently. For each one a uniform
    sample in [0, 1) is drawn and the flip is applied when the sample exceeds
    ``margin``, so each flip happens with probability ``1 - margin``
    (``margin=0.95`` gives a 5% chance per direction).

    The flip inside each branch is deterministic. Using
    ``tf.image.random_flip_*`` there would add a second 50% coin toss and
    halve the effective probability to ``(1 - margin) / 2``.

    Args:
        image: Image tensor to augment.
        label: Label belonging to ``image``; passed through unchanged.
        margin: Threshold in [0, 1]; ``1 - margin`` is the flip probability.

    Returns:
        Tuple ``(image, label)`` with the possibly flipped image.
    """
    # Left/right mirror with probability (1 - margin).
    do_flip_lr = tf.random.uniform([], 0, 1) > margin
    after_lr = tf.cond(
        do_flip_lr,
        lambda: tf.image.flip_left_right(image),
        lambda: image,
    )
    # Up/down mirror with probability (1 - margin), drawn independently.
    do_flip_ud = tf.random.uniform([], 0, 1) > margin
    after_ud = tf.cond(
        do_flip_ud,
        lambda: tf.image.flip_up_down(after_lr),
        lambda: after_lr,
    )
    return after_ud, label

def parse_label_from_path(path):
    """Extract the numeric user id from an image path (eager / py_function only).

    The id is the run of digits immediately before ``_keystrokes`` in the path.

    Args:
        path: Eager string tensor holding the image path; ``.numpy()`` is
            called on it, so this must run eagerly (e.g. via ``tf.py_function``).

    Returns:
        Scalar ``tf.int32`` tensor with the parsed id. ``int32`` is used
        instead of ``uint8`` so ids above 255 do not wrap around and collide
        with other users' labels. Paths without a match yield ``0``.
        NOTE(review): the fallback ``0`` is indistinguishable from a real
        user id 0 — confirm that this is intended.
    """
    # Uses regex to extract the user name from the image path
    match = re.search(r'(\d+)_keystrokes', path.numpy().decode('utf-8'))
    label = int(match.group(1)) if match else 0
    return tf.constant(label, dtype=tf.int32)

def load_and_preprocess_data(image_path):
    """Turn an image path into an ``(image, label)`` training example.

    Args:
        image_path: Scalar string tensor with the path of a PNG file.

    Returns:
        Tuple of the image produced by ``preprocess_image`` and the scalar
        ``tf.int32`` label produced by ``tf_parse_label_from_path``.
    """
    image = preprocess_image(image_path)
    label = tf_parse_label_from_path(image_path)
    return image, label

def tf_parse_label_from_path(path):
    """Graph-compatible wrapper around ``parse_label_from_path``.

    Args:
        path: Scalar string tensor with the image path.

    Returns:
        Scalar ``tf.int32`` label tensor.
    """
    # Tout must match the dtype returned by parse_label_from_path.
    label = tf.py_function(parse_label_from_path, [path], Tout=tf.int32)
    # py_function outputs carry no static shape; declare the scalar shape so
    # batched labels have a known rank downstream.
    label.set_shape([])
    return label

def shuffle_batch(features, labels):
    """Apply one shared random permutation to a batch of features and labels.

    Args:
        features: Tensor whose first dimension is the batch dimension.
        labels: Tensor gathered with the same indices as ``features``.

    Returns:
        Tuple ``(shuffled_features, shuffled_labels)``, both reordered with the
        same permutation so feature/label pairs stay aligned.
    """
    # Batch size is read dynamically from the first dimension of the features.
    num_examples = tf.shape(features)[0]
    permutation = tf.random.shuffle(tf.range(num_examples))
    return tf.gather(features, permutation), tf.gather(labels, permutation)

def create_model():
    """Build an untrained FPNet embedding network.

    The network takes ``(481, 600, 1)`` inputs and stacks eight ReLU
    convolutions: four with ``(1, 3)`` kernels followed by four with
    ``(3, 3)`` kernels. Every convolution except the last is followed by a
    max-pool with ``pool_size=(1, 2)`` and ``strides=(1, 2)``. The result is
    flattened, projected to 128 units without activation and L2-normalised.

    Returns:
        An uncompiled ``tf.keras.Sequential`` model.
    """
    layers = tf.keras.layers
    pool_kwargs = dict(pool_size=(1, 2), strides=(1, 2))

    fpnet = tf.keras.Sequential()

    # First stage: the only pooling layer that keeps the default padding.
    fpnet.add(layers.Conv2D(filters=24, kernel_size=(1, 3), activation='relu',
                            input_shape=(481, 600, 1)))
    fpnet.add(layers.MaxPooling2D(**pool_kwargs))

    # Remaining (1, 3) convolution stages.
    for n_filters in (32, 64, 64):
        fpnet.add(layers.Conv2D(filters=n_filters, kernel_size=(1, 3), activation='relu'))
        fpnet.add(layers.MaxPooling2D(padding='same', **pool_kwargs))

    # (3, 3) convolution stages that are followed by pooling.
    for n_filters in (96, 96, 128):
        fpnet.add(layers.Conv2D(filters=n_filters, kernel_size=(3, 3), activation='relu'))
        fpnet.add(layers.MaxPooling2D(padding='same', **pool_kwargs))

    # Final convolution has no pooling after it.
    fpnet.add(layers.Conv2D(filters=128, kernel_size=(3, 3), activation='relu'))
    fpnet.add(layers.Flatten())
    # No activation on the final dense layer.
    fpnet.add(layers.Dense(128, activation=None))
    # L2-normalise the embeddings.
    fpnet.add(layers.Lambda(lambda x: tf.math.l2_normalize(x, axis=1)))
    return fpnet

#### Creating Training Data

In [None]:
# Build the sorted list of training image paths
image_paths = build_image_paths(folders)
if not image_paths:
    # An empty list would otherwise surface later as an obscure dtype error
    # inside the tf.data pipeline.
    raise ValueError(f"No .png files found in training folders: {folders}")

# Dataset of path strings
image_paths_ds = tf.data.Dataset.from_tensor_slices(image_paths)

# Shuffle the paths before decoding. `image_paths` is sorted, so without this
# every batch would hold neighbouring files in the same order each epoch,
# leaving the semi-hard triplet loss little class variety to mine. Shuffling
# path strings is cheap, so the buffer covers the whole dataset.
training_set = image_paths_ds.shuffle(
    buffer_size=len(image_paths),
    reshuffle_each_iteration=True,
)
# Decode each image and parse its label
training_set = training_set.map(
    load_and_preprocess_data,
    num_parallel_calls=tf.data.AUTOTUNE,
)
# Data augmentation: a margin of .95 is passed to augment for both flip directions
training_set = training_set.map(
    lambda image, label: augment(image, label, .95),
    num_parallel_calls=tf.data.AUTOTUNE,
)
# Batching (256 examples per batch) and prefetching
training_set = training_set.batch(256).prefetch(buffer_size=tf.data.AUTOTUNE)

#### Creating Training Checkpoint

In [None]:
# Checkpoint callback in case training freezes: weights are written to
# 'fpnet_weights.h5' only when the monitored training loss improves
# (mode='min' together with save_best_only=True).
model_checkpoint_callback = ModelCheckpoint(
    filepath='fpnet_weights.h5',
    monitor='loss',
    mode='min',
    save_best_only=True,
    save_weights_only=True,
    verbose=1,
)

In [None]:
# Fresh, untrained FPNet instance
model = create_model()

# Optimizer hyper-parameters and loss function as described in
# "Device Fingerprinting with Peripheral Timestamps"
optimizer = tf.keras.optimizers.Adam(0.001, beta_1=.9, beta_2=.999)
triplet_loss = tfa.losses.TripletSemiHardLoss()
model.compile(optimizer=optimizer, loss=triplet_loss)

# Train for 100 epochs, checkpointing through the callback defined above
history = model.fit(
    training_set,
    epochs=100,
    callbacks=[model_checkpoint_callback],
)