In [1]:
import itertools
import os
from typing import Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import skimage.transform
import tensorflow as tf

In [2]:
# Sanity check that TensorFlow can see a GPU before training.
# NOTE(review): tf.test.is_gpu_available is deprecated in TF 2.x in favour of
# tf.config.list_physical_devices("GPU") - switch once the minimum TF version is confirmed.
tf.test.is_gpu_available()

True

## Load training data

Load data and create shifted versions.  
This is not redundant, even though we are using convolutional layers, because there are boundary effects (which matter especially at such a small image size).

In [None]:
WIDTH = 5
HEIGHT = 5

# data.txt holds one example per block, blocks separated by a blank line:
# the first line of a block is the integer label, the remaining lines are the
# bitmap rows ("." = pixel off, any other character = pixel on).
with open("data.txt") as f:
    t = f.read()

xs = []
ys = []

for ex in t.split("\n\n"):
    # Tolerate a trailing newline at the end of the file and extra blank lines
    # between examples; only newlines are stripped so row content is untouched.
    ex = ex.strip("\n")
    if not ex:
        continue

    lines = ex.split("\n")
    y = int(lines[0])
    x = np.array([[0 if ch == "." else 1 for ch in line] for line in lines[1:]])
    assert x.shape == (HEIGHT, WIDTH), (
        f"example with label {y} has shape {x.shape}, expected {(HEIGHT, WIDTH)}"
    )

    # 1-based index of the right-most set column / bottom-most set row,
    # i.e. the extent of the glyph measured from the top-left corner.
    ex_width = np.max(np.arange(1, WIDTH + 1) * x)
    ex_height = np.max((np.arange(1, HEIGHT + 1) * x.T).T)
    print(y, ex_width, ex_height, end="")

    # Emit one copy per shift that keeps the glyph inside the grid. The offsets
    # are bounded by the glyph extent, so np.roll never wraps a set pixel around.
    for offset_x, offset_y in itertools.product(range(WIDTH - ex_width + 1),
                                                range(HEIGHT - ex_height + 1)):
        xs.append(np.roll(x, (offset_y, offset_x), axis=(0, 1)))
        ys.append(y)
        print(".", end="")
    print()

# Final layout: xs is (N, HEIGHT, WIDTH, 1) float, ys is (N,) int.
xs = np.array(xs).astype(float)[..., np.newaxis]
ys = np.array(ys)

Number of (shifted) examples: in total, then per class

In [4]:
# Total number of examples after shifting.
len(ys)

602

In [5]:
# Examples per label, most frequent first.
pd.Series(ys).value_counts()

3    78
9    74
6    74
5    73
1    68
4    61
0    56
8    44
2    43
7    31
dtype: int64

In [6]:
# Spot-check a random example: its label first, then its bitmap as integers.
idx = np.random.randint(len(ys))
print(ys[idx])
print(xs[idx, :, :, 0].astype(int))

3
[[0 1 1 1 0]
 [0 0 0 0 1]
 [0 0 0 1 0]
 [0 0 0 0 1]
 [0 1 1 1 0]]


## Add augmentation & create tf dataset

In [7]:
def random_flip(x: np.ndarray, xs_strings: set) -> np.ndarray:
    """Return a copy of ``x`` with one randomly chosen cell flipped (0 <-> 1).

    The flip is re-drawn until the resulting pattern does not occur in
    ``xs_strings``, so augmentation never turns one known example into another.

    Args:
        x: Binary pattern whose first two axes are (rows, columns); any trailing
            axes (e.g. a channel axis of size 1) are flipped along with the cell.
        xs_strings: Collection of ``ndarray.tobytes()`` signatures to avoid.
            Signatures must come from arrays with the same dtype and shape as ``x``.

    Returns:
        A new array; ``x`` itself is not modified.

    Note:
        Uses rejection sampling, so it only terminates if at least one
        single-cell flip of ``x`` is absent from ``xs_strings``.
    """
    # Take the grid size from the input itself rather than from module globals.
    height, width = x.shape[:2]
    while True:
        x_new = x.copy()
        pos_y = np.random.randint(0, height)
        pos_x = np.random.randint(0, width)
        x_new[pos_y, pos_x] = 1 - x_new[pos_y, pos_x]
        # tobytes() yields the same bytes as the deprecated tostring() alias.
        if x_new.tobytes() not in xs_strings:
            return x_new

In [8]:
# Byte signatures of every real example, used by the augmentation to avoid
# producing a pattern that is identical to another training example.
# tobytes() replaces the deprecated tostring() alias and yields the same bytes.
xs_strings = {x.tobytes() for x in xs}

In [9]:
BATCH_SIZE = 512

# Pair each image with its label, shuffle over the whole set and repeat
# forever; the number of batches drawn per epoch is set where the dataset is consumed.
ds = (
    tf.data.Dataset.from_tensor_slices((xs, ys))
    .shuffle(buffer_size=len(ys))
    .repeat()
)

