In [None]:
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_curve, auc, accuracy_score
from sklearn.preprocessing import StandardScaler, LabelEncoder
from tensorflow.keras.layers import *
from tensorflow.keras import models, Model
import qkeras
from qkeras import *
from sparsepixels.layers import *
from sparsepixels.utils import *

(X_train, y_train), (X_test, y_test) = keras.datasets.mnist.load_data()

n_val = 10000
X_val = X_train[:n_val]
y_val = y_train[:n_val]
X_train = X_train[n_val:]
y_train = y_train[n_val:]

X_train = np.reshape(X_train, (-1,28,28,1)) / 255.
X_val = np.reshape(X_val, (-1,28,28,1)) / 255.
X_test = np.reshape(X_test, (-1,28,28,1)) / 255.

y_train = to_categorical(y_train, 10)
y_val = to_categorical(y_val, 10)
y_test = to_categorical(y_test, 10)

print("X_train.shape: " + str(X_train.shape))
print("y_train.shape: " + str(y_train.shape))
print("X_val.shape: " + str(X_val.shape))
print("y_val.shape: " + str(y_val.shape))
print("X_test.shape: " + str(X_test.shape))
print("y_test.shape: " + str(y_test.shape))

In [None]:
import os
import random
os.environ['PYTHONHASHSEED'] = str(123)
random.seed(123)
tf.random.set_seed(123)
np.random.seed(123)

noise_type='uniform'
#noise_type='poisson'
noise_level=0.5
inflate_factor=4
threshold=0.55
target_size_x=32
target_size_y=target_size_x

x_train_pooled = pool_pad_noise_inflate(X_train, pool_size=3, pool_type='avg', target_size=(target_size_x,target_size_y), noise_type=noise_type, noise_level=0, inflate_factor=1)
x_train_pooled_inflated = pool_pad_noise_inflate(X_train, pool_size=3, pool_type='avg', target_size=(target_size_x,target_size_y), noise_type=noise_type, noise_level=0, inflate_factor=inflate_factor)

x_train = pool_pad_noise_inflate(X_train, pool_size=3, pool_type='avg', target_size=(target_size_x,target_size_y), noise_type=noise_type, noise_level=noise_level, inflate_factor=inflate_factor)
x_val = pool_pad_noise_inflate(X_val, pool_size=3, pool_type='avg', target_size=(target_size_x,target_size_y), noise_type=noise_type, noise_level=noise_level, inflate_factor=inflate_factor)
x_test = pool_pad_noise_inflate(X_test, pool_size=3, pool_type='avg', target_size=(target_size_x,target_size_y), noise_type=noise_type, noise_level=noise_level, inflate_factor=inflate_factor)

for i in range(5):
    plot_sparse(X_train, x_train_pooled, x_train_pooled_inflated, x_train, i, threshold=threshold)

In [None]:
n_max_pixels = 10

def build_cnn(is_sparse):
    quantizer = quantized_bits(6, 0, alpha=1)
    quantized_relu = 'quantized_relu(6, 0)'

    x_in = keras.Input(shape=(x_train.shape[1], x_train.shape[2], x_train.shape[3]), name='x_in')
    if is_sparse:
        x, keep_mask = InputReduce(n_max_pixels=n_max_pixels, threshold=threshold, name='input_reduce')(x_in)
    else:
        x = x_in

    if is_sparse:
        x = QConv2DSparse(filters=3, kernel_size=(3, 3), use_bias=True, name='conv1',
                          padding='same', strides=1,
                          kernel_quantizer=quantizer, bias_quantizer=quantizer)([x, keep_mask])
    else:
        x = QConv2D(filters=3, kernel_size=(3, 3), use_bias=True, name='conv1',
                    padding='same', strides=1,
                    kernel_quantizer=quantizer, bias_quantizer=quantizer)(x)
    x = QActivation(quantized_relu, name='relu1')(x)

    if is_sparse:
        x, keep_mask = AveragePooling2DSparse(2, name='pool1')([x, keep_mask])
    else:
        x = AveragePooling2D(2, name='pool1')(x)

    if is_sparse:
        x = QConv2DSparse(filters=3, kernel_size=(3, 3), use_bias=True, name='conv2',
                          padding='same', strides=1,
                          kernel_quantizer=quantizer, bias_quantizer=quantizer)([x, keep_mask])
    else:
        x = QConv2D(filters=3, kernel_size=(3, 3), use_bias=True, name='conv2',
                    padding='same', strides=1,
                    kernel_quantizer=quantizer, bias_quantizer=quantizer)(x)
    x = QActivation(quantized_relu, name='relu2')(x)

    if is_sparse:
        x, keep_mask = AveragePooling2DSparse(2, name='pool2')([x, keep_mask])
    else:
        x = AveragePooling2D(2, name='pool2')(x)

    x = Flatten(name='flatten')(x)

    x = QDense(32, kernel_quantizer=quantizer, bias_quantizer=quantizer, name='dense1')(x)
    x = QActivation(quantized_relu, name='relu3')(x)

    x = QDense(10, kernel_quantizer=quantizer, bias_quantizer=quantizer, name='dense2')(x)
    x = Activation('softmax', name='softmax')(x)

    name = 'cnn_sparse'
    if not is_sparse:
        name = 'cnn_full'
    return keras.Model(x_in, x, name=name)

