# Modelle auf Embedded Geräten

Dieses Notebook ist Teil des Projekts [EmbedML](https://hahn-schickard.gitbook.io/embedml) und basiert auf den Inhalten des Kapitels [Modelle auf Embedded Geräten](https://hahn-schickard.gitbook.io/embedml/3_tinyml/3.4_modelle_auf_embedded_geraeten). Es ist auf [Colab](https://colab.research.google.com/drive/1gqcQ54VTYV4zW6mC6MQMqHC-x4etYXDF?usp=sharing) und im [GitHub-Repository von Hahn-Schickard](https://github.com/Hahn-Schickard/EmbedML/blob/master/src/Modelle%20auf%20embedded%20Ger%C3%A4ten.ipynb) zu finden.

Installation der benötigten Bibliotheken in den angegebenen Versionen.

In [1]:
!pip install -q numpy==1.26.4 tensorflow-gpu==2.10 > /dev/null 2>&1

Das System kann den angegebenen Pfad nicht finden.


In [2]:
import os

import numpy as np
import tensorflow as tf
from tensorflow.keras.datasets import mnist
from tensorflow.keras.layers import Conv2D, Dense, Dropout, Flatten, MaxPooling2D
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam

Es werden Ordner erstellt, falls diese noch nicht vohanden sind, in welchen die in diesem Notebook erstellten Modelle gespeichert werden.

In [3]:
if not os.path.exists(os.path.join('assets')):
    os.makedirs(os.path.join('assets'))
    
if not os.path.exists(os.path.join('assets', 'models')):
    os.makedirs(os.path.join('assets', 'models'))
    
if not os.path.exists(os.path.join('assets', 'models', 'c++')):
    os.makedirs(os.path.join('assets', 'models', 'c++'))
    
if not os.path.exists(os.path.join('assets', 'models', 'tf')):
    os.makedirs(os.path.join('assets', 'models', 'tf'))
    
if not os.path.exists(os.path.join('assets', 'models', 'tflite')):
    os.makedirs(os.path.join('assets', 'models', 'tflite'))

## Baseline Modell

Ein Basline Modell wird trainiert, welches als Grundlage für das Notebook dient.

Im ersten Schritt, wird der MNIST-Datensatz geladen. Die Pixelwerte des MNIST-Datensatzes werden durch 255 geteilt, um sie auf den Bereich von 0 bis 1 zu normalisieren, da die ursprünglichen Werte zwischen 0 und 255 liegen. Im selben Schritt werden die Daten, in den Datentyp float32 konvertiert und in Trainings- und Testdaten aufgeteilt.

In [4]:
(X_train, y_train), (X_test, y_test) = mnist.load_data()
X_train = (X_train / 255.0).astype(np.float32)
X_test = (X_test / 255.0).astype(np.float32)

print(X_train.shape, X_test.shape)

(60000, 28, 28) (10000, 28, 28)


Durch die Verwendung von np.expand_dims mit axis=3 wird eine neue Dimension an der vierten Position (Index 3) der Arrays `X_train` und `X_test` hinzugefügt. Durch das Hinzufügen einer zusätzlichen Dimension wird die Form zu (Anzahl_Bilder, Höhe, Breite, Kanäle), was später für das neuronale Netz erforderlich ist.

In [5]:
X_train = np.expand_dims(X_train, axis=3)
X_test = np.expand_dims(X_test, axis=3)

print(X_train.shape, X_test.shape)

(60000, 28, 28, 1) (10000, 28, 28, 1)


Die Funktion `create_model` definiert ein sequentielles neuronales Netzwerk in TensorFlow, das für die Klassifikation von Bildern mit einer Eingabegröße von 28x28x1 Pixeln konzipiert ist. Das Modell beginnt mit einer 2D-Faltungsschicht (Conv2D), um Merkmale aus den Eingabedaten zu extrahieren, gefolgt von einer Max-Pooling-Schicht (MaxPooling2D), die die räumlichen Dimensionen der Daten reduziert. Diese Schichten folgen erneut. Die resultierenden Merkmalskarten werden durch eine Flatten-Schicht in einen eindimensionalen Vektor umgewandelt. Dieser Vektor wird dann durch vollvernetzte Schichten (Dense) verarbeitet, gefolgt von Dropout-Schichten (Dropout) zur Vermeidung von Überanpassung. Abschließend liefert eine Dense-Ausgabeschicht mit 10 Neuronen und der Softmax-Aktivierungsfunktion die Wahrscheinlichkeiten für jede der 10 Klassen. Das Modell wird mit dem Adam-Optimizer trainiert und verwendet die Verlustfunktion sparse_categorical_crossentropy. Als Metrik zur Bewertung der Modellleistung wird die Genauigkeit (accuracy) herangezogen.

In [6]:
# Funktion zur Definition eines neuronalen Netzes
def create_model():

    model = Sequential([
        Conv2D(16, (3, 3), activation="relu", input_shape=(28, 28, 1)),
        MaxPooling2D((2, 2)),
        Conv2D(16, (3, 3), activation="relu"),
        MaxPooling2D((2, 2)),
        Flatten(),
        Dense(32, activation="relu"),
        Dropout(0.2),
        Dense(16, activation="relu"),
        Dropout(0.2),
        Dense(10, activation="softmax")
    ])

    model.compile(optimizer=Adam(), loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    
    return model

Hier wird das zuvor definierte Modell mit den Trainingsdaten `X_train` und den zugehörigen Label `y_train`, beispielhaft für 5 Epochen trainiert, wobei 20% der Daten für die Validierung verwendet werden, um die Leistung des Modells während des Trainings zu überwachen und Überanpassung zu vermeiden.

In [7]:
model = create_model()

model.summary()

model.fit(X_train, y_train, epochs=5, validation_split=0.2, batch_size=128)

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 26, 26, 16)        160       
                                                                 
 max_pooling2d (MaxPooling2D  (None, 13, 13, 16)       0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 11, 11, 16)        2320      
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 5, 5, 16)         0         
 2D)                                                             
                                                                 
 flatten (Flatten)           (None, 400)               0         
                                                                 
 dense (Dense)               (None, 32)                1

