In [None]:
import tensorflow as tf
from tensorflow import keras

import math
import os

import matplotlib.pyplot as plt
import matplotlib as mpl
# Matplotlib settings applied to every figure in this notebook: text is
# typeset with LaTeX (usetex), the pgf backend is told to use pdflatex,
# and a serif font at size 16 is selected.
_LATEX_RC_PARAMS = {
    "pgf.texsystem": "pdflatex",
    "font.family": "serif",
    "text.usetex": True,
    "pgf.rcfonts": False,
    "font.size": 16,
}
mpl.rcParams.update(_LATEX_RC_PARAMS)

# Global switch: when False, save_fig() returns without touching the disk.
SAVE_FIG = True


def save_fig(name, tight_layout = True):
    """Write the current matplotlib figure to ``plots/<name>.pdf``.

    The ``plots`` directory is created on first use. Nothing happens when
    the module-level ``SAVE_FIG`` flag is false.

    Args:
        name: File name without extension.
        tight_layout: If true, ``plt.tight_layout(pad=0.4)`` is applied
            before the figure is saved.
    """
    if not SAVE_FIG:
        return

    output_dir = "plots"
    if not os.path.isdir(output_dir):
        os.mkdir(output_dir)

    print(f"Saving figure {name} ...")
    if tight_layout:
        plt.tight_layout(pad=0.4)
    # The pgf backend is requested explicitly for the PDF export.
    plt.savefig(f"plots/{name}.pdf", backend="pgf")

In [None]:
# Print the version string of the imported TensorFlow package.
print(tf.__version__)

In [None]:
# 4000 evenly spaced points from 0 to 14 and their sine values.
xs = tf.linspace(start=0, stop=14, num=4000)
ys = tf.math.sin(xs)

In [None]:
# Plot the sine curve with the x axis ticked at multiples of pi.
plt.plot(xs, ys, "b-")
plt.grid()
plt.plot([-10, 20], [0, 0], "k-")  # horizontal line marking y = 0
plt.axis([0, 14, -1.2, 1.2])
plt.xlabel("$x$")
# Raw strings keep the LaTeX backslashes literal; "\s" and "\p" are invalid
# escape sequences in ordinary string literals. The argument is x, matching
# the x-axis label.
plt.ylabel(r"$\sin(x)$")
plt.xticks(
    tf.range(0, 5 * math.pi, math.pi),
    ["0", r"$\pi$", r"$2\pi$", r"$3\pi$", r"$4\pi$"],
)
save_fig("sine-curve")
plt.show()

In [None]:
SAMPLES = 2000

# Fix the global TensorFlow seed so the random draws below are repeatable.
tf.random.set_seed(42)

# SAMPLES x values drawn uniformly between 0 and 2*pi, shape (SAMPLES, 1).
x = tf.random.uniform((SAMPLES, 1), minval=0, maxval=2*math.pi)

# tf.random.shuffle returns a new tensor and leaves its argument untouched,
# so the result has to be assigned for the shuffle to take effect.
x = tf.random.shuffle(x)

y = tf.math.sin(x)

plt.plot(x, y, "b.")
save_fig("sine-data")
plt.show()

In [None]:
# Standard-deviation factor applied to the standard-normal noise.
NOISE_STDDEV = 0.25

# Rebuild y from x instead of using `y += ...`, so re-running this cell
# does not stack additional noise on top of already noisy targets.
y = tf.math.sin(x) + NOISE_STDDEV * tf.random.normal(x.shape)
plt.plot(x, y, "b.")
save_fig("sine-data-noise")
plt.show()

In [None]:
# Split sizes: 50 % training, 20 % test, and whatever remains for validation.
TRAIN_SPLIT = int(0.5 * SAMPLES)
TEST_SPLIT = int(0.2 * SAMPLES)
VAL_SPLIT = SAMPLES - (TRAIN_SPLIT + TEST_SPLIT)

# Inputs and targets are cut with the same sizes so the pairs stay aligned.
_split_sizes = [TRAIN_SPLIT, TEST_SPLIT, VAL_SPLIT]
x_train, x_test, x_val = tf.split(x, _split_sizes)
y_train, y_test, y_val = tf.split(y, _split_sizes)

# Sanity check: the three parts together contain every sample.
assert SAMPLES == tf.size(x_train) + tf.size(x_test) + tf.size(x_val)

In [None]:
# One scatter series per data split, each with its own colour and legend entry.
for part_x, part_y, fmt, legend_label in (
    (x_train, y_train, "b.", "Train"),
    (x_test, y_test, "y.", "Test"),
    (x_val, y_val, "r.", "Validation"),
):
    plt.plot(part_x, part_y, fmt, label=legend_label)