cnn_sparse = build_cnn(is_sparse=True)
cnn_sparse.compile(optimizer=tf.keras.optimizers.legacy.Adam(learning_rate=0.001),
              loss='categorical_crossentropy', metrics = ['accuracy'])
print(cnn_sparse.summary())

cnn_full = build_cnn(is_sparse=False)
cnn_full.compile(optimizer=tf.keras.optimizers.legacy.Adam(learning_rate=0.001),
              loss='categorical_crossentropy', metrics = ['accuracy'])
print(cnn_full.summary())

In [None]:
early_stop = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    min_delta=1e-3,
    patience=5,
    mode='min',
    restore_best_weights=True,
)
history = cnn_sparse.fit(x_train, y_train, validation_data=(x_val, y_val), epochs=100, batch_size=128, callbacks=[early_stop])

plt.figure(figsize=(15,10))
axes = plt.subplot(2, 2, 1)
axes.plot(history.history['loss'], label='train loss')
axes.plot(history.history['val_loss'], label='val loss')
axes.legend(loc="upper right")
axes.set_xlabel('Epoch')
axes.set_ylabel('Loss')

In [None]:
history = cnn_full.fit(x_train, y_train, validation_data=(x_val, y_val), epochs=100, batch_size=128, callbacks=[early_stop])

plt.figure(figsize=(15,10))
axes = plt.subplot(2, 2, 1)
axes.plot(history.history['loss'], label='train loss')
axes.plot(history.history['val_loss'], label='val loss')
axes.legend(loc="upper right")
axes.set_xlabel('Epoch')
axes.set_ylabel('Loss')

In [None]:
y_pred_sparse = cnn_sparse.predict(x_test)
y_pred_full = cnn_full.predict(x_test)
print("accuracy (sparse cnn) = {}".format(accuracy_score(np.argmax(y_test, axis=1), np.argmax(y_pred_sparse, axis=1))))
print("accuracy (full cnn) = {}".format(accuracy_score(np.argmax(y_test, axis=1), np.argmax(y_pred_full, axis=1))))

In [None]:
def plot_roc(y_test, y_pred_sparse, y_pred_full, labels):
    colors = plt.rcParams['axes.prop_cycle'].by_key()['color']
    for x, label in enumerate(labels):
        color = colors[x % len(colors)]
        fpr_full, tpr_full, _ = roc_curve(y_test[:, x], y_pred_full[:, x])
        plt.plot(tpr_full, fpr_full, label='{0} ({1:.3f}), full'.format(label, auc(fpr_full, tpr_full)), linestyle='-', lw=2, color=color)
        fpr_sparse, tpr_sparse, _ = roc_curve(y_test[:, x], y_pred_sparse[:, x])
        plt.plot(tpr_sparse, fpr_sparse, label='{0} ({1:.3f}), sparse'.format(label, auc(fpr_sparse, tpr_sparse)), linestyle='--', lw=2, color=color)
    plt.semilogy()
    plt.xlabel("tpr", size=12, loc='right')
    plt.ylabel("fpr", size=12, loc='top')
    plt.xlim(0., 1)
    plt.ylim(0.001, 1)
    plt.legend(loc='best', framealpha=0., prop={'size': 10})

plt.figure(figsize=(8, 8))
plot_roc(y_test, y_pred_sparse, y_pred_full, ['0','1','2','3','4','5','6','7','8','9'])

In [None]:
layer_names = [
    'x_in', 'input_reduce',
    'conv1', 'relu1', 'pool1',
    'conv2', 'relu2', 'pool2',
]

plot_tensors = []
plot_names = []
for name in layer_names:
    layer = cnn_sparse.get_layer(name)
    output = layer.output
    if isinstance(output, (list, tuple)):
        plot_tensors.append(output[0])
        plot_names.append(f'{name} (x_reduced)')
        plot_tensors.append(output[1])
        plot_names.append(f'{name} (x_mask)')
    else:
        plot_tensors.append(output)
        plot_names.append(name)

