In [1]:
from __future__ import absolute_import, division, print_function

import tensorflow as tf
import numpy as np
import logging

INFO:tensorflow:Enabling eager execution
INFO:tensorflow:Enabling v2 tensorshape
INFO:tensorflow:Enabling resource variables
INFO:tensorflow:Enabling tensor equality
INFO:tensorflow:Enabling control flow v2


In [2]:
from typing import Tuple, Dict, List
from pprint import pformat
from os import listdir, path

In [3]:
def read_int(f):
    """Read one 32-bit signed integer from a binary stream.

    The four bytes are decoded with NumPy's ``np.int32`` dtype, i.e. in the
    byte order of the machine running the notebook.

    Args:
        f: Binary file-like object exposing ``readinto``.

    Returns:
        The decoded value as a ``np.int32`` scalar.

    Raises:
        EOFError: If fewer than four bytes could be read. Without this check a
            truncated file would be decoded from a partly zero-filled buffer
            and silently produce a bogus value.
    """
    ba = bytearray(4)
    nr = f.readinto(ba)
    if nr != len(ba):
        raise EOFError(f"expected {len(ba)} bytes for an int32, got {nr}")
    return np.frombuffer(ba, dtype=np.int32)[0]

def read_double(f):
    """Read one 64-bit floating point value from a binary stream.

    The eight bytes are decoded with NumPy's ``np.double`` dtype, i.e. in the
    byte order of the machine running the notebook.

    Args:
        f: Binary file-like object exposing ``readinto``.

    Returns:
        The decoded value as a ``np.float64`` scalar.

    Raises:
        EOFError: If fewer than eight bytes could be read. Without this check
            a truncated file would be decoded from a partly zero-filled buffer
            and silently produce a bogus value.
    """
    ba = bytearray(8)
    nr = f.readinto(ba)
    if nr != len(ba):
        raise EOFError(f"expected {len(ba)} bytes for a double, got {nr}")
    return np.frombuffer(ba, dtype=np.double)[0]

def read_double_tab(f, n):
    """Read ``n`` consecutive 64-bit floats from a binary stream.

    Args:
        f: Binary file-like object exposing ``readinto``.
        n: Number of doubles to read.

    Returns:
        A 1-D ``np.double`` array of length ``n`` when all ``8 * n`` bytes were
        available. When the stream ends early an empty *list* is returned
        instead, so callers should test the result with ``len()``.
    """
    buf = bytearray(8 * n)
    got_everything = f.readinto(buf) == len(buf)
    if got_everything:
        return np.frombuffer(buf, dtype=np.double)
    return []

In [4]:
def log_object(obj):
    """Log the pretty-printed form of ``obj``, one INFO record per text line.

    Uses the module-level ``logger``, which must exist by the time this
    function is called.
    """
    rendered = pformat(obj)
    for text_line in rendered.split('\n'):
        logger.info(text_line)


def init_logger():
    """Configure and return the ``'SOLVE'`` logger.

    The logger emits INFO-and-above records to a console stream handler using
    a ``[HH:MM:SS][LEVEL] message`` layout. Handlers left over from an earlier
    call are removed first, so re-running the cell does not duplicate output.
    Propagation to ancestor loggers is switched off.

    Returns:
        logging.Logger: The configured logger.
    """
    solve_logger = logging.getLogger('SOLVE')
    solve_logger.setLevel(logging.INFO)

    # Console handler at INFO level with a compact timestamped format.
    console = logging.StreamHandler()
    console.setLevel(logging.INFO)
    console.setFormatter(
        logging.Formatter('[%(asctime)s][%(levelname)s] %(message)s', datefmt='%H:%M:%S')
    )

    # Replace whatever was attached before with the fresh console handler.
    if solve_logger.hasHandlers():
        solve_logger.handlers.clear()
    solve_logger.addHandler(console)

    # Keep records from also reaching ancestor loggers.
    solve_logger.propagate = False
    return solve_logger

In [5]:
# Module-level logger; the helper functions and cells below log through this name.
logger = init_logger()