def augment(x: np.ndarray) -> np.ndarray:
    """Apply up to three random single-bit flips to ``x``.

    Three independent draws decide whether a flip happens, with probabilities
    of 40%, 15% and 5% respectively; most examples therefore stay unchanged.
    """
    for threshold in (0.6, 0.85, 0.95):
        if np.random.uniform() > threshold:
            x = random_flip(x, xs_strings)
    return x
    

def map_fn(x: tf.Tensor, y: tf.Tensor) -> Tuple[tf.Tensor, tf.Tensor]:
    """Dataset map function: augment the image, pass the label through.

    ``augment`` is plain NumPy code, so it is wrapped in ``tf.numpy_function``.
    The static shape is re-declared on the wrapped result afterwards.
    """
    augmented = tf.numpy_function(func=augment, inp=[x], Tout=tf.float64)
    augmented.set_shape([HEIGHT, WIDTH, 1])
    return augmented, y

# Augment examples in parallel, then group them into batches.
ds = ds.map(map_fn, num_parallel_calls=8).batch(BATCH_SIZE)


## Define model

Regularization actually improves training accuracy (?) and makes the LED activation patterns nicer.

In [10]:
def _regularized_conv(filters: int, **kwargs) -> tf.keras.layers.Conv2D:
    """Build a 2x2 ReLU convolution ("valid" padding) with L2-regularized kernel and bias."""
    return tf.keras.layers.Conv2D(
        filters, 2,
        activation="relu", padding="valid",
        kernel_regularizer=tf.keras.regularizers.L1L2(l2=0.001),
        bias_regularizer=tf.keras.regularizers.L1L2(l2=0.001),
        **kwargs)


# Three 2x2 "valid" convolutions shrink the 5x5 input to 4x4 -> 3x3 -> 2x2;
# global max pooling then reduces the 16 feature maps to a vector for the
# 10-way softmax classifier.
model = tf.keras.Sequential([
    # Images are stored as (rows, columns, channels), i.e. (HEIGHT, WIDTH, 1).
    _regularized_conv(4, input_shape=(HEIGHT, WIDTH, 1)),
    _regularized_conv(8),
    _regularized_conv(16),
    tf.keras.layers.GlobalMaxPooling2D(),
    tf.keras.layers.Dense(10, activation=tf.keras.activations.softmax)
])

In [11]:
# Total number of weights in the network.
model.count_params()

854

In [12]:
# Labels are plain integers, hence the sparse variant of categorical cross-entropy.
model.compile(
    loss=tf.keras.losses.sparse_categorical_crossentropy,
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.0025),
    metrics=["accuracy"],
)

## Train