plt.legend()
save_fig("sine-data-noise-legend")
plt.show()

In [None]:
# Model 1: one scalar input, a 16-unit ReLU hidden layer, and a single-unit
# output layer built without an activation argument.
input_layer = keras.layers.Input(shape=[1])
hidden1 = keras.layers.Dense(
    units=16,
    activation="relu",
    name="hidden1",
)(input_layer)
output = keras.layers.Dense(units=1, name="output")(hidden1)
model_1 = keras.Model(inputs=input_layer, outputs=output)

In [None]:
# Show the layer-by-layer summary of model_1.
model_1.summary()

In [None]:
# SGD with its default settings; mean squared error is the loss and mean
# absolute error is tracked as an additional metric.
sgd = tf.keras.optimizers.SGD()
model_1.compile(
    loss="mse",
    metrics=["mae"],
    optimizer=sgd,
)

In [None]:
EPOCHS = 200
BATCH_SIZE = 16

# Train on the training split; the validation split is evaluated alongside.
history_1 = model_1.fit(
    x_train,
    y_train,
    validation_data=(x_val, y_val),
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
)

In [None]:
# List the keys recorded in the training history of model_1.
history_1.history.keys()

In [None]:
# Per-epoch training and validation loss recorded while fitting model_1.
loss_1, val_loss_1 = (
    history_1.history[key] for key in ("loss", "val_loss")
)
# Both series must hold exactly one value per epoch.
assert len(loss_1) == EPOCHS and len(val_loss_1) == EPOCHS

In [None]:
# Epoch axis 1..EPOCHS for the training curve; the validation curve is
# plotted half an epoch further to the right.
train_epochs = tf.range(1.0, EPOCHS + 1)
val_epochs = train_epochs + 0.5

for epoch_axis, losses, fmt, legend_label in (
    (train_epochs, loss_1, "b-", "Training Kosten"),
    (val_epochs, val_loss_1, "g-", "Validation Kosten"),
):
    plt.plot(epoch_axis, losses, fmt, label=legend_label)
plt.xlabel("Epochen")
plt.ylabel("Kosten")
plt.legend()
save_fig("1-kosten")
plt.show()

In [None]:
# Per-epoch mean absolute error of model_1 on the training and validation data.
mae_1 = history_1.history["mae"]
val_mae_1 = history_1.history["val_mae"]

plt.plot(train_epochs, mae_1, "b-", label="Training MAE")
plt.plot(val_epochs, val_mae_1, "g-", label="Validation MAE")
plt.xlabel("Epochen")
# This figure shows the MAE metric, not the loss ("Kosten"), so the axis is
# labelled accordingly.
plt.ylabel("MAE")
plt.legend()
save_fig("1-mae")
plt.show()

In [None]:
# Predictions of model_1 on the test split.
predictions_1 = model_1.predict(x_test)

# The reference points are the test split (x_test, y_test), so the legend
# names them test data rather than training data.
plt.plot(x_test, y_test, "b.", label="Testdaten")
plt.plot(x_test, predictions_1, "y.", label="Prognose")
plt.legend()
save_fig("1-prognosen")
plt.show()

In [None]:
# Model 2: like model 1 but with a second, 64-unit ReLU hidden layer between
# the 16-unit hidden layer and the single-unit output layer.
input_layer = keras.layers.Input(shape=[1])
hidden1 = keras.layers.Dense(
    units=16,
    activation="relu",
    name="hidden1",
)(input_layer)
hidden2 = keras.layers.Dense(
    units=64,
    activation="relu",
    name="hidden2",
)(hidden1)
output = keras.layers.Dense(units=1, name="output")(hidden2)
model_2 = keras.Model(inputs=input_layer, outputs=output)

In [None]:
# Show the layer-by-layer summary of model_2.
model_2.summary()

In [None]:
# A fresh default SGD optimizer for model_2; mean squared error is the loss
# and mean absolute error is tracked as an additional metric.
sgd = tf.keras.optimizers.SGD()
model_2.compile(
    loss="mse",
    metrics=["mae"],
    optimizer=sgd,
)

In [None]:
EPOCHS = 200

# NOTE(review): patience is set to the full epoch count, so this callback
# presumably never cuts training short and is used only for
# restore_best_weights -- confirm. It also monitors the training loss
# ("loss"), not "val_loss" -- confirm that this is intended.
early_stopping_cb = tf.keras.callbacks.EarlyStopping(
    monitor="loss",
    patience=EPOCHS,
    restore_best_weights=True,
    verbose=1,
)

history_2 = model_2.fit(
    x_train,
    y_train,
    validation_data=(x_val, y_val),
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    callbacks=[early_stopping_cb],
)