In [6]:
def get_pics_from_file(pathname: str) -> Tuple[np.ndarray, Dict]:
    """Load one binary capture file as a 2-D array plus its header fields.

    The file starts with a header made of one int32 (``nb_pics``) followed by
    four doubles, and continues with fixed-size records of ``nb_pics`` doubles
    each. Reading stops at the first incomplete record, so trailing bytes that
    do not form a whole record are ignored.

    Args:
        pathname: Path of the file to read.

    Returns:
        A ``(pics, info)`` tuple. ``pics`` has shape ``(n_records, nb_pics)``;
        it is an empty ``(0, nb_pics)`` array when the file holds no complete
        record. ``info`` contains the header values under ``"nb_pics"``,
        ``"freq_sampling_khz"``, ``"freq_trame_hz"``, ``"freq_pic_khz"`` and
        ``"norm_fact"``, plus ``"label"`` set to the file's base name.

    Raises:
        ValueError: If the header declares a non-positive ``nb_pics``.
    """
    with open(pathname, "rb") as f:
        # Get info header (fields are read in file order).
        info = {}
        info["label"] = path.basename(pathname)
        info["nb_pics"] = read_int(f)
        info["freq_sampling_khz"] = read_double(f)
        info["freq_trame_hz"] = read_double(f)
        info["freq_pic_khz"] = read_double(f)
        info["norm_fact"] = read_double(f)

        nb_pics = info["nb_pics"]
        # A zero-width record always "reads" successfully, which would make
        # the loop below spin forever; reject it up front.
        if nb_pics <= 0:
            raise ValueError(f"{pathname}: invalid record width nb_pics={nb_pics}")

        # Parse pics
        records = []
        while True:
            record = read_double_tab(f, nb_pics)
            if len(record) != nb_pics:
                break
            # No per-record copy needed: np.stack below builds a new array.
            records.append(record)

    if not records:
        # np.stack refuses an empty list; return a well-shaped empty result.
        return np.empty((0, nb_pics), dtype=np.double), info
    return np.stack(records, axis=0), info

In [7]:
def get_pics(dir_pathname: str) -> Tuple[List[np.ndarray], List[Dict]]:
    """Load the per-key capture files ``pics_<KEY>.bin`` from a directory.

    Keys are the digits 0-9, the letters A-Z and six special keys, always in
    that order, so position ``i`` in both returned lists refers to the same
    key.

    Args:
        dir_pathname: Directory that contains the ``pics_*.bin`` files.

    Returns:
        ``(pics, infos)``: the data array and the header dict of every file,
        as produced by ``get_pics_from_file``.
    """
    # Build the ordered key list.
    digits = [str(d) for d in range(10)]
    letters = [chr(code) for code in range(ord('A'), ord('Z') + 1)]
    specials = ['CTRL', 'ENTER', 'NOKEY', 'SHIFT', 'SPACE', 'SUPPR']

    pics, infos = [], []
    for key in digits + letters + specials:
        filename = f"pics_{key}.bin"
        logger.info(f"Importing {filename}")

        item, info = get_pics_from_file(path.join(dir_pathname, filename))
        infos.append(info)
        pics.append(item)

    return pics, infos

In [8]:
def prepare_sets(pics: List[np.ndarray], infos: List[Dict], seed=None):
    """Build shuffled train/test sets from per-class sample arrays.

    Every row of ``pics[i]`` is labelled with the class index ``i``. All rows
    are then shuffled together and split 80 % / 20 % into train and test.

    Args:
        pics: One 2-D array per class; rows are samples.
        infos: Header dicts matching ``pics``. Not used by this function; kept
            for interface compatibility.
        seed: Optional integer seed. When given, the shuffle is drawn from a
            dedicated ``np.random.RandomState(seed)`` so the split is
            reproducible. When ``None`` (default) NumPy's global random state
            is used, exactly as before.

    Returns:
        ``(x_train, y_train), (x_test, y_test)``.
    """
    labels = [np.repeat(class_id, len(samples)) for class_id, samples in enumerate(pics)]

    y = np.concatenate(labels, axis=None)
    x = np.concatenate(pics, axis=0)

    logger.info(f"X: {x.shape}")
    logger.info(f"Y: {y.shape}")

    assert len(x) == len(y)
    # Shuffle samples and labels with the same permutation.
    rng = np.random if seed is None else np.random.RandomState(seed)
    p = rng.permutation(len(x))
    y = y[p]
    x = x[p]

    # Integer arithmetic keeps the split point exact.
    train_nb = (80 * len(x)) // 100
    x_train = x[:train_nb, ...]
    y_train = y[:train_nb, ...]

    x_test = x[train_nb:, ...]
    y_test = y[train_nb:, ...]

    return (x_train, y_train) , (x_test, y_test)