model_cnnpart = models.Model(inputs=cnn_sparse.input, outputs=plot_tensors)
layers_pred = model_cnnpart.predict(x_test[1:2])

i = 0
while i < len(plot_names):
    name = plot_names[i]

    if "(x_reduced)" in name and i+1 < len(plot_names) and "(x_mask)" in plot_names[i+1]:
        out_r = layers_pred[i] # (1, h, w, c)
        out_m = layers_pred[i+1] # (1, h, w, 1)

        arr_r = out_r[0] # (h, w, c)
        arr_m = out_m[0,...,0] # (h, w)
        h, w, c = arr_r.shape

        fig, axes = plt.subplots(1, c+1, figsize=((c+1)*3, 3))
        fig.suptitle(name.replace(" (x_reduced)", ""), fontsize=14)

        for ch in range(c):
            ax = axes[ch]
            ax.imshow(arr_r[..., ch], cmap='gray')
            ax.set_title(f"ch{ch}")

        axm = axes[c]
        axm.imshow(arr_m, cmap='gray')
        axm.set_title("mask")

        plt.tight_layout()
        plt.show()

        i += 2
        continue

    out = layers_pred[i]
    arr = out[0]

    if arr.ndim == 2:
        fig, ax = plt.subplots(1, 1, figsize=(3, 3))
        fig.suptitle(name, fontsize=14)
        ax.imshow(arr, cmap='gray')
        ax.set_title("ch0")
        plt.tight_layout()
        plt.show()

    elif arr.ndim == 3:
        h, w, c = arr.shape
        fig, axes = plt.subplots(1, c, figsize=(c*3, 3))
        fig.suptitle(name, fontsize=14)
        for ch in range(c):
            ax = axes[ch] if c>1 else axes
            ax.imshow(arr[..., ch], cmap='gray')
            ax.set_title(f"ch{ch}")
        plt.tight_layout()
        plt.show()

    i += 1

In [None]:
cnn_full.save_weights('w_full.h5')
cnn_sparse.save_weights('w_sparse.h5')

In [None]:
cnn_full = build_cnn(is_sparse=False)
cnn_sparse = build_cnn(is_sparse=True)

cnn_full.load_weights('w_full.h5')
cnn_sparse.load_weights('w_sparse.h5')

## hls

In [None]:
x_in = keras.Input(shape=cnn_sparse.input_shape[1:], name="x_in")
x = x_in
for layer in cnn_sparse.layers:
    if isinstance(layer, keras.layers.InputLayer):
        continue
    if isinstance(layer, InputReduce):
        continue
    if layer.name.startswith("mask_pool"):
        continue

    if isinstance(layer, QConv2DSparse):
        x = layer.conv(x)
    elif isinstance(layer, AveragePooling2DSparse):
        x = layer.avg_pool(x)
    else:
        x = layer(x)

cnn_sparse_forhls = keras.Model(x_in, x, name='cnn_sparse_forhls')
cnn_sparse_forhls.summary()

In [None]:
import hls4ml
config = hls4ml.utils.config_from_keras_model(cnn_sparse_forhls, granularity='name', backend='Vitis')
config

cnn_sparse_hls = hls4ml.converters.convert_from_keras_model(
    cnn_sparse_forhls,
    hls_config=config,
    project_name='hls_cnn_sparse',
    backend='Vitis',
    output_dir='hls_cnn_sparse',
    part='xcu250-figd2104-2L-e',
    #io_type='io_stream',
    io_type='io_parallel',
)

#cnn_sparse_hls.compile()
cnn_sparse_hls.write()

In [None]:
config = hls4ml.utils.config_from_keras_model(cnn_full, granularity='name', backend='Vitis')
config['LayerName']['conv1']['ParallelizationFactor'] = 8
config['LayerName']['conv2']['ParallelizationFactor'] = 8
config

cnn_full_hls = hls4ml.converters.convert_from_keras_model(
    cnn_full,
    hls_config=config,
    project_name='hls_cnn_full',
    backend='Vitis',
    output_dir='hls_cnn_full',
    part='xcu250-figd2104-2L-e',
    #io_type='io_stream',
    io_type='io_parallel',
)

#cnn_full_hls.compile()
cnn_full_hls.write()

## test bench

In [None]:
n_tb = 20
x_tb = x_test[:n_tb]
y_tb = y_pred_sparse[:n_tb]

with open("tb_input_features.dat", "w") as f:
    x_tb_flat = x_tb.reshape(n_tb, -1)
    for row in x_tb_flat:
        f.write(" ".join(str(v) for v in row))
        f.write("\n")

with open("tb_output_predictions.dat", "w") as f:
    for row in y_tb:
        f.write(" ".join(str(v) for v in row))
        f.write("\n")