<keras.callbacks.History at 0x2915225ff10>

Das ursprüngliche Modell wird mit den Testdaten evaluiert, um die Testgenauigkeit zu berechnen.

In [8]:
_, baseline_model_accuracy = model.evaluate(X_test, y_test, verbose=0)

print(f'Baseline Test Accuracy: {baseline_model_accuracy:.3}')

Baseline Test Accuracy: 0.976


Das trainierte TensorFlow Modell wird in die Datei `mnist_model.h5` gespeichert.

In [9]:
model.save(os.path.join('assets', 'models', 'tf', 'mnist_model.h5'))

## Modellkonvertierung zu TensorFlow Lite

Die Funktion `representative_dataset_gen` dient dazu, eine repräsentative Stichprobe des Trainingsdatensatzes für die Kalibrierung eines TensorFlow Lite Modells bereitzustellen. Sie iteriert hier über die ersten 500 Datenpunkte des Trainingsdatensatzes `X_train` und gibt jedes Beispiel als Liste mit einem Element zurück. Dies ist erforderlich, da das Modell nur einen Eingabewert erwartet, und die Kalibrierungsfunktion eine Liste von Eingabewerten benötigt.

In [10]:
def representative_dataset_gen(dataset):
    # Erstellen eines tf.data.Dataset und Batch-Größe von 1
    dataset = tf.data.Dataset.from_tensor_slices(dataset).batch(1)
    # Iteriere über die ersten 500 Beispiele des Trainingsdatensatzes
    for input_value in dataset.take(500):
        # Das Modell erwartet nur einen Eingabewert, daher wird jedes Beispiel
        # als Liste mit einem einzigen Element zurückgegeben.
        yield [input_value]


Laden des TensorFlow-Modells und erstellen eines `TFLiteConverter`-Objekts aus diesem Modell. Der `TFLiteConverter` wird verwendet, um das TensorFlow-Modell in ein TensorFlow Lite Modell zu konvertieren.

In [11]:
model = tf.keras.models.load_model(os.path.join("assets", "models", "tf", "mnist_model.h5"))
converter = tf.lite.TFLiteConverter.from_keras_model(model)