In [9]:
logger.info('Start solving')

# Get pics
# Loads every per-key capture file from ./data; index i of `pics` and `infos`
# refers to the i-th key in get_pics' fixed key order.
pics, infos = get_pics("./data")

[17:17:57][INFO] Start solving
[17:17:57][INFO] Importing pics_0.bin
[17:17:57][INFO] Importing pics_1.bin
[17:17:57][INFO] Importing pics_2.bin
[17:17:57][INFO] Importing pics_3.bin
[17:17:57][INFO] Importing pics_4.bin
[17:17:57][INFO] Importing pics_5.bin
[17:17:57][INFO] Importing pics_6.bin
[17:17:57][INFO] Importing pics_7.bin
[17:17:57][INFO] Importing pics_8.bin
[17:17:57][INFO] Importing pics_9.bin
[17:17:57][INFO] Importing pics_A.bin
[17:17:57][INFO] Importing pics_B.bin
[17:17:57][INFO] Importing pics_C.bin
[17:17:58][INFO] Importing pics_D.bin
[17:17:58][INFO] Importing pics_E.bin
[17:17:58][INFO] Importing pics_F.bin
[17:17:58][INFO] Importing pics_G.bin
[17:17:58][INFO] Importing pics_H.bin
[17:17:58][INFO] Importing pics_I.bin
[17:17:58][INFO] Importing pics_J.bin
[17:17:58][INFO] Importing pics_K.bin
[17:17:58][INFO] Importing pics_L.bin
[17:17:58][INFO] Importing pics_M.bin
[17:17:58][INFO] Importing pics_N.bin
[17:17:58][INFO] Importing pics_O.bin
[17:17:58][INFO] Im

In [10]:
# Compute sets
(x_train, y_train), (x_test, y_test) = prepare_sets(pics, infos)

# Cast the features to float32 and divide them by the constant 2.75.
# NOTE(review): presumably 2.75 is the maximum amplitude, which would put the
# values in [0, 1] - confirm against the data.
x_train = x_train.astype(np.float32) / 2.75
x_test = x_test.astype(np.float32) / 2.75

# Report the resulting shapes.
logger.info(f"x_train: {x_train.shape}")
logger.info(f"y_train: {y_train.shape}")
logger.info(f"x_test: {x_test.shape}")
logger.info(f"y_test: {y_test.shape}")

[17:18:00][INFO] X: (351612, 17)
[17:18:00][INFO] Y: (351612,)
[17:18:00][INFO] x_train: (281289, 17)
[17:18:00][INFO] y_train: (281289,)
[17:18:00][INFO] x_test: (70323, 17)
[17:18:00][INFO] y_test: (70323,)


In [11]:
# Get pics to solve
to_solve, info_to_solve = get_pics_from_file("./data/pics_LOGINMDP.bin")
# Same preprocessing as the training features: float32 cast, then division by 2.75.
to_solve = to_solve.astype(np.float32) / 2.75

In [12]:
num_classes = len(pics) # total classes: one per loaded key file
# Width of one sample, taken from the training matrix rather than hard-coded,
# so the network input size follows the data (17 for the current files).
num_features = x_train.shape[1]

In [13]:
# Training parameters.
# Step size handed to the SGD optimizer.
learning_rate = 0.005
# Number of batches drawn from the training dataset by the training loop.
training_steps = 20000
# Number of samples per batch.
batch_size = 256
# Progress is logged every `display_step` training steps.
display_step = 250

In [14]:
# Network parameters.
n_hidden_1 = 64 # 1st layer number of neurons.
n_hidden_2 = 64 # 2nd layer number of neurons.

# Input pipeline built with the tf.data API: repeat the (features, label)
# pairs indefinitely, shuffle with a 5000-element buffer, group them into
# batches of `batch_size` and prefetch one batch.
train_data = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .repeat()
    .shuffle(5000)
    .batch(batch_size)
    .prefetch(1)
)

In [15]:
# Trainable parameters of the network (two hidden layers plus the output layer).

# Initializer used to draw the starting weights from a normal distribution.
random_normal = tf.initializers.RandomNormal()