In [None]:
# training is fast, so let's just go crazy with the number of epochs
# Always run at least one step per epoch, even if there are fewer examples
# than BATCH_SIZE (the integer division alone would then yield 0).
steps_per_epoch = max(1, len(ys) // BATCH_SIZE)
for i in range(100):
    model.fit(ds, epochs=100, verbose=0, steps_per_epoch=steps_per_epoch)
    # [loss, accuracy] on the un-augmented training examples
    print(model.evaluate(xs, ys, verbose=0))

In [14]:
# Reduce the learning rate (from the initial 0.0025) for the next training rounds.
model.optimizer.lr.assign(0.0005)

<tf.Variable 'UnreadVariable' shape=() dtype=float32, numpy=0.0005>

In [None]:
# Second training stage. Always run at least one step per epoch, even if there
# are fewer examples than BATCH_SIZE (the integer division alone would yield 0).
steps_per_epoch = max(1, len(ys) // BATCH_SIZE)
for i in range(40):
    model.fit(ds, epochs=100, verbose=0, steps_per_epoch=steps_per_epoch)
    # [loss, accuracy] on the un-augmented training examples
    print(model.evaluate(xs, ys, verbose=0))

In [None]:
# Final training stage at a very small learning rate.
model.optimizer.lr.assign(0.00005)
# Always run at least one step per epoch, even if there are fewer examples
# than BATCH_SIZE (the integer division alone would then yield 0).
steps_per_epoch = max(1, len(ys) // BATCH_SIZE)
for i in range(20):
    model.fit(ds, epochs=100, verbose=0, steps_per_epoch=steps_per_epoch)
    # [loss, accuracy] on the un-augmented training examples
    print(model.evaluate(xs, ys, verbose=0))

In [17]:
# Loss/accuracy on the augmented (bit-flipped) stream. The dataset repeats
# forever, so the number of batches must be bounded - and be at least one.
model.evaluate(ds, steps=max(1, len(ys) // BATCH_SIZE))



[0.4516589343547821, 0.890625]

In [18]:
# [loss, accuracy] on the original, un-augmented examples.
print(model.evaluate(xs, ys, verbose=0))

[0.14010281479635903, 1.0]


In [19]:
# Show every un-augmented example the model still gets wrong (ideally none).
# argmax over the softmax output is equivalent to Sequential.predict_classes,
# which newer TensorFlow releases no longer provide.
predicted_classes = np.argmax(model.predict(xs, verbose=0), axis=-1)
xs[predicted_classes != ys][...,0]

array([], shape=(0, 5, 5), dtype=float64)

In [20]:
# Persist the trained model; the name encodes input size and filter counts.
model.save("5x5_4-8-16_filters.savedmodel")

Instructions for updating:
If using Keras pass *_constraint arguments to layers.
INFO:tensorflow:Assets written to: 5x5_4-8-16_filters.savedmodel\assets


In [21]:
# Reload the model from disk so the following cells work on the saved artifact.
model = tf.keras.models.load_model("5x5_4-8-16_filters.savedmodel/")

In [22]:
# Layer-by-layer overview of output shapes and parameter counts.
model.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d (Conv2D)              (None, 4, 4, 4)           20        
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 3, 3, 8)           136       
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 2, 2, 16)          528       
_________________________________________________________________
global_max_pooling2d (Global (None, 16)                0         
_________________________________________________________________
dense (Dense)                (None, 10)                170       
Total params: 854
Trainable params: 854
Non-trainable params: 0
_________________________________________________________________


In [24]:
# Hand-drawn sample, centred in the grid, with batch and channel axes added
# so that it has the (1, 5, 5, 1) float32 layout the model is called with.
test_input = np.array(
    [
        [0, 0, 0, 0, 0],
        [0, 1, 1, 0, 0],
        [0, 0, 1, 0, 0],
        [0, 1, 1, 1, 0],
        [0, 0, 0, 0, 0],
    ],
    dtype=np.float32,
)[None, :, :, None]

In [25]:
# Class probabilities (softmax over the 10 labels) for the hand-drawn sample.
model(test_input)

<tf.Tensor: id=104350, shape=(1, 10), dtype=float32, numpy=
array([[1.0185980e-04, 9.9014634e-01, 1.9110986e-03, 6.5210951e-03,
        1.5079939e-06, 5.9689349e-04, 6.7623019e-05, 2.2448589e-04,
        1.5240769e-04, 2.7672789e-04]], dtype=float32)>

## Save weights

In [23]:
# np.save does not create missing directories, so make sure the target exists.
os.makedirs("weights", exist_ok=True)

# Layers 0-2 are the three Conv2D layers; each exposes its kernel and bias in that order.
for i in range(3):
    base_name = f"weights/conv{i}_{{}}.npy"
    kernel, bias = model.layers[i].weights
    np.save(base_name.format("kernel"), kernel.numpy())
    np.save(base_name.format("bias"), bias.numpy())

# Layer 3 is the parameter-free global max pooling, so the Dense layer sits at index 4.
kernel, bias = model.layers[4].weights
np.save("weights/dense_kernel.npy", kernel.numpy())
np.save("weights/dense_bias.npy", bias.numpy())

# Output model for inclusion in C code

## Get 99th-percentile activation strengths for LED brightness

In [None]:
# Push all examples through the three conv layers one at a time and emit the
# 99th percentile of each layer's activations as a C constant.
x = xs
for i, layer in enumerate(model.layers[:3]):
    x = layer(x)
    perc_99 = np.percentile(x, 99)
    formatted = np.format_float_positional(perc_99)
    print(f"const float conv{i}_activation_99per = {formatted};")

## Print weights

In [27]:
def print_float_array(name: str, x: np.ndarray) -> None:
    """Print a 1-D array as a C ``const float`` array definition.

    Values are written in scientific notation, five per line, each line
    indented by four spaces and ending with a trailing comma.

    Args:
        name: Identifier of the emitted C array.
        x: One-dimensional array of values to print.
    """
    assert x.ndim == 1
    print(f"const float {name}[] = {{")
    for start in range(0, len(x), 5):
        literals = [np.format_float_scientific(value) for value in x[start:start + 5]]
        print("    " + ", ".join(literals) + ",")
    print("};")

In [None]:
# Convolution layers: emit kernel and bias of each as flat C arrays,
# with a blank line after every array.
for i, layer in enumerate(model.layers[:3]):
    for label, weight in zip(("kernel", "bias"), layer.weights):
        print_float_array(f"conv{i}_{label}_data", weight.numpy().flatten())
        print()

# Dense layer: the kernel is transposed before flattening, the bias is emitted as-is.
dense_kernel, dense_bias = model.layers[4].weights
print_float_array("dense_kernel_data", dense_kernel.numpy().T.flatten())
print()
print_float_array("dense_bias_data", dense_bias.numpy().flatten())