In dieser Zelle werden die Parameter für die Quantisierung festgelegt. Die Variable `quantization` wird `True` gesetzt, um anzugeben, dass die Quantisierung während der Modellkonvertierung durchgeführt werden soll. Außerdem wird `quant_dtype` auf `int8 only` festgelegt, um zu spezifizieren, dass die Quantisierung ausschließlich auf `int8`-Basis erfolgen soll. Andernfalls (`None`) werden für Input- und Output-Werte der Datentyp float32 verwendet. 

In [12]:
quantization = True
quant_dtype = "int8 only"  # None

Es wird geprüft, ob die Quantisierung angewendet werden soll. Falls ja, werden die Optimierungen für den Konvertierungsprozess definiert:
- Mit `tf.lite.Optimize.DEFAULT` wird die standardmäßige Optimierung aktiviert.
- Wenn die Quantisierung auf int8-Basis durchgeführt werden soll (`int8 only` in `quant_dtype`), muss dem  Converter ein repräsentativer Datensatz zugewiesen werden. Dieser Datensatz wird benötigt, um den Wertebereich der Modellgewichte während der Quantisierung zu kalibrieren.
- Die unterstützten Operationen werden auf TFLite-Ops festgelegt, um sicherzustellen, dass nur int8-Operationen verwendet werden.
- Schließlich wird der Eingabe- und Ausgabetyp für Inferenzen auf `tf.uint8` festgelegt, um sicherzustellen, dass das Modell während der Inferenzen im int8-Format arbeitet.

In [13]:
# Prüfung, ob Quantisierung angewendet werden soll
if quantization:
    # Optimierungen werden auf Standardwerte gesetzt, um die Konvertierung zu optimieren
    converter.optimizations = [tf.lite.Optimize.DEFAULT]
    # Überprüfung, ob eine Quantisierung auf int8-Basis durchgeführt werden soll
    if quant_dtype == "int8 only":
        # Festlegen eines repräsentativen Datensatzes für die statische Quantisierung
        converter.representative_dataset = lambda: representative_dataset_gen(X_train)
        # Festlegen der unterstützten Operationen auf TensorFlow Lite eingebauten int8-Operationen
        converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
        # Festlegen des Eingabetyps für Inferenzen auf 8-Bit Ganzzahlen (uint8)
        converter.inference_input_type = tf.uint8
        # Festlegen des Ausgabetyps für Inferenzen auf 8-Bit Ganzzahlen (uint8)
        converter.inference_output_type = tf.uint8

Das TensorFlow-Modell wird in ein TensorFlow Lite-Modell konvertiert. Die Konvertierung beinhaltet alle zuvor festgelegten Optimierungen und Quantisierungseinstellungen, um die Modellgröße zu reduzieren und die Ausführungsgeschwindigkeit zu erhöhen.

In [14]:
tflite_model = converter.convert()



INFO:tensorflow:Assets written to: C:\Users\dk100\AppData\Local\Temp\tmpr1fu5m91\assets


INFO:tensorflow:Assets written to: C:\Users\dk100\AppData\Local\Temp\tmpr1fu5m91\assets


Speichern des konvertierten Modells als `.tflite-Datei`.

In [15]:
if quantization:
    if quant_dtype == "int8 only":
        open(os.path.join("assets", "models", "tflite", "mnist_model_quantized_int8.tflite"), "wb").write(tflite_model)
    else:
        open(os.path.join("assets", "models", "tflite", "mnist_model_quantized.tflite"), "wb").write(tflite_model)
else:
    open(os.path.join("assets", "models", "tflite", "mnist_model.tflite"), "wb").write(tflite_model)

### Modelkonvertierung zu C++-Modell

Der Dateipfad des zu ladenden TensorFlow Lite-Modells wird definiert.

In [16]:
if quantization:
    if quant_dtype == "int8 only":
        model_name = "mnist_model_quantized_int8"
    else:
        model_name = "mnist_model_quantized"
else:
    model_name = "mnist_model"
tflite_model_path = os.path.join("assets", "models", "tflite")
cpp_model_path = os.path.join("assets", "models", "c++")