# Weight matrices, shaped (inputs, outputs) for each layer.
weights = dict(
    h1=tf.Variable(random_normal([num_features, n_hidden_1])),
    h2=tf.Variable(random_normal([n_hidden_1, n_hidden_2])),
    out=tf.Variable(random_normal([n_hidden_2, num_classes])),
)
# Bias vectors, all starting at zero.
biases = dict(
    b1=tf.Variable(tf.zeros([n_hidden_1])),
    b2=tf.Variable(tf.zeros([n_hidden_2])),
    out=tf.Variable(tf.zeros([num_classes])),
)

In [16]:
# Create model.
def neural_net(x):
    """Forward pass of the fully connected network.

    Two sigmoid hidden layers (sizes given by the shapes of ``weights['h1']``
    and ``weights['h2']``) are followed by a linear output layer with one unit
    per class and a softmax.

    Args:
        x: Batch of input samples, one row per sample.

    Returns:
        Tensor of class probabilities, one row per sample.
    """
    # First hidden layer: affine transform, then sigmoid non-linearity.
    hidden_1 = tf.nn.sigmoid(tf.matmul(x, weights['h1']) + biases['b1'])

    # Second hidden layer, same structure.
    hidden_2 = tf.nn.sigmoid(tf.matmul(hidden_1, weights['h2']) + biases['b2'])

    # Output layer: one logit per class.
    logits = tf.add(tf.matmul(hidden_2, weights['out']), biases['out'])

    # Softmax turns the logits into a probability distribution.
    return tf.nn.softmax(logits)


# Cross-Entropy loss function.
def cross_entropy(y_pred, y_true):
    """Cross-entropy between predicted probabilities and integer class labels.

    Args:
        y_pred: Predicted class probabilities, one row per sample.
        y_true: Integer class indices, one per sample.

    Returns:
        Scalar loss tensor.

    NOTE(review): ``tf.reduce_sum`` is called without an axis, so it already
    collapses every sample and class into a single scalar; the outer
    ``tf.reduce_mean`` then has nothing left to average. The value is
    therefore the *sum* over the batch and scales with the batch size. This is
    left unchanged because switching to a per-sample mean would change the
    effective step size of the optimizer.
    """
    # One-hot encode the labels so they select the true-class probability.
    one_hot_labels = tf.one_hot(y_true, depth=num_classes)
    # Keep probabilities away from zero so the logarithm stays finite.
    clipped = tf.clip_by_value(y_pred, 1e-9, 1.)
    log_likelihood = tf.reduce_sum(one_hot_labels * tf.math.log(clipped))
    return tf.reduce_mean(-log_likelihood)

# Accuracy metric.
def accuracy(y_pred, y_true):
    """Fraction of samples whose highest-scoring class equals the label.

    Args:
        y_pred: Per-class scores, one row per sample.
        y_true: Integer class indices, one per sample.

    Returns:
        Scalar float32 tensor with the mean hit rate.
    """
    predicted_class = tf.argmax(y_pred, 1)
    expected_class = tf.cast(y_true, tf.int64)
    hits = tf.cast(tf.equal(predicted_class, expected_class), tf.float32)
    return tf.reduce_mean(hits, axis=-1)


In [17]:
# Stochastic gradient descent optimizer.
# Created once at module level with `learning_rate` as its step size;
# run_optimization() applies the computed gradients through it.
optimizer = tf.optimizers.SGD(learning_rate)

In [18]:
# Optimization process.
def run_optimization(x, y):
    """Perform one gradient-descent update on a single batch.

    Runs the forward pass and the loss under a gradient tape, then lets the
    module-level ``optimizer`` update every entry of ``weights`` and
    ``biases`` in place.

    Args:
        x: Batch of input samples.
        y: Integer class labels matching ``x``.
    """
    # Variables to update, i.e. trainable variables.
    params = [*weights.values(), *biases.values()]

    # Record the forward computation for automatic differentiation.
    with tf.GradientTape() as tape:
        batch_loss = cross_entropy(neural_net(x), y)

    # Differentiate the loss and apply the update to W and b.
    grads = tape.gradient(batch_loss, params)
    optimizer.apply_gradients(zip(grads, params))