In [None]:
# Per-epoch training and validation loss recorded while fitting model_2.
loss_2 = history_2.history["loss"]
val_loss_2 = history_2.history["val_loss"]

# Size the epoch axis from the recorded history instead of EPOCHS, so the
# x and y series keep matching lengths even if training ended before EPOCHS
# epochs. The validation curve is plotted half an epoch further right.
train_epochs = tf.range(1.0, len(loss_2) + 1)
val_epochs = train_epochs + 0.5

# The same figure is drawn twice: once with automatic axis limits and once
# zoomed in on the loss range 0..0.3.
for fig_name, axis_limits in (
    ("2-kosten", None),
    ("2-kosten-limit", [0, EPOCHS, 0, 0.3]),
):
    plt.plot(train_epochs, loss_2, "b-", label="Training Kosten")
    plt.plot(val_epochs, val_loss_2, "g-", label="Validation Kosten")
    plt.xlabel("Epochen")
    plt.ylabel("Kosten")
    plt.legend()
    if axis_limits is not None:
        plt.axis(axis_limits)
    save_fig(fig_name)
    plt.show()

In [None]:
# Per-epoch mean absolute error of model_2 on the training and validation data.
mae_2 = history_2.history["mae"]
val_mae_2 = history_2.history["val_mae"]
plt.plot(train_epochs, mae_2, "b-", label="Training MAE")
plt.plot(val_epochs, val_mae_2, "g-", label="Validation MAE")
plt.xlabel("Epochen")
# This figure shows the MAE metric, not the loss ("Kosten"), so the axis is
# labelled accordingly.
plt.ylabel("MAE")
plt.legend()
plt.axis([0, EPOCHS, 0.15, 0.45])
save_fig("2-mae")
plt.show()

In [None]:
# Predictions of model_2 on the test split.
predictions_2 = model_2.predict(x_test)
# The reference points are the test split (x_test, y_test), so the legend
# names them test data rather than training data.
plt.plot(x_test, y_test, "b.", label="Testdaten")
plt.plot(x_test, predictions_2, "y.", label="Prognose")
plt.legend()
save_fig("2-prognosen")
plt.show()

In [None]:
def save_model(model, model_name: str):
    """Write a serialized model to disk.

    Args:
        model: Bytes-like content to store; it is written unchanged in
            binary mode.
        model_name: Path of the file to create or overwrite.
    """
    with open(model_name, "wb") as model_file:
        model_file.write(model)

In [None]:
# Convert model_2 to TensorFlow Lite without setting any converter options
# and store the result next to the notebook.
converter = tf.lite.TFLiteConverter.from_keras_model(model_2)
tflite_model = converter.convert()

save_model(model=tflite_model, model_name="sine_model.tflite")

In [None]:
def representative_dataset():
    """Yield every row of ``x_test`` wrapped in a one-element list."""
    # NOTE(review): each yielded sample is one row of x_test; presumably the
    # converter accepts that shape for the model's single input -- confirm.
    yield from ([sample] for sample in x_test)


# Second conversion of model_2, this time with int8 input and output types
# and the int8 builtin op set requested from the converter.
converter = tf.lite.TFLiteConverter.from_keras_model(model_2)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_dataset
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8
tflite_model = converter.convert()

save_model(model=tflite_model, model_name="sine_model_quant.tflite")

In [None]:
# Load the float and the quantized TFLite model and run both on every test
# sample, collecting the (dequantized) predictions for later plotting.
sine_model = tf.lite.Interpreter("sine_model.tflite")
sine_model_quant = tf.lite.Interpreter("sine_model_quant.tflite")

sine_model.allocate_tensors()
sine_model_quant.allocate_tensors()

input_details = sine_model.get_input_details()[0]
output_details = sine_model.get_output_details()[0]

input_details_quant = sine_model_quant.get_input_details()[0]
output_details_quant = sine_model_quant.get_output_details()[0]

# (scale, zero_point) pairs of the quantized model's input and output tensor.
input_scale, input_zero_point = input_details_quant["quantization"]
output_scale, output_zero_point = output_details_quant["quantization"]

input_index = input_details["index"]
output_index = output_details["index"]

input_index_quant = input_details_quant["index"]
output_index_quant = output_details_quant["index"]

# Representable range of the int8 input tensor.
INT8_MIN, INT8_MAX = -128, 127

sine_model_predictions = []
sine_model_quant_predictions = []

for value in x_test:
    tensor_float32 = tf.convert_to_tensor([value], dtype=tf.float32)

    # Quantize: q = round(x / scale + zero_point), clamped to the int8 range
    # so out-of-range inputs saturate instead of wrapping around.
    quantized_input = int(round(float(value[0]) / input_scale + input_zero_point))
    quantized_input = max(INT8_MIN, min(INT8_MAX, quantized_input))
    tensor_int8 = tf.convert_to_tensor([[quantized_input]], dtype=tf.int8)

    sine_model.set_tensor(input_index, tensor_float32)
    sine_model_quant.set_tensor(input_index_quant, tensor_int8)

    sine_model.invoke()
    sine_model_quant.invoke()

    sine_model_predictions.append(
        sine_model.get_tensor(output_index)[0,0]
    )

    # Dequantize with the output tensor's own parameters:
    # real = (q - zero_point) * scale. A fixed "/ 128" is only right when the
    # output happens to have scale 1/128 and zero point 0. int() avoids
    # int8 overflow in the subtraction.
    quantized_output = int(sine_model_quant.get_tensor(output_index_quant)[0,0])
    sine_model_quant_predictions.append(
        (quantized_output - output_zero_point) * output_scale
    )

In [None]:
# Overlay the test data with the predictions of the Keras model, the float
# TFLite model and the quantized TFLite model.
for series, fmt, legend_label in (
    (y_test, "b.", "Daten"),
    (predictions_2, "ro", "Ursprüngliche Prognose"),
    (sine_model_predictions, "yx", "Lite Prognose"),
    (sine_model_quant_predictions, "g+", "Lite Quantisierte Prognose"),
):
    plt.plot(x_test, series, fmt, label=legend_label)
plt.legend()
save_fig("model-comparison")
plt.show()

In [None]:
import os

# Report the on-disk size in bytes of the float and the quantized model file.
_float_model_path = "sine_model.tflite"
sine_size = os.path.getsize(_float_model_path)
print(f"Sine model size: {sine_size}")

_quant_model_path = "sine_model_quant.tflite"
sine_quant_size = os.path.getsize(_quant_model_path)
print(f"Sine quant model size: {sine_quant_size}")

In [None]:
import platform

# File name of the Edge TPU runtime library for the current operating system,
# looked up by platform.system(); an unlisted system raises KeyError.
EDGETPU_LIB = {
    "Linux": "libedgetpu.so.1",
    "Darwin": "libedgetpu.1.dylib",
    "Windows": "edgetpu.dll",
}[platform.system()]
# Backward-compatible alias for the original, misspelled constant name.
EDGETUP_LIB = EDGETPU_LIB

# NOTE(review): "sine_model_quant_edgetpu.tflite" is not written by any cell
# shown in this notebook; presumably it is produced from
# sine_model_quant.tflite by an external Edge TPU compile step -- confirm.
delegates = [tf.lite.experimental.load_delegate(EDGETPU_LIB)]
sine_model_tpu = tf.lite.Interpreter(
    "sine_model_quant_edgetpu.tflite",
    experimental_delegates=delegates
)

sine_model_tpu.allocate_tensors()

input_details_tpu = sine_model_tpu.get_input_details()[0]
output_details_tpu = sine_model_tpu.get_output_details()[0]

input_index_tpu = input_details_tpu["index"]
output_index_tpu = output_details_tpu["index"]

In [None]:
import time

# 2000 random inputs between 1 and 5, converted to int8 with input_scale and
# input_zero_point.
# NOTE(review): these parameters were read from sine_model_quant; presumably
# the Edge TPU build of the model uses the same ones -- confirm.
values = tf.cast(
    tf.random.uniform((2000, 1, 1), minval=1, maxval=5) / input_scale
    + input_zero_point,
    dtype=tf.int8,
)

# Time the inference loop on the Edge TPU interpreter, in milliseconds.
start = time.perf_counter()
for value in values:
    sine_model_tpu.set_tensor(input_index_tpu, value)
    sine_model_tpu.invoke()
inference_time = 1000 * (time.perf_counter() - start)

print(inference_time)

In [None]:
# 2000 random inputs between 1 and 5, converted to int8 with the quantized
# model's input scale and zero point.
values = tf.random.uniform((2000, 1, 1), minval=1, maxval=5)
values = values / input_scale + input_zero_point
values = tf.cast(values, dtype=tf.int8)

# Time the same inference loop on the quantized model without the Edge TPU
# delegate, in milliseconds.
start = time.perf_counter()
for value in values:
    # Use this interpreter's own input tensor index; input_index_tpu belongs
    # to the separate sine_model_tpu interpreter.
    sine_model_quant.set_tensor(input_index_quant, value)
    sine_model_quant.invoke()
inference_time = (time.perf_counter() - start) * 1000

print(inference_time)