Die TensorFlow Lite Modelldatei wird im Binärmodus geöffnet. Der Inhalt der Datei wird in eine Hexadezimaldarstellung konvertiert. Anschließend wird der Hex-String zurück in ein ByteArray umgewandelt. Diese Umwandlung erleichtert die Formatierung des Modells, insbesondere wenn es in eine C++-Datei eingebettet werden soll.

In [17]:
# Öffnen der TensorFlow Lite Modelldatei im Binärmodus
with open(os.path.join(tflite_model_path, f"{model_name}.tflite"), 'rb') as f:
    # Lesen des Inhalts der Datei und Konvertierung in eine Hexadezimaldarstellung
    content = f.read().hex()
    # Umwandlung des Hex-Strings zurück in ein ByteArray,
    # um es leichter in einer C++-Datei zu formatieren.
    result = bytearray.fromhex(content)

In der folgenden Zelle wird eine neue C++-Quelldatei erstellt, um das TensorFlow Lite Modell als Datenarray einzubetten. Zu Beginn wird der notwendige C++-Header und die Deklaration des Datenarrays in die C++-Datei geschrieben.

Die C++-Header umfassen Bedingungsprüfungen, die sicherstellen, dass das Datenarray auf verschiedenen Architekturen korrekt ausgerichtet ist. Anschließend wird das Modell in ein Datenarray geschrieben, wobei jedes Byte des Modells als Hexadezimalwert formatiert wird.

Eine Schleife iteriert über jedes Byte im Modell und formatiert die Werte in Zeilen, um die Lesbarkeit zu verbessern. Abschließend wird die Deklaration des Arrays geschlossen und die Länge des Arrays als Konstante deklariert. Diese Datei kann dann in C++-Projekten verwendet werden, um das Modell direkt in den Code einzubetten.

In [18]:
# Erstellen einer neuen C++-Quelldatei für das Modell, um die Daten einzubetten
with open(os.path.join(cpp_model_path, f"{model_name}_data.cpp"), "wb") as w:
    values_in_row = 0
    num_values = 0

    # Schreiben des Headers der C++-Datei
    w.write(bytearray('#include "' + model_name + '_data.h"\n'
                        "\n"
                        "// We need to keep the data array aligned on"
                        "some architectures.\n"
                        "#ifdef __has_attribute\n"
                        "#define HAVE_ATTRIBUTE(x) __has_attribute(x)\n"
                        "#else\n"
                        "#define HAVE_ATTRIBUTE(x) 0\n"
                        "#endif\n"
                        "#if HAVE_ATTRIBUTE(aligned) || (defined("
                        "__GNUC__) && !defined(__clang__))\n"
                        "#define DATA_ALIGN_ATTRIBUTE __attribute__("
                        "(aligned(4)))\n"
                        "#define DATA_ALIGN_ATTRIBUTE __attribute__("
                        "(aligned(4)))\n"
                        "#else\n"
                        "#define DATA_ALIGN_ATTRIBUTE\n"
                        "#endif\n"
                        "\n"

                        "const unsigned char " + model_name + "_tflite[]"
                        " DATA_ALIGN_ATTRIBUTE = {\n    ", 'utf-8'))
    # Iterieren über jedes Byte in den Modelldaten und Formatierung jedes
    # Bytes als Hexadezimalwert
    for value in result:
        num_values += 1
        values_in_row += 1
        value = "0x{:02x}".format(value)

        # Logik zur Formatierung der Werte im Array
        if values_in_row == 1:
            w.write(bytearray(value, 'utf-8'))
        elif values_in_row == 12:
            w.write(bytearray(", " + str(value) + ",\n    ", 'utf-8'))
            values_in_row = 0
        else:
            w.write(bytearray(', ' + str(value), 'utf-8'))
    # Abschluss der Arraydeklaration und der Datei
    w.write(bytearray("};\nconst int " + model_name + "_tflite_len = "
                        + str(num_values) + ";", 'utf-8'))


Erstellen einer Header-Datei, die das Datenarray und dessen Länge definiert. Diese Header-Datei ist notwendig, um das Datenarray und seine Länge in anderen C++-Quelldateien zugänglich zu machen.