# Run training for the given number of steps.
for step, (batch_x, batch_y) in enumerate(train_data.take(training_steps), start=1):
    # Run the optimization to update W and b values.
    run_optimization(batch_x, batch_y)

    # Only report progress every `display_step` steps.
    if step % display_step != 0:
        continue

    # Re-evaluate the just-trained batch to log loss and accuracy.
    pred = neural_net(batch_x)
    loss = cross_entropy(pred, batch_y)
    acc = accuracy(pred, batch_y)
    logger.info("step: %i, loss: %f, accuracy: %f" % (step, loss, acc))

[17:18:02][INFO] step: 250, loss: 945.809448, accuracy: 0.042969
[17:18:03][INFO] step: 500, loss: 949.481445, accuracy: 0.058594
[17:18:05][INFO] step: 750, loss: 949.059021, accuracy: 0.039062
[17:18:07][INFO] step: 1000, loss: 953.141968, accuracy: 0.039062
[17:18:08][INFO] step: 1250, loss: 948.504822, accuracy: 0.054688
[17:18:10][INFO] step: 1500, loss: 948.982422, accuracy: 0.058594
[17:18:11][INFO] step: 1750, loss: 938.722534, accuracy: 0.035156
[17:18:13][INFO] step: 2000, loss: 916.571045, accuracy: 0.058594
[17:18:15][INFO] step: 2250, loss: 873.223511, accuracy: 0.082031
[17:18:16][INFO] step: 2500, loss: 835.287598, accuracy: 0.117188
[17:18:18][INFO] step: 2750, loss: 808.415161, accuracy: 0.136719
[17:18:19][INFO] step: 3000, loss: 750.358887, accuracy: 0.128906
[17:18:21][INFO] step: 3250, loss: 774.097473, accuracy: 0.117188
[17:18:22][INFO] step: 3500, loss: 757.078857, accuracy: 0.167969
[17:18:24][INFO] step: 3750, loss: 750.815613, accuracy: 0.167969
[17:18:26][IN

In [20]:
# Test model on validation set.
pred = neural_net(x_test)
logger.info("Test Accuracy: %f" % accuracy(pred, y_test))


# Predict 5 signal from validation set
n_pics = 5
test_pics = x_test[:n_pics]
predictions = neural_net(test_pics)
# Resolve the winning class of every sample once, instead of converting the
# whole tensor to NumPy again on each loop iteration.
predicted_classes = np.argmax(predictions.numpy(), axis=1)

logger.info("Test dataset")
# Iterate over what the slice actually holds (it may be shorter than n_pics).
for i in range(len(test_pics)):
    logger.info(f"Model prediction: {infos[predicted_classes[i]]['label']}")
    logger.info(f"Model label: {infos[y_test[i]]['label']}")

# Classify every sample of the capture to decode.
predictions = neural_net(to_solve)
predicted_classes = np.argmax(predictions.numpy(), axis=1)
logger.info("")
logger.info("Password")
for i in range(to_solve.shape[0]):
    logger.info(f"Model prediction: {infos[predicted_classes[i]]['label']}")

[17:20:42][INFO] Test Accuracy: 0.530168
[17:20:42][INFO] Test dataset
[17:20:42][INFO] Model prediction: pics_F.bin
[17:20:42][INFO] Model label: pics_F.bin
[17:20:42][INFO] Model prediction: pics_1.bin
[17:20:42][INFO] Model label: pics_3.bin
[17:20:42][INFO] Model prediction: pics_C.bin
[17:20:42][INFO] Model label: pics_C.bin
[17:20:42][INFO] Model prediction: pics_ENTER.bin
[17:20:42][INFO] Model label: pics_ENTER.bin
[17:20:42][INFO] Model prediction: pics_0.bin
[17:20:42][INFO] Model label: pics_0.bin
[17:20:42][INFO] 
[17:20:42][INFO] Password
[17:20:42][INFO] Model prediction: pics_NOKEY.bin
[17:20:42][INFO] Model prediction: pics_NOKEY.bin
[17:20:42][INFO] Model prediction: pics_NOKEY.bin
[17:20:42][INFO] Model prediction: pics_NOKEY.bin
[17:20:42][INFO] Model prediction: pics_NOKEY.bin
[17:20:42][INFO] Model prediction: pics_A.bin
[17:20:42][INFO] Model prediction: pics_NOKEY.bin
[17:20:42][INFO] Model prediction: pics_NOKEY.bin
[17:20:42][INFO] Model prediction: pics_NOKEY.