In [19]:
with open(os.path.join(cpp_model_path, f"{model_name}_data.h"), "w") as f:
    f.write('#ifndef TENSORFLOW_LITE_MODEL_DATA_H_\n'
            '#define TENSORFLOW_LITE_MODEL_DATA_H_\n'
            '\n'
            'extern const unsigned char ' + model_name + '_tflite[];\n'
            'extern const int ' + model_name + '_tflite_len;\n'
            '\n'
            '#endif')

## Einsatz von TensorFlow Lite Modellen auf Raspberry Pi und anderen Einplatinencomputern

Falls das Modell mit `int8 only` quantisiert wurde, müssen die Testdaten in `uint8` umgewandelt werden. 

In [20]:
if quantization and "int8 only" in quant_dtype:
    X_test_lite = (X_test * 255).astype(np.uint8)
else:
    X_test_lite = X_test

Lesen der TensorFlow Lite Datei, in Abhängigkeit ob das Modell quantisiert wurde und falls ja welchen Datetyp die Eingabedaten haben müssen.

In [21]:
if quantization:
    if quant_dtype == "int8 only":
        with open(os.path.join("assets", "models", "tflite", "mnist_model_quantized_int8.tflite"), 'rb') as f:
            tflite_model = f.read()
    else:
        with open(os.path.join("assets", "models", "tflite", "mnist_model_quantized.tflite"), 'rb') as f:
            tflite_model = f.read()
else:
    with open(os.path.join("assets", "models", "tflite", "mnist_model_quantized.tflite"), 'rb') as f:
        tflite_model = f.read()

Hier wird ein `tf.lite.Interpreter` erstellt und mit dem Inhalt des TensorFlow Lite-Modells initialisiert, um die Eingabe- und Ausgabedetails des Modells abzurufen und Speicher für die Ausführung des Modells zuzuweisen.

In [22]:
# Laden des TensorFlow Lite Interpreters mit dem quantisierten Modell
interpreter = tf.lite.Interpreter(model_content=tflite_model)

# Allokieren von Speicher für den Interpreter, um das Modell auszuführen
interpreter.allocate_tensors()

# Abrufen der Details für die Eingabedaten des Modells
input_details = interpreter.get_input_details()

# Abrufen der Details für die Ausgabedaten des Modells
output_details = interpreter.get_output_details()

In dieser Zelle wird die Form der Testdaten überprüft, um sicherzustellen, dass sie mit den Eingabeanforderungen des Modells übereinstimmen.

In [23]:
if len(X_test_lite.shape) != 5:
    X_test_lite = np.expand_dims(X_test_lite, axis=1)

Für alle Testdaten werden Vorhersagen getroffen und in der Liste `predictions` gesammelt.<br>
Für jeden Datenpunkt werden folgende Schritte durchgeführt:
1. Übergabe der Eingabedaten an den Eingabetensor des Modells mit der Methode `set_tensor`.
2. Ausführung des Modells mit der Methode `invoke` aus.
3. Erhalten der Vorhersage des Modells aus dem Ausgabetensor mit der Methode `get_tensor`.
4. Vorhersage der Liste `predictions` hinzufügen.

In [24]:
predictions = []
for sample in X_test_lite:
    # Eingabedaten dem Eingabetensor des Modells übergeben
    interpreter.set_tensor(input_details[0]['index'], sample)
    # Ausführen des Modells
    interpreter.invoke()
    # Vorhersage des Modells aus dem Ausgabetensor erhalten
    pred = interpreter.get_tensor(output_details[0]['index'])
    # Aktuelle Vorhersage 'predictions' hinzufügen
    predictions.append(pred.argmax())

Letztlich wird die Genauigkeit des TensorFlow Lite-Modells anhand der Testdaten (`X_test_lite` und `y_test`) bestimmt. Hierfür werden einzelnen Elemente der Liste der Vorhersagen (`predictions`) und die tatsächlichen Labels (`y_test`) miteinander verglichen und überprüft, ob diese übereinstimmen.

In [25]:
model_acc = sum(1 for a,b in zip(predictions,y_test) if a == b) / len(predictions)
print(f"TensorFlow Lite Modellgenauigkeit: {model_acc}")

TensorFlow Lite Modellgenauigkeit: 0.9763
