<a href="https://colab.research.google.com/github/MarkusThill/techdays25/blob/feature-lab2-initial-draft/notebooks/lab2-model-quantization.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 🚀 Lab 2: Effiziente Quantisierung tiefer neuronaler Netze
- Dieses Jupyter Notebook **benötigt eine GPU Laufzeit**. Falls nicht bereits voreingestellt, kann daher der Laufzeittyp im Menü unter "Laufzeit" > "Laufzeittyp ändern" > "Hardwarebeschleuniger" > **"T4 GPU"** geändert werden!

Strukturierung:
- Teil 1: Darstellung numerischer Datentypen
- Teil 2:
  - Quantisierung des einfachen Modells aus Lab 1
  - Diverse Betrachtungen auf dem quantisierten Modell (Genauigkeit, etc.)
  - Gotchas (Optional): Overflow/Underflow am Beispiel eines Average Pooling layers
  - Subnormal Numbers
  - ...
- Teil 3: Quantisierung eines DTMF Klassifikationsmodells
  - Illustration: Erzeugung einer DTMF Wählsequenz und Abspielen derselben
  - Laden eines vortrainierten DTMF-Klassifikationsmodells (ConvNet; Keras oder PyTorch)
  - Konvertierung nach ONNX
  - Quantisierung nach FP16
  - Messung der Inferenzzeiten (auch für verschiedene Batch-Sizes) und vergleich von FP32, FP16-Modell
  - Vergleich der Genauigkeit von FP16 und FP32 Modell (wie ändert sich die Fehlerrate)
  - Optional: Konvertierung nach FP8 und Wiederholung der obigen Schritte
  - Optional: Profiling der ONNX Modelle. Wo liegen die "Hotspots" des Modells?
  - Optional: Trainieren des Modells auf de


# Vorbereitungen: Installation der nötigen Abhängigkeiten

In [None]:
# Remove the `%%capture`, if you have the impression that something is going wrong during the setup
# %%capture
!pip install "techdays25[lab2] @ git+https://github.com/MarkusThill/techdays25.git@feature-lab2-initial-draft"

**WICHTIG: Nach der Installation der Abhängigkeiten (siehe oben) muss die Google Colab Laufzeit neugestartet werden! Im Anschluss kann mit der Ausführung der nächsten Zellen fortgefahren werden werden.**

In [None]:
# @title Colab-spezifische Konfigurationen {display-mode: "form"}
import sys

IN_COLAB = "google.colab" in sys.modules

if IN_COLAB:
    from google.colab import output

    output.enable_custom_widget_manager()

# 📘 Einleitung
- DTMF
- Quanstisierungsansätze
- etc.

# 📖 Teil 1: Darstellung numerischer Datentypen
- Zweierkomplementdarstellung
- IEEE-754 Standard

### Ganzahldarstellungen/Zweierkomplementdarstellung

In [None]:
# @title Darstellung von 8-Bit Integer Zahlen {display-mode: "form"}

import ipywidgets as widgets
from IPython.display import HTML

# Initialize 8 toggle buttons (bits, MSB to LSB)
bit_toggles = [
    widgets.ToggleButton(
        value=False, description="0", layout=widgets.Layout(width="40px")
    )
    for _ in range(8)
]

# Output widget to show results
output = widgets.Output()


def twos_complement(bits: list[int]) -> int:
    """Convert list of bits to signed integer using two's complement.

    Args:
        bits (list[int]): A list of bits representing the binary number.

    Returns:
        int: The signed integer value of the binary number.
    """
    if bits[0] == 0:
        return int("".join(str(b) for b in bits), 2)
    # If MSB is 1, it's negative
    inverted_bits = [1 - b for b in bits]  # Flip bits
    incremented = int("".join(str(b) for b in inverted_bits), 2) + 1
    return -incremented


def update_display(*args) -> None:
    """Update the display with the current binary, decimal, and hexadecimal values."""
    # Read bit values (MSB to LSB)
    bit_values = [int(btn.value) for btn in bit_toggles]
    bit_string = "".join(str(b) for b in bit_values)

    # Unsigned decimal value
    unsigned_decimal = int(bit_string, 2)

    # Signed decimal value (two's complement)
    signed_decimal = twos_complement(bit_values)

    # Hex representation (2 hex digits for 8 bits)
    hex_value = hex(unsigned_decimal).upper().replace("X", "x").replace("0X", "0x")

    # Clear previous output and update
    output.clear_output()
    with output:
        display(
            HTML(f"""
        <h3>
            Binary: <code>{bit_string}</code><br>
            Unsigned Decimal: <b>{unsigned_decimal}</b><br>
            Signed Decimal (Two's Complement): <b>{signed_decimal}</b><br>
            Hexadecimal: <b>{hex_value}</b>
        </h3>
        """)
        )

    # Update button labels (0/1)
    for btn, value in zip(bit_toggles, bit_values):
        btn.description = str(value)


# Attach observer to all buttons
for btn in bit_toggles:
    btn.observe(update_display, "value")

# Display widget
display(widgets.HBox(bit_toggles))
display(output)

# Initialize display
update_display()

#### Übungsfragen (Optional):
- Grundlegendes Setzen von Bits: Setze das 3. Bit (von rechts) einer 8-Bit-Zahl auf 1 und alle anderen Bits auf 0. Was ist die dezimale Darstellung dieser Zahl im unsigned Format?
Erwartete Antwort: 4

- Setzen mehrerer Bits: Setze das 1., 3. und 5. Bit (von rechts) einer 8-Bit-Zahl auf 1 und alle anderen Bits auf 0. Was ist die dezimale Darstellung dieser Zahl im unsigned Format?
Erwartete Antwort: 21

- Was ist die größtmögliche bzw. kleinstmögliche Zahl die mit 8 Bit dargestellt werden können? Antwort: -128, +127
  - Was wäre die Antwort, wenn wir statt 8-bit Integer, nun 32-bit Integer haben?

- Signed vs. Unsigned Darstellung: Setze das 8. Bit (höchstwertiges Bit) auf 1 und alle anderen Bits auf 0. Was sind die dezimalen Darstellungen dieser Zahl im signed und unsigned Format?
Erwartete Antwort: Signed: -128, Unsigned: 128

- Was charakterisiert eine negative Zahl in der Zweierkomplementdarstellung (unsigned integer) im Allgmeinen? Antwort: Zumindest das vorderste Bit ist gesetzt.

- Wie negiere ich eine Zahl (z.B. 32 -> -32 bzw. -71 -> 71)? Antwort: Invertieren alle Bits und Addition  von 1

- Angenommen ich habe -33 als 8-bit Zahl vorliegen. Wie würde ich daraus eine 32-bit unsigned Integer Zahl machen? Antwort: Einfach noch drei Bytes voranhängen in denen alle Bits gesetzt sind.

- Kombinieren von Bits: Setze das 2., 4. und 6. Bit (von rechts) einer 8-Bit-Zahl auf 1 und alle anderen Bits auf 0. Was sind die dezimalen Darstellungen dieser Zahl im signed und unsigned Format?
Erwartete Antwort: Signed: 42, Unsigned: 42

- Negative Zahlen in der Signed-Darstellung: Setze das 8. Bit (höchstwertiges Bit) und das 1. Bit (niederwertigstes Bit) auf 1 und alle anderen Bits auf 0. Was sind die dezimalen Darstellungen dieser Zahl im signed und unsigned Format?
Erwartete Antwort: Signed: -127, Unsigned: 129

- Maximale und minimale Werte: Was ist der maximale Wert, den man mit einer 8-Bit unsigned Zahl darstellen kann? Was ist der minimale Wert, den man mit einer 8-Bit signed Zahl darstellen kann?
Erwartete Antwort: Maximale unsigned: 255, Minimale signed: -128

- Bitmuster und Werte: Setze die Bits, um die Binärzahl 10101010 zu bilden. Was sind die dezimalen Darstellungen dieser Zahl im signed und unsigned Format?
Erwartete Antwort: Signed: -86, Unsigned: 170

- Alle Bits gesetzt: Setze alle Bits einer 8-Bit-Zahl auf 1. Was sind die dezimalen Darstellungen dieser Zahl im signed und unsigned Format?
Erwartete Antwort: Signed: -1, Unsigned: 255


- Verständnis von Überlauf: Was passiert, wenn du 1 zum maximalen Wert einer 8-Bit unsigned Zahl hinzufügst? Was passiert bei einer 8-Bit signed Zahl?
Erwartete Antwort: Bei unsigned wird es auf 0 zurückgesetzt. Bei signed verursacht es einen Überlauf und wird auf den minimalen Wert (-128) zurückgesetzt.
Zweierkomplement:

- Erkläre, wie die Zweierkomplement-Darstellung für negative Zahlen in einer 8-Bit signed Zahl funktioniert.
Erwartete Antwort: Im Zweierkomplement ist das höchstwertige Bit (MSB) das Vorzeichenbit. Um das Zweierkomplement einer Zahl zu finden, invertiere alle Bits und addiere 1 zum niederwertigsten Bit (LSB).

### Fixkommadarstellungen

In [None]:
# @title Darstellung von 16-Bit Festkomma-Zahlen (engl.: fixpoint binary representations) {display-mode: "form"}
import ipywidgets as widgets

# Initialize 16 toggle buttons (bits, MSB to LSB)
bit_toggles = [
    widgets.ToggleButton(
        value=False, description="0", layout=widgets.Layout(width="30px")
    )
    for _ in range(16)
]

# Color bars for sign, integer part, and fractional part
color_bars = [
    widgets.HTML(
        value='<div style="width: 30px; height: 10px; background-color: red;"></div>'
    )
    if i == 0
    else widgets.HTML(
        value='<div style="width: 30px; height: 10px; background-color: green;"></div>'
    )
    if 1 <= i <= 7
    else widgets.HTML(
        value='<div style="width: 30px; height: 10px; background-color: blue;"></div>'
    )
    for i in range(16)
]

# Output widget to show results
output = widgets.Output()


# def twos_complement(bits):
#    """Convert list of bits to signed integer using two's complement."""
#    if bits[0] == 0:
#        return int("".join(str(b) for b in bits), 2)
#
#    # If MSB is 1, it's negative
#    inverted_bits = [1 - b for b in bits]  # Flip bits
#    incremented = int("".join(str(b) for b in inverted_bits), 2) + 1
#    return -incremented


def fixed_point_value(bits: list[int]) -> float:
    """Convert a list of bits to a fixed-point value.

    Args:
        bits (list[int]): A list of 16 bits representing the binary number in fixed-point format.

    Returns:
        float: The fixed-point value of the binary number.
    """
    integer_part = bits[:8]
    fractional_part = bits[8:]

    # Calculate integer value
    integer_value = twos_complement(integer_part)

    # Calculate fractional value
    fractional_value = sum(
        bit * 2 ** (-i) for i, bit in enumerate(fractional_part, start=1)
    )

    return integer_value + fractional_value


def update_display(*args) -> None:
    """Update the display with the current binary, decimal, and hexadecimal values."""
    # Read bit values (MSB to LSB)
    bit_values = [int(btn.value) for btn in bit_toggles]
    bit_string = "".join(str(b) for b in bit_values)

    # Unsigned decimal value
    unsigned_decimal = int(bit_string, 2)

    # Signed decimal value (two's complement)
    signed_decimal = twos_complement(bit_values)

    # Fixed-point value
    fixed_point_decimal = fixed_point_value(bit_values)

    # Hex representation (4 hex digits for 16 bits)
    hex_value = hex(unsigned_decimal).upper().replace("X", "x").replace("0X", "0x")

    # Clear previous output and update
    output.clear_output()
    with output:
        display(
            HTML(f"""
        <h3>
            Binary: <code>
                <span style="color: red;">{bit_string[0]}</span>
                <span style="color: green;">{bit_string[1:8]}</span>.
                <span style="color: blue;">{bit_string[8:]}</span>
            </code><br>
            Unsigned Decimal: <b>{unsigned_decimal}</b><br>
            Signed Decimal (Two's Complement): <b>{signed_decimal}</b><br>
            Fixed-Point Decimal: <b>{fixed_point_decimal}</b><br>
            Hexadecimal: <b>{hex_value}</b>
        </h3>
        """)
        )

    # Update button labels (0/1)
    for btn, value in zip(bit_toggles, bit_values):
        btn.description = str(value)


# Attach observer to all buttons
for btn in bit_toggles:
    btn.observe(update_display, "value")

# Display widget
display(widgets.VBox([widgets.HBox(bit_toggles), widgets.HBox(color_bars)]))
display(output)

# Initialize display
update_display()

#### Übungsfragen (Optional):

### Fließkommadarstellungen nach IEEE-754
- TODO: Subnormal Numbers
- Webseite mit noch mehr Darstellungen: https://evanw.github.io/float-toy/
- Verschiedene FP8-Darstellungen: https://asawicki.info/articles/fp8_tables.php
- https://onnx.ai/onnx/technical/float8.html

In [None]:
## @title Darstellung von 16-Bit (FP16) Fließkomma-Zahlen {display-mode: "form"}

import struct

import ipywidgets as widgets
import numpy as np

# from IPython.display import display

# Initialize 16 toggle buttons (bits)
bit_toggles = [
    widgets.ToggleButton(
        value=False, description="0", layout=widgets.Layout(width="30px")
    )
    for _ in range(16)
]

# Color bars for sign, exponent, and mantissa
color_bars = [
    widgets.HTML(
        value='<div style="width: 30px; height: 10px; background-color: red;"></div>'
    )
    if i == 0
    else widgets.HTML(
        value='<div style="width: 30px; height: 10px; background-color: green;"></div>'
    )
    if 1 <= i <= 5
    else widgets.HTML(
        value='<div style="width: 30px; height: 10px; background-color: blue;"></div>'
    )
    for i in range(16)
]

# Output widget to show FP16 value and components
output = widgets.Output()


def bits_to_float16(bits: list[int]) -> np.float16:
    """Convert list of bits to FP16 float value.

    Args:
        bits (list[int]): A list of bits representing the binary number.

    Returns:
        np.float16: The FP16 float value of the binary number.
    """
    bit_string = "".join(str(b) for b in bits)
    # Convert binary string to integer
    int_value = int(bit_string, 2)
    # Pack as unsigned 16-bit int, then unpack as float16 using numpy
    packed = struct.pack("<H", int_value)  # Big endian 16-bit unsigned int
    return np.frombuffer(packed, dtype=np.float16)[0]


def update_display(*args):
    """Update the display with the current binary, FP16 float value, and its components."""
    # Read bit values (MSB to LSB)
    bit_values = [int(btn.value) for btn in bit_toggles]
    bit_string = "".join(str(b) for b in bit_values)

    # Extract components
    sign = bit_values[0]
    exponent_bits = bit_values[1:6]
    mantissa_bits = bit_values[6:]

    exponent = int("".join(str(b) for b in exponent_bits), 2)
    exponent_unbiased = exponent - 15  # Bias = 15

    mantissa_raw = "".join(str(b) for b in mantissa_bits)
    (
        1 + sum(int(b) * 2 ** (-i) for i, b in enumerate(mantissa_bits, start=1))
        if exponent != 0
        else 0
    )

    # Convert to float16 value
    fp16_value = bits_to_float16(bit_values)

    # Clear previous output and display new info
    output.clear_output()
    with output:
        display(
            HTML(f"""
        <h3>
            Binary: <code>
                <span style="color: red;">{bit_string[0]}</span>
                <span style="color: green;">{bit_string[1:6]}</span>
                <span style="color: blue;">{bit_string[6:]}</span>
            </code><br>
            Sign (1 bit): <b>{sign}</b> ({"-" if sign else "+"})<br>
            Exponent (5 bits): <b>{"".join(str(b) for b in exponent_bits)} (biased: {exponent}, unbiased: {exponent_unbiased})</b><br>
            Mantissa (10 bits): <b>{mantissa_raw}</b><br>
            <hr>
            <b>FP16 Value:</b> {fp16_value}
        </h3>
        """)
        )

    # Update button labels
    for btn, value in zip(bit_toggles, bit_values):
        btn.description = str(value)


# Attach observer to all buttons
for btn in bit_toggles:
    btn.observe(update_display, "value")

# Display widget
display(widgets.VBox([widgets.HBox(bit_toggles), widgets.HBox(color_bars)]))
display(output)

# Initialize output
update_display()  # 0 01111 0000000001 ^=^ 1.00097656

#### Übungsfragen (Optional):
- Gibt es einen Unterschied zwischen +0.0 und -0.0?
- Wie stelle ich `+Inf` bzw. `-Inf` dar?
- Wie stelle ich `NaN` dar?
- Was ergibt der Vergleich `float("nan") != float("nan")`?

# 🔢Teil 2: Quantisierung eines linearen Regressionsmodells (aus Lab 1)
- Gotchas: zu große, kleine inputs, numerische Abweichungen. Ggfs.: Sehr kleines Gewicht, sehr großer Input.
- Abweichungen bestimmen

# 🕸 Teil 3: Gotchas bei der Modellquantisierung am Beispiel eines einfachen Modells

In diesem Teil werden wir uns damit befassen, welche Probleme bei der Modellquantisierung auftreten können und dies anhand eines Beispiels illustrieren. Folgende Schritte werden durchgeführt:
- Erstellen eines benutzerdefinierten Modells in Keras zur Mittelwertbildung über entlang der Zeitachse.
- Konvertieren des Keras-Modells in das ONNX-Format.
- Quantisieren des ONNX-Modells von FP32 auf FP16.
- Generierung von 3-dimensionalen Zeitreihendaten für die Modellinferenz.
- Modellinferenz mit dem Keras-Modell.
- Modellinferenz mit den FP32/FP16 ONNX-Modellen:
  - Laden und Ausführen von ONNX-Modellen mit der ONNX Runtime.
  - Vergleich der Ausgaben von FP32- und FP16-ONNX-Modellen.
- Diskussion der Beobachtungen und möglicher Lösungen.


In [None]:
!pip install tf2onnx onnx onnxconverter-common onnxruntime-gpu # TODO: Add to Deps

## Modelldefinition

Unser Modell unten nimmt einen 3-dimensionalen Eingabetensor mit den Dimensionen (Batch-Größe, Sequenzlänge, Merkmalsanzahl) entgegen. In unserem Beispiel hat der Eingabetensor die Form (2, 10000, 3), was bedeutet, dass wir zwei Batch-Elemente haben, jedes mit einer Sequenzlänge von 10000 und 3 Merkmalen pro Zeitschritt.

Nach der Verarbeitung durch das Modell wird die Zeitdimension reduziert, und die Ausgabe hat die Form (Batch-Größe, Merkmalsanzahl). Für unser Beispiel ergibt sich eine Ausgabe mit der Form (2, 3). Die Ausgabe repräsentiert den Durchschnitt der Merkmale über die gesamte Sequenzlänge für jedes Batch-Element.

**Beispiel**

Eingabetensor (2 x 4 x 3):
```
[
    [3, 4, 5],
    [4, 5, 6],
    [7, 8, 9],
    [10, 11, 12]
  ],
  [
    [2, 4, 6],
    [8, 10, 12],
    [14, 16, 18],
    [20, 22, 24]
  ]
]
```

Ausgabetensor (2 x 3):
```
[
  [ 6,  7,  8],
  [11, 13, 15]
]
```


In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import Input, Layer

# Custom Layer: Sum over time dimension
class SumLayer(Layer):
    def call(self, inputs):
        # Reduce (sum) the input tensor along the time axis (axis=1)
        return tf.reduce_sum(inputs, axis=1)

# Custom Layer: Division by sequence length
class DivisionLayer(Layer):
    def call(self, inputs):
        tensor_x, original_input = inputs  # Extract both tensors: the summed tensor and the original input tensor
        seq_length = tf.shape(original_input)[1]  # Get the dynamic sequence length (length of the time dimension)
        # Divide the summed tensor by the sequence length to get the average
        return tensor_x / tf.cast(seq_length, dtype=tensor_x.dtype)

# Define model with separate Sum and Division layers
def GlobalAveragePooling1D():
    # Define the input layer with shape (sequence_length=None, feature_dim=3)
    inputs = Input(shape=(None, 3), name="input")
    # Apply the SumLayer to the inputs
    sum_x = SumLayer(name="sum")(inputs)
    # Apply the DivisionLayer to the summed tensor and the original inputs
    output = DivisionLayer(name="divide")([sum_x, inputs])
    # Create the Keras model with the specified input and output
    model = keras.Model(inputs, output, name="GlobalAveragePooling1D")
    return model

# Create the model
model = GlobalAveragePooling1D()

# Print model summary to see the architecture
model.summary()

## Modellkonvertierung nach ONNX

In [None]:
# Convert our GlobalAveragePooling1D Model to ONNX format
import tf2onnx  # TensorFlow to ONNX conversion library
import onnx  # ONNX library for handling ONNX models
import tensorflow as tf  # TensorFlow library
from onnxconverter_common import float16  # Utility for FP16 conversion

# Define the path where the FP32 ONNX model will be saved
onnx_model_path_fp32 = "gap1d_model_fp32.onnx"

# Convert the Keras model to ONNX format with FP32 precision
# Define the input specification for the model conversion
spec = (tf.TensorSpec((None, None, 3), tf.float32, name="input"),)
# Convert the Keras model to ONNX using tf2onnx
onnx_model, _ = tf2onnx.convert.from_keras(model, input_signature=spec, opset=18)

# Save the converted FP32 ONNX model to the specified path
onnx.save(onnx_model, onnx_model_path_fp32)
print(f"ONNX model (FP32) saved to {onnx_model_path_fp32}")

## Quantisierung des ONNX Modells

In [None]:
# Now quantize the ONNX model to FP16 and save it

# Load the previously saved FP32 ONNX model
onnx_model_fp32 = onnx.load(onnx_model_path_fp32)

# Convert the FP32 ONNX model to FP16 precision
# The keep_io_types=True argument ensures that the input and output types remain the same
onnx_model_fp16 = float16.convert_float_to_float16(onnx_model_fp32, keep_io_types=True)

# Define the path where the FP16 ONNX model will be saved
onnx_model_path_fp16 = "gap1d_model_fp16.onnx"

# Save the converted FP16 ONNX model to the specified path
onnx.save(onnx_model_fp16, onnx_model_path_fp16)

# Print a message indicating that the FP16 ONNX model has been saved successfully
print(f"ONNX model (FP16) saved to {onnx_model_path_fp16}")

In [None]:
from techdays25 import onnx_utils

onnx_utils.netron_visualize("gap1d_model_fp16.onnx")

## Datengenerierung

In [None]:
# First create some data and put it through the Keras model
import numpy as np  # Library for numerical operations
import matplotlib.pyplot as plt  # Library for plotting

# Create a random input tensor with a batch size of 2
# Generate a sequence of numbers from 0 to 9999 and reshape it to (1, 10000, 1)
tt = np.arange(10_000).reshape(1, -1, 1)

# Create an offset array and reverse it
off = (np.array(np.arange(6)).astype(np.float32) + 4)[::-1]

# Generate a 3-dimensional time series data using a sine function with the offset
xx = 0.4 * np.sin(4 * np.pi * 1e-5 * off**2 * tt) + off

# Reshape the data to have dimensions (sequence_length, batch_size, feature_dim)
xx = xx.reshape(-1, 2, 3)

# Swap the axes to get the shape (batch_size, sequence_length, feature_dim)
xx = np.swapaxes(xx, 0, 1)

# Convert the data to float32 type
x_input = xx.astype(np.float32)

# Print the dimensions of the input tensor
print("Dimensions of the input tensor:", x_input.shape)

In [None]:
# Plot the generated time series data
import matplotlib.pyplot as plt  # Library for plotting (re-imported for completeness)

# Create a new figure with a specified size
plt.figure(figsize=(10, 6))

# Plot the first batch (b$_1$) of the time series data
plt.plot(xx[0, :, :], label="b$_1$")

# Plot the second batch (b$_2$) of the time series data
plt.plot(xx[1, :, :], label="b$_2$")

# Set the label for the x-axis
plt.xlabel("Index")

# Set the label for the y-axis
plt.ylabel("Amplitude")

# Set the title of the plot
plt.title("Ein Batch bestehend aus jeweils zwei 3-dimensionalen Zeitreihen")

# Add a legend to the plot
plt.legend(loc='upper center', bbox_to_anchor=(1.1, 0.6), ncol=2)

# Display the plot
plt.show()

## Inferenz mit dem Keras Modell

In [None]:
# Put the data through the Keras model

# Use the Keras model to make predictions on the input data
y_output_keras = model.predict(x_input)

# Print the dimensions of the input tensor
print("Dimensionen des Eingabetensors:", x_input.shape)  # Erwartete Form: (2, 10000, 3)

# Print the dimensions of the Keras model output
print("Dimensionen der Keras-Modellausgabe:", y_output_keras.shape)  # Erwartete Form: (2, 3) -> Zeitdimension reduziert!

# Print the output of the Keras model
print("Keras-Modellausgabe:\n", y_output_keras)

## Inferenz mit den FP32/FP16 ONNX Modellen

In [None]:
# Run the ONNX models
import onnxruntime as ort  # Bibliothek zum Ausführen von ONNX-Modellen
import numpy as np  # Bibliothek für numerische Operationen (nochmals importiert für Vollständigkeit)

# Funktion zum Laden und Ausführen eines ONNX-Modells mit CUDA Execution Provider
def run_onnx_model(onnx_model_path, x_input):
    # Erstellen einer Inferenzsitzung für das ONNX-Modell mit dem CUDA Execution Provider
    session = ort.InferenceSession(
        onnx_model_path,
        providers=["CUDAExecutionProvider"]
    )

    # Abrufen des Namens, der Form und des Datentyps des Modelleingangs
    input_name = session.get_inputs()[0].name
    input_shape = session.get_inputs()[0].shape
    input_dtype = session.get_inputs()[0].type

    # Ausgabe der Modelleingabedetails
    print(f"Name des Modellinputs: {input_name}")
    print(f"Dimension des Modellinputs: {input_shape}")
    print(f"Typ des Modellinputs: {input_dtype}")

    # Inferenz durchführen: Die gleichen Daten durch das ONNX-Modell laufen lassen
    outputs = session.run(None, {input_name: x_input})

    # Ausgabe der Ausgabedetails
    print(f"Dimension der Ausgabe: {outputs[0].shape}")
    print(f"\nAusgabe:\n{outputs[0]}")

In [None]:
# Run the FP32 ONNX model
run_onnx_model(onnx_model_path = "gap1d_model_fp32.onnx", x_input=x_input)

In [None]:
# Run the FP16 ONNX model
run_onnx_model(onnx_model_path = "gap1d_model_fp16.onnx", x_input=x_input)

## Fragen / Diskussion
- Welche Ergebnisse erwarten wir? Stimmen die Ergebnisse mit den Erwartungen überein?
- Was fällt bei der Ausgabe des quantisierten FP16 Modells auf?
  - Wie könnte man sich dieses Ergebnis erklären?
  - Lässt sich das Problem ggfs. vermeiden?


# 📞 Teil 4: Quantisierung eines DTMF Klassifikationsmodells

## Einführung: Generierung und Dekodierung/Klassifizierung von DTMF (dual-tone multi-frequency) Signalen <a class="anchor" id="part0"></a>


Das Dualton-Mehrfrequenzwahlverfahren (DTMF) ist ein Signalisierungssystem für das Wählen eines Telefons, das in den frühen 1960er Jahren von Western Electric entwickelt und später von Bell System kommerziell an Telefonkunden geliefert wurde.
Wenn eine Taste auf dem Telefon gedrückt wird, werden zwei harmonische Tonsignale erzeugt, und die Superposition/Überlagerung beider Signale wird verwendet, um die entsprechende Telefontaste zu charakterisieren. Wenn zum Beispiel die Taste „5“ gedrückt wird, entsteht ein Dualtontonsignal, das sich aus den Frequenzen 770 Hz und 1336 Hz zusammensetzt. Die beiden Frequenzen, die jede Taste beschreiben, sind in der folgenden Tabelle aufgeführt:

|   | 1209Hz  | 1336 Hz  | 1477 Hz   | 1633 Hz  |
|---|:---:|:---:|:---:|:---:|
| **697 Hz**  |  1 | 2  | 3  | A  |
| **770 Hz**  |  4 | 5  | 6  | B  |
| **852 Hz**  |  7 | 8  | 9  | C  |
| **941 Hz**  |  * | 0  | #  | D  |

In diesem Beispiel werden wir uns ansehen, wie man solche DTMF-Wählsequenzen generiert, sie in einer Audiodatei speichert und das Audiosignal mit einem einfachen KI-Modell wieder dekodiert.

Wir werden die folgenden Schritte durchführen, um ein DTMF-Signal zu erzeugen und mit einem Klassifikationsmodell zu dekodieren:
1. Erzeugung des Signals und der Audiodatei mit `scipy` und `numpy`. Wir speichern die erzeugte Audiodatei in einer `.wav` Datei, die in diesem Notebook oder in deinem lokalen Audioplayer abgespielt werden kann
2. Wir entwerfen eine einfaches KI-Modell ... TODO
3. Extraktion der gewählten Tastenfolge aus der `.wav`-Datei unter Verwendung des KI-Modells
4. Quantisierung & Export des Modells nach ONNX im FP32 und FP16 Format. Quantisierung nach INT8 und Export nach TensorRT.
5. Laufzeituntersuchungen für FP32/FP16/INT8, unterschiedliche Batch-Größen und Signallängen.

In [None]:
# Trainiere Modell
# Konvertiere Modell nach ONNX (einmal FP32, einmal FP16)
# Untersuche Laufzeitunterschiede (auch nach batchsize)
# Untersuche Abweichungen. Wie ändert sich die Fehlerrate des Modells für FP32/Fp16?

## Signal- und Audiodatei-Generierung <a class="anchor" id="part1"></a>

In [None]:
import time

# from collections.abc import Callable
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from scipy.io import wavfile

In [None]:
## @title Tastenfeld-Widget für Generierung der Wählsequenz' {display-mode: "form"}

import ipywidgets as widgets

# from IPython.display import display

# Initialize a text widget to display the dial sequence
dial_sequence = widgets.Text(
    value="",
    placeholder="Dial sequence will appear here...",
    description="",
    disabled=True,
    layout=widgets.Layout(width="300px"),
)


# Function to handle button clicks
def on_button_click(b):
    """_summary_.

    Args:
        b (_type_): _description_
    """
    dial_sequence.value += b.description


# Create buttons for the phone dialer
buttons = []
for row in [["1", "2", "3", "A"], ["4", "5", "6", "B"], ["7", "8", "9", "C"], ["*", "0", "#", "D"]]:
    button_row = []
    for label in row:
        button = widgets.Button(
            description=label, layout=widgets.Layout(width="50px", height="50px")
        )
        button.on_click(on_button_click)
        button_row.append(button)
    buttons.append(widgets.HBox(button_row))

# Create a clear button
clear_button = widgets.Button(
    description="Clear", layout=widgets.Layout(width="158px", height="50px")
)

back_button = widgets.Button(
    description="⬅", layout=widgets.Layout(width="50px", height="50px")
)


def on_clear_click(b):
    """_summary_.

    Args:
        b (_type_): _description_
    """
    global dial_sequence
    dial_sequence.value = ""

def on_back_click(b):
    """_summary_.

    Args:
        b (_type_): _description_
    """
    global dial_sequence
    dial_sequence.value = dial_sequence.value[:-1]


clear_button.on_click(on_clear_click)


back_button.on_click(on_back_click)

# Display the dialer
display(dial_sequence)
for button_row in buttons:
    display(button_row)
display(widgets.HBox([clear_button, back_button]))

In [None]:
print("Gewählte Sequenz:", dial_sequence.value)

### Generierung des Wähl-Audiosignals

In [None]:
from techdays25.dtmf_generation import DtmfGenerator

dtmf_gen = DtmfGenerator(
    dur_key=(0.2, 0.3),
    dur_pause=(0.01, 0.1),
    noise_factor=(10.0, 50.0),
    noise_freq_range=(0.0, 20000.0),
)

In [None]:
# Either use the dialed sequence from above:
my_dialed_sequence_keys = dial_sequence.value

# ... or generate a random sequence:
# my_dialed_sequence_keys = "".join([random.choice("1234567890ABCD*#") for i in range(10)])

# ... or use a simple sequence for debugging purposes
# my_dialed_sequence_keys = "1234567890ABCD*#" # for debug purposes...

# ... or use a slightly longer sequence (which also contains all symbols)
# my_dialed_sequence_keys = "91D282A0B8C16C*C9#504979D#443B"
if not my_dialed_sequence_keys:
    my_dialed_sequence_keys = "91D282A0B8C16C*C9#504979D#443B"

# Try changing the following arguments: dur_key=0.05, dur_pause=0.02
my_dialed_sequence_signal = dtmf_gen.get_tone_sequence(my_dialed_sequence_keys)

### Visualisierung des Signals

In [None]:
plt.figure(figsize=(10, 5))
plt.plot(my_dialed_sequence_signal)
plt.xlabel("Index")
plt.ylabel("Amplitude")
plt.title("Das vollständige gewählte Signal")

In [None]:
plt.figure(figsize=(10, 5))
plt.plot(my_dialed_sequence_signal[: 10**4])
plt.xlabel("Index")
plt.ylabel("Amplitude")
plt.title("Die ersten 10000 Datenpunkte des gewählten Signals")

In [None]:
plt.figure(figsize=(10, 5))
quant = np.quantile(my_dialed_sequence_signal, 0.99)
start_index = np.where(my_dialed_sequence_signal > quant)[0][10]
plt.plot(my_dialed_sequence_signal)
plt.xlabel("Index")
plt.ylabel("Amplitude")
plt.title("Weiterer Zoom-In")
plt.xlim(start_index, start_index+1.5*10**3)

In [None]:
# Now let us listen to the generated WAV file
import IPython
import numpy as np

wav_file_name = "my_dtmf_file.wav"

wavfile.write(
    wav_file_name,
    dtmf_gen.get_sample_rate(),
    (my_dialed_sequence_signal * np.iinfo(np.int32).max).astype(np.int32),
)
IPython.display.Audio(wav_file_name)

In [None]:
print("Dialed sequence: ", my_dialed_sequence_keys)
print("Used symbols: ", len(set(my_dialed_sequence_keys)))
print("Total length of signal:", my_dialed_sequence_signal.shape[0])

### Signal-Spektrogramm

In [None]:
plt.figure(figsize=(10, 5))
Pxx, freqs, bins, im = plt.specgram(
    my_dialed_sequence_signal, NFFT=1024, Fs=dtmf_gen.get_sample_rate()
)
plt.ylim(0, 2000)
plt.xlabel("t [s]")
plt.ylabel("f [Hz]")
plt.title("Spektrogramm des generierten Telefonwählsignals")
plt.show(im)

## Experimente

### Laden des vortrainierten Keras Modells

In [None]:
import tensorflow as tf
keras_model = tf.keras.models.load_model("dtmf_classifier.keras")

In [None]:
import matplotlib.pyplot as plt

start = time.time()
keras_pred = keras_model.predict(my_dialed_sequence_signal.reshape(1, -1, 1))
end = time.time()

cmap = plt.get_cmap("tab20")

colors = [cmap(i) for i in range(16)]  # Get 16 distinct colors

plt.figure(figsize=(10, 5))
plt.plot(my_dialed_sequence_signal)

for key_idx in range(keras_pred.shape[-1] - 1):  # last index represents pauses
    plt.plot(
        keras_pred[0, :, key_idx],
        label=f"{dtmf_gen.get_key(key_idx=key_idx)}",
        color=colors[key_idx],
    )
plt.legend(loc="upper center", bbox_to_anchor=(0.5, -0.15), ncol=8)
plt.title(f"Tatsächliche Wahlsequenz: {' '.join(list(my_dialed_sequence_keys))}")
plt.show()
print("Inferenz-Dauer:", end - start)

In [None]:
predicted_key_sequence = dtmf_gen.decode_prediction(keras_pred)
print("Prognostizierte Wählsequenz:", predicted_key_sequence)
print(
    "Passt die Prognose zur tatsächlichen gewählten Sequenz?:",
    "Ja!" if predicted_key_sequence == my_dialed_sequence_keys else "Nein!",
)

### Konvertierung des Keras Modells nach ONNX

In [None]:
import onnx
import tf2onnx

# Diese Zelle könnte einen Fehler werfen.
# Dennoch sollte das ONNX Modell korrekt exportiert werden
keras_model.output_names = ["output"]

input_signature = [
    tf.TensorSpec([None, 2**12, 1], tf.float32, name="input")
]  # TODO: time axis fixed or dynamic?
# Use from_function for tf functions
onnx_model, _ = tf2onnx.convert.from_keras(keras_model, input_signature, opset=18)
onnx.save(onnx_model, "dtmf_classifier.onnx")

In [None]:
!pip install onnx-simplifier # TODO: Add to deps

### Optimierung des ONNX Modells

In [None]:
import onnxsim

model_path = "dtmf_classifier.onnx"
simplified_model_path = "dtmf_classifier.onnx"
onnx_model = onnx.load(model_path)
onnx.checker.check_model(onnx_model)
onnx_model_simp, check = onnxsim.simplify(onnx_model)
assert check, "Simplified ONNX model could not be validated"
onnx.save(onnx_model_simp, simplified_model_path)

In [None]:
from techdays25 import onnx_utils

onnx_utils.netron_visualize("dtmf_classifier.onnx")

### Quantisierung des ONNX Modells

In [None]:
import onnx
from onnxconverter_common import float16

onnx_model = onnx.load("dtmf_classifier.onnx")
onnx.checker.check_model(onnx_model)
onnx_model_fp16 = float16.convert_float_to_float16(
    onnx_model,
    min_positive_val=1e-7,
    max_finite_val=1e4,
    keep_io_types=True,
    disable_shape_infer=False,
    op_block_list=None,
    node_block_list=None,
)
onnx.save(onnx_model_fp16, "dtmf_classifier_fp16.onnx")

In [None]:
from techdays25.dtmf_models import DtmfClassifierOnnx

# Import tensorrt_libs
import tensorrt_libs

# FP32 ONNX Model
onnx_classifier = DtmfClassifierOnnx("dtmf_classifier.onnx")

# FP16 ONNX Model
onnx_classifier_fp16 = DtmfClassifierOnnx("dtmf_classifier_fp16.onnx")

# FP16 TensorRT Model
trt_provider = (
        "TensorrtExecutionProvider",
        {
            "device_id": 0,  # The device ID
            'trt_max_workspace_size': 4e9, # Maximum workspace size for TensorRT engine (1e9 ≈ 1GB)
            "trt_fp16_enable": True,
            "trt_int8_enable": False,
            "trt_int8_use_native_calibration_table": False,
            "trt_engine_cache_enable": False,
            "trt_engine_cache_path": "./trt_catch_dir",
            #"trt_profile_opt_shapes": "input:32x4096x1",
            #"trt_profile_min_shapes": "input:1x4096x1",
            #"trt_profile_max_shapes": "input:32x4096x1",
        },
    )
#tensorrt_classifier_fp16 = DtmfClassifierOnnx("dtmf_classifier.onnx", provider=trt_provider)

#tensorrt_classifier_fp32 = TensorRTInfer("dtmf_classifier_fp32_nvidia_l4.trt")
#tensorrt_classifier_fp16 = TensorRTInfer("dtmf_classifier_fp16_nvidia_l4.trt")
tensorrt_classifier_int8 = TensorRTInfer("dtmf_classifier_int8_nvidia_a100-sxm4-40gb.trt")


In [None]:
# TODO: Allow to select model here:
onnx_prediction = onnx_classifier.predict(
    my_dialed_sequence_signal.reshape(1, -1, 1).astype(np.float32)
)
predicted_key_sequence = dtmf_gen.decode_prediction(onnx_prediction)
print("Predicted Sequence:", predicted_key_sequence)
print(
    "Passt die Prognose zur tatsächlichen gewählten Sequenz?:",
    "Ja!" if predicted_key_sequence == my_dialed_sequence_keys else "Nein!",
)

In [None]:
# TODO: Validate ONNX models and Keras Model on Validation/Test data

### Peformanzmessung (Latenz) der individuellen Modelle

In [None]:
from techdays25.dtmf_models import measure_latency
import pandas as pd

n_runs = 100
n_warmup = 20
signal_length = 2**12
batch_sizes = [2**i for i in range(11)]
# model_dict = {"keras": lambda x: sum(np.median(x) for i in range(10)),
#         "onnx": lambda x: sum(np.median(x) for i in range(5))}
model_dict = {
    #"keras": lambda x: model.predict(x, verbose=0),
    "ONNX (FP32)": onnx_classifier.predict,
    "ONNX (FP16)": onnx_classifier_fp16.predict,
    #"TRT (FP32)": tensorrt_classifier_fp32.infer,
    #"TRT (FP16)": tensorrt_classifier_fp16.infer,
    "TRT (INT8)": tensorrt_classifier_int8.infer,
}

results = {}
for model_name, f_predict in model_dict.items():
    print(model_name)
    timings = {}
    for batch_size in batch_sizes:
        tensor_shape = (batch_size, signal_length, 1)
        batch_timings = measure_latency(
            f_predict, tensor_shape=tensor_shape, n_runs=n_runs, n_warmup=n_warmup
        )
        timings[batch_size] = batch_timings

    df = pd.DataFrame(timings)
    results[model_name] = df

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

cmap = plt.get_cmap("tab10")
colors = [cmap(i) for i in range(len(results))]

plt.figure(figsize=(8, 5))
for (model_name, df), color in zip(results.items(), colors):
    # Convert columns to integers for sorting
    df.columns = df.columns.astype(int)
    df = df[sorted(df.columns)]

    # Compute medians and standard deviations
    medians = df.median()
    stds = df.std()

    # X-axis values (sorted batch sizes)
    x = medians.index

    # Plot with connected points and error bars

    plt.errorbar(
        x,
        medians,
        yerr=stds,
        fmt="-o",
        capsize=5,
        color=color,
        ecolor=color,
        elinewidth=2,
        markerfacecolor="white",
        label=model_name,
    )

# Make it pretty
plt.xlabel("Batch-Größe")
plt.ylabel("Latenz [s]")
plt.title("TODO")
plt.grid(True, linestyle="--", alpha=0.6, which="both")
# plt.xticks(x)
plt.tight_layout()
plt.legend()
plt.xscale("log")
#plt.yscale("log")
plt.show()

In [None]:
# import onnx
# Load the ONNX model
# model_path = "dtmf_classifier.onnx"
# model = onnx.load(model_path)

In [None]:
from onnxruntime.quantization.shape_inference import quant_pre_process
quant_pre_process("dtmf_classifier.onnx", "dtmf_classifier.onnx")

In [None]:
from onnxruntime.quantization import (
    quantize_dynamic,
    QuantType,
    CalibrationDataReader,
    quantize_static,
    QuantFormat,
    quantize_dynamic,
)


class CalibrationDataReaderImpl(CalibrationDataReader):
    def __init__(self):
        # self.data = calibration_data
        # self.data_iter = iter(self.data)
        self.counter = 0

    def get_next(self):
        if self.counter >= 10:
            return None
        self.counter += 1
        X, _ = dtmf_gen.generate_dataset(n_samples=64, t_length=2**12)
        return {"input": X.astype(np.float32)}


# Prepare calibration data
calibration_data = None
calibration_data_reader = CalibrationDataReaderImpl()

# Quantize the model
quantized_model_path = "dtmf_classifier_int8.onnx"

quantize_static(
    # quantize_dynamic(
    "dtmf_classifier_pre.onnx",
    quantized_model_path,
    calibration_data_reader,
    # quant_format=QuantFormat.QOperator
    # quant_format=QuantType.QInt8,
    # per_channel=True,
    # weight_type=QuantType.QInt8,
    # optimize_model=True,
)

In [None]:
import onnxruntime as ort

# Load the quantized model
quantized_model = ort.InferenceSession(quantized_model_path)

# Run inference with the quantized model
X, Y = dtmf_gen.generate_dataset(n_samples=64, t_length=2**10)
input_name = quantized_model.get_inputs()[0].name
output_name = quantized_model.get_outputs()[0].name

result = quantized_model.run([output_name], {input_name: X.astype(np.float32)})[0]

In [None]:
thresholded = (result > 0.5).astype(int)
(thresholded == Y).sum() / Y.size

## TensorRT Tests

In [None]:
def get_gpu_type():
  import torch
  if not torch.cuda.is_available():
    return "cpu"
  return "_".join(torch.cuda.get_device_name(0).lower().split(" "))

In [None]:
from techdays25.dtmf_generation import DtmfGenerator

dtmf_gen = DtmfGenerator(
    dur_key=(0.2, 0.3),
    dur_pause=(0.01, 0.1),
    noise_factor=(10.0, 50.0),
    noise_freq_range=(0.0, 20000.0),
)

In [None]:
import tensorrt as trt
import os
import sys
import time
import argparse
import numpy as np
from cuda import cudart
from cuda import cuda, cudart

def check_cuda_err(err):
    if isinstance(err, cuda.CUresult):
        if err != cuda.CUresult.CUDA_SUCCESS:
            raise RuntimeError("Cuda Error: {}".format(err))
    if isinstance(err, cudart.cudaError_t):
        if err != cudart.cudaError_t.cudaSuccess:
            raise RuntimeError("Cuda Runtime Error: {}".format(err))
    else:
        raise RuntimeError("Unknown error type: {}".format(err))

def cuda_call(call):
    err, res = call[0], call[1:]
    check_cuda_err(err)
    if len(res) == 1:
        res = res[0]
    return res

# Wrapper for cudaMemcpy which infers copy size and does error checking
def memcpy_host_to_device(device_ptr: int, host_arr: np.ndarray):
    nbytes = host_arr.size * host_arr.itemsize
    cuda_call(cudart.cudaMemcpy(device_ptr, host_arr, nbytes, cudart.cudaMemcpyKind.cudaMemcpyHostToDevice))

# Wrapper for cudaMemcpy which infers copy size and does error checking
def memcpy_device_to_host(host_arr: np.ndarray, device_ptr: int):
    nbytes = host_arr.size * host_arr.itemsize
    cuda_call(cudart.cudaMemcpy(host_arr, device_ptr, nbytes, cudart.cudaMemcpyKind.cudaMemcpyDeviceToHost))


class MNISTEntropyCalibrator(trt.IInt8EntropyCalibrator2):
    def __init__(self, training_data, cache_file, batch_size=16):
        # Whenever you specify a custom constructor for a TensorRT class,
        # you MUST call the constructor of the parent explicitly.
        trt.IInt8EntropyCalibrator2.__init__(self)

        self.cache_file = cache_file

        # Every time get_batch is called, the next batch of size batch_size will be copied to the device and returned.
        #self.data = 2 * np.random.rand(32*batch_size, 2**12, 1).astype(np.float32) - 1.0
        # self.data = training_data
        self.data = dtmf_gen.generate_dataset(n_samples=32*batch_size, t_length=2**12, with_labels=None).astype(np.float32)
        #print(self.data.dtype)
        #self.data = self.data.astype(np.float32)
        self.batch_size = batch_size
        self.current_index = 0

        # Allocate enough memory for a whole batch.
        #self.device_input = cuda.mem_alloc(self.data[0].nbytes * self.batch_size)
        n_bytes = self.data[0].nbytes * self.batch_size
        # print("n_bytes", n_bytes)
        self.device_input = cuda_call(cudart.cudaMalloc(n_bytes))

    def get_batch_size(self):
        return self.batch_size

    # TensorRT passes along the names of the engine bindings to the get_batch function.
    # You don't necessarily have to use them, but they can be useful to understand the order of
    # the inputs. The bindings list is expected to have the same ordering as 'names'.
    def get_batch(self, names):
        # print("names:", names)
        if self.current_index + self.batch_size > self.data.shape[0]:
            return None

        current_batch = int(self.current_index / self.batch_size)
        if current_batch % 10 == 0:
            print("Calibrating batch {:}, containing {:} images".format(current_batch, self.batch_size))

        batch = self.data[self.current_index:self.current_index + self.batch_size].ravel()
        #cuda.memcpy_htod(self.device_input, batch)
        #memcpy_host_to_device(self.device_input, batch)
        memcpy_host_to_device(self.device_input, np.ascontiguousarray(batch)
            )
        self.current_index += self.batch_size
        # print("Schalom!")
        return [int(self.device_input)]


    def read_calibration_cache(self):
        # If there is a cache, use it instead of calibrating again. Otherwise, implicitly return None.
        if os.path.exists(self.cache_file):
            with open(self.cache_file, "rb") as f:
                return f.read()

    def write_calibration_cache(self, cache):
        return None # for now
        with open(self.cache_file, "wb") as f:
            f.write(cache)

In [None]:
# You can set the logger severity higher to suppress messages (or lower to display more messages).
TRT_LOGGER = trt.Logger(trt.Logger.VERBOSE)


# The Onnx path is used for Onnx models.
def build_engine_onnx(model_file, trt_engine_path, precision):
    seq_len = 2**12
    max_batch_size = [1,2,4,8,16,32,64,128,256,512, 1024]
    calibration_batch_size = 16
    builder = trt.Builder(TRT_LOGGER)
    network = builder.create_network(0)
    config = builder.create_builder_config()
    parser = trt.OnnxParser(network, TRT_LOGGER)

    config.set_memory_pool_limit(trt.MemoryPoolType.WORKSPACE, 8*1<<30) # TODO: Constant
    # Load the Onnx model and parse it in order to populate the TensorRT network.
    with open(model_file, "rb") as model:
        if not parser.parse(model.read()):
            print("ERROR: Failed to parse the ONNX file.")
            for error in range(parser.num_errors):
                print(parser.get_error(error))
            return None

    for b in max_batch_size:
      profile = builder.create_optimization_profile()
      profile.set_shape("input", [1,seq_len,1], [b,seq_len,1], [b,seq_len,1])
      config.add_optimization_profile(profile)




    if precision in ["fp16", "int8", "mixed"]:
        if not builder.platform_has_fast_fp16:
            print("FP16 is not supported natively on this platform/device")
        config.set_flag(trt.BuilderFlag.FP16)
    if precision in ["int8", "mixed"]:
        if not builder.platform_has_fast_int8:
            print("INT8 is not supported natively on this platform/device")
        config.set_flag(trt.BuilderFlag.INT8)
        # config.set_flag(trt.BuilderFlag.OBEY_PRECISION_CONSTRAINTS)

        calib = MNISTEntropyCalibrator("", cache_file="cache.file", batch_size=calibration_batch_size)
        config.int8_calibrator = calib

        calib_profile = builder.create_optimization_profile()
        calib_profile.set_shape("input", [calibration_batch_size,seq_len,1], [calibration_batch_size,seq_len,1], [calibration_batch_size,seq_len,1])
        config.set_calibration_profile(calib_profile)
        config.profiling_verbosity = trt.ProfilingVerbosity.DETAILED

        print("int 8 model")

    engine_bytes = builder.build_serialized_network(network, config)

    if engine_bytes is None:
        print("Failed to create the TensorRT engine")
        return None
    runtime = trt.Runtime(TRT_LOGGER)

    # Save the engine to a file
    with open(trt_engine_path, "wb") as f:
        f.write(engine_bytes)

    print(f"TensorRT engine saved to {trt_engine_path}")

    #return runtime.deserialize_cuda_engine(engine_bytes)

# Example Usage
precision = "int8"
onnx_path = "dtmf_classifier.onnx"
trt_path = "dtmf_classifier_"+ precision + "_" + get_gpu_type() + ".trt"
build_engine_onnx(onnx_path, trt_path, precision=precision)


In [None]:
DEBUG = False
def print_dbg(*x):
  if DEBUG:
    print(x)

class TensorRTInfer:
  # TODO: This code still has a memory leak. The memory allocated by
  # cudaMalloc has to be released!
    """
    Implements inference for the TensorRT engine.
    """

    def __init__(self, engine_path):
        """
        :param engine_path: The path to the serialized engine to load from disk.
        """

        # Load TRT engine
        self.logger = trt.Logger(trt.Logger.ERROR)
        trt.init_libnvinfer_plugins(self.logger, namespace="")
        with open(engine_path, "rb") as f, trt.Runtime(self.logger) as runtime:
            assert runtime
            self.engine = runtime.deserialize_cuda_engine(f.read())
        assert self.engine
        self.context = self.engine.create_execution_context()
        assert self.context

        # Some Infos about the engine
        print_dbg("num optimization profiles:", self.engine.num_optimization_profiles)
        print_dbg("num io tensors:", self.engine.num_io_tensors)

        # Create CUDA stream for asynchronous tasks
        _, self.stream = cudart.cudaStreamCreate()

        # Setup I/O bindings
        self.inputs = []
        self.outputs = []
        self.allocations = []
        for prof_idx in range(self.engine.num_optimization_profiles):
          for i in range(self.engine.num_io_tensors):
              name = self.engine.get_tensor_name(i)
              is_input = False
              if self.engine.get_tensor_mode(name) == trt.TensorIOMode.INPUT:
                  is_input = True
              dtype = np.dtype(trt.nptype(self.engine.get_tensor_dtype(name)))
              shape = self.engine.get_tensor_shape(name)
              if is_input and shape[0] < 0:
                  assert self.engine.num_optimization_profiles > 1
                  profile_shape = self.engine.get_tensor_profile_shape(name, prof_idx)
                  print_dbg("profile_shape", name, profile_shape)
                  assert len(profile_shape) == 3  # min,opt,max

                  # Set the *max* profile as binding shape
                  self.switch_profile(prof_idx)
                  self.context.set_input_shape(name, profile_shape[2])
                  shape = self.context.get_tensor_shape(name)

              if not is_input:
                shape = self.context.get_tensor_shape(name)
                print_dbg("shape for output:", name, shape)

              if is_input:
                  self.batch_size = shape[0]
              size = dtype.itemsize
              for s in shape:
                  size *= s
              allocation = cuda_call(cudart.cudaMalloc(size))
              host_allocation = None if is_input else np.zeros(shape, dtype)
              binding = {
                  "index": i,
                  "name": name,
                  "dtype": dtype,
                  "shape": list(shape),
                  "allocation": allocation,
                  "host_allocation": host_allocation,
              }
              self.allocations.append(allocation)
              if is_input:
                  self.inputs.append(binding)
              else:
                  self.outputs.append(binding)
              print_dbg(
                  "{} '{}' with shape {} and dtype {}".format(
                      "Input" if is_input else "Output",
                      binding["name"],
                      binding["shape"],
                      binding["dtype"],
                  )
              )
          print_dbg()

        assert self.batch_size > 0
        assert len(self.inputs) > 0
        assert len(self.outputs) > 0
        assert len(self.allocations) > 0



    def input_spec(self):
        """
        Get the specs for the input tensor of the network. Useful to prepare memory allocations.
        :return: Two items, the shape of the input tensor and its (numpy) datatype.
        """
        # TODO: Index 0 is wrong
        return self.inputs[0]["shape"], self.inputs[0]["dtype"]

    def output_spec(self):
        """
        Get the specs for the output tensors of the network. Useful to prepare memory allocations.
        :return: A list with two items per element, the shape and (numpy) datatype of each output tensor.
        """
        specs = []
        for o in self.outputs:
            specs.append((o["shape"], o["dtype"]))
        return specs

    def switch_profile(self, idx: int):
      self.context.set_optimization_profile_async(idx, self.stream)  # Switch to profile 1 (index 1)

    def infer(self, batch):
        """
        Execute inference on a batch of images.
        :param batch: A numpy array holding the image batch.
        :return A list of outputs as numpy arrays.
        """

        # If the optimization profile does not match, change it here:
          # In out setup the opt. profiles are selected in a wa that the
          # optimal batch sizes are powers of 2. In practice, one would not do
          # it in this way:
        expected_profile = int(np.log2(batch.shape[0]))
        if self.context.active_optimization_profile != expected_profile:
          print("Chaninging to profile", expected_profile)
          self.switch_profile(expected_profile)

        if self.context.get_tensor_shape("input") != batch.shape:
          print("Changing batch size for inference!")

          # Adapt the input shape:
          self.context.set_input_shape("input", batch.shape)
        print_dbg("self.engine.get_tensor_shape(input)", self.engine.get_tensor_shape("input"))
        print_dbg("self.context.get_tensor_shape(input)", self.context.get_tensor_shape("input"))
        print_dbg("self.context.get_tensor_shape(output)", self.context.get_tensor_shape("output"))
        print_dbg()

        o_idx = self.context.active_optimization_profile
        print_dbg("Active output index (opt. profile)", o_idx)

        # Copy I/O and Execute
        memcpy_host_to_device(self.inputs[o_idx]["allocation"], batch)

        self.context.execute_v2(self.allocations)
        memcpy_device_to_host(
                self.outputs[o_idx]["host_allocation"], self.outputs[o_idx]["allocation"]
            )

        return [self.outputs[o_idx]["host_allocation"]]

def main():
    seq_len = 2**12
    trt_infer = TensorRTInfer(trt_path)

    print("Starting inference")
    if False:
      spec = trt_infer.input_spec()
      print("spec", spec)
      # batch = my_dialed_sequence_signal.reshape(1, -1, 1).astype(np.float32)
      X, Y = dtmf_gen.generate_dataset(n_samples=64, t_length=seq_len)
      o = trt_infer.infer(X.astype(np.float32))[0][:X.shape[0]]
      print("o.shape", o.shape)

      thresholded = (o > 0.5).astype(int)
      print( (thresholded == Y).sum() / Y.size )
      for iidx in range(X.shape[0]):
        predicted_key_sequence = dtmf_gen.decode_prediction(o[iidx])
        original_key_sequence = dtmf_gen.decode_prediction(Y[iidx])
        if predicted_key_sequence != original_key_sequence:
          print("predicted_key_sequence", predicted_key_sequence)
          print("original_key_sequence", original_key_sequence)
      print("Done!")
    else:
        print("No input provided, running in benchmark mode")
        trt_infer.switch_profile(1)
        spec = trt_infer.input_spec()
        # TODO:
        spec = (1,4096,1), np.float32

        batch = np.random.rand(*spec[0]).astype(spec[1])
        print("batch.shape", batch.shape)
        print("batch.dtype", batch.dtype)
        print("min/max/mean", batch.min(), batch.max(), batch.mean())
        iterations = 100
        times = []
        for i in range(20):  # GPU warmup iterations
            trt_infer.infer(batch)
        for i in range(iterations):
            start = time.time()
            o = trt_infer.infer(batch)
            #print("o.shape", o[0].shape)
            times.append(time.time() - start)
            print("Iteration {} / {}".format(i + 1, iterations), end="\r")
        print("Benchmark results include time for H2D and D2H memory copies")
        print("Average Latency: {:.3f} ms".format(1000 * np.average(times)))
        print(
            "Average Throughput: {:.1f} ips".format(
                trt_infer.batch_size / np.average(times)
            )
        )

    print()
    print("Finished Processing")

if __name__ == "__main__":
    main()

## Erneutes Training des Keras Modells (Optional)

In [None]:
X_train, Y_train = dtmf_gen.generate_dataset(n_samples=1024, t_length=2**14)
X_val, Y_val = dtmf_gen.generate_dataset(n_samples=64, t_length=2**16)

print(X_train.shape, Y_train.shape, X_train.min(), X_train.max())
print(X_val.shape, Y_val.shape, X_val.min(), X_val.max())

In [None]:
# from collections.abc import Callable
from collections.abc import Callable
from pathlib import Path

import numpy as np
import onnxruntime as ort
import tensorflow as tf
from tensorflow.keras import layers

In [None]:
# from techdays25.dtmf_models import build_dtmf_classifier_model

model = build_dtmf_classifier_model((None, 1), dtmf_gen.get_num_keys() + 1)
adam = tf.keras.optimizers.Adam(learning_rate=0.001)
model.compile(
    optimizer=adam, loss="categorical_crossentropy", metrics=["accuracy"]
)  # multi-label
model.summary()

In [None]:
model.fit(X_train, Y_train, batch_size=64, epochs=50, validation_data=(X_val, Y_val))
model.save("dtmf_classifier.keras")

In [None]:
model = tf.keras.models.load_model("dtmf_classifier.keras")

In [None]:
import IPython

idx = 9

# TODO: Duplicate Code below:
pred = model.predict(X_val[idx : idx + 1])
cmap = plt.get_cmap("tab20")
colors = [cmap(i) for i in range(16)]  # Get 16 distinct colors

plt.figure(figsize=(10, 5))
plt.plot(X_val[idx, :, 0])

for key_idx in range(pred.shape[-1] - 1):  # last index represents pauses
    plt.plot(
        pred[0, :, key_idx],
        label=f"{dtmf_gen.get_key(key_idx=key_idx)}",
        color=colors[key_idx],
    )
plt.legend(loc="upper center", bbox_to_anchor=(0.5, -0.15), ncol=8)
plt.show()

wavfile.write(
    "val.wav",
    dtmf_gen.get_sample_rate(),
    (X_val[idx] * np.iinfo(np.int32).max).flatten().astype(np.int32),
)
IPython.display.Audio("val.wav")

In [None]:
# Compute Accuracy
pred = model.predict(X_val)
thresholded = (pred > 0.5).astype(int)

(thresholded == Y_val).sum() / Y_val.size

# TRTExec

In [None]:
!uname -a
!cat /etc/os-release
!ls -l /usr/local
!nvcc --version
!nvidia-smi

In [None]:
# !git clone https://github.com/NVIDIA/TensorRT.git

In [None]:
export LD_LIBRARY_PATH=/usr/local/lib/python3.11/dist-packages/tensorrt_libs/

In [None]:
import tensorrt
tensorrt.__version__

In [None]:
!export LD_LIBRARY_PATH=/usr/local/lib/python3.11/dist-packages/tensorrt_libs/ && \
./trtexec --onnx=dtmf_classifier.onnx

# Experiments with different Data Types

In [None]:
# Load cuDNN libraries
import ctypes
import glob
import os
import sys
from nvidia import cudnn


def try_load(library):
    try:
        ctypes.CDLL(
            library, mode=ctypes.RTLD_GLOBAL
        )  # Use RTLD_GLOBAL to make symbols available
    except OSError:
        pass


def try_load_libs_from_dir(path):
    # Load all .so files (Linux)
    for lib in glob.iglob(os.path.join(path, "*.so*")):
        print("try:", lib)
        try_load(lib)


# Get the cudnn library path
CUDNN_LIB_DIR = os.path.join(cudnn.__path__[0], "lib")
print(CUDNN_LIB_DIR)

# Try loading all libraries in the cudnn lib directory
# try_load_libs_from_dir(CUDNN_LIB_DIR)

In [None]:
# Import tensorrt_libs
import tensorrt_libs

# Import ONNX dependencies
import onnxruntime as ort  # Import the ONNX Runtime
from onnxruntime.tools.symbolic_shape_infer import SymbolicShapeInference
from onnxruntime.quantization import (
    CalibrationDataReader,
    CalibrationMethod,
    create_calibrator,
    write_calibration_table,
)

In [None]:
ort.get_available_providers()

In [None]:
providers = [
    (
        "TensorrtExecutionProvider",
        {
            "device_id": 0,  # The device ID
            'trt_max_workspace_size': 4e9, # Maximum workspace size for TensorRT engine (1e9 ≈ 1GB)
            "trt_fp16_enable": False,
            "trt_int8_enable": True,
            "trt_int8_use_native_calibration_table": False,
            "trt_engine_cache_enable": False,
            "trt_engine_cache_path": "./trt_catch_dir",
            #"trt_profile_opt_shapes": "input:1x4096",
            #"trt_profile_min_shapes": "input:1x4096",
            #"trt_profile_max_shapes": "input:2x4096",
        },
    )
]

sess_opt = ort.SessionOptions()
sess_opt.log_severity_level = 0  # 0 is the most verbose level
sess_opt.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
# sess_opt.graph_optimization_level = ort.GraphOptimizationLevel.ORT_DISABLE_ALL

# Load the model and create an InferenceSession
session = ort.InferenceSession(
    "dtmf_classifier.onnx", sess_options=sess_opt, providers=providers
)

In [None]:
# Run inference with the quantized model
X, Y = dtmf_gen.generate_dataset(n_samples=64, t_length=2**12)
input_name = session.get_inputs()[0].name
output_name = session.get_outputs()[0].name

result = session.run([output_name], {input_name: X[0:1].astype(np.float32)})[0]
# result = session.run([output_name], {input_name: np.ones( (5,1) ).astype(np.float32)})[0]

In [None]:
# <TODO>: Add custom ONNX layer which does the pre-scaling of the input signal...
# and post-processing: rle of the signal, discard keys which are short than 23 ms
# Save inference times of each algo in a csv file (in case of a crash)

# Further Experiments with Data Types

In [None]:
import sys
sys.float_info

In [None]:
import numpy as np
import torch


def float_to_binary_fp32(num: float) -> str:
    """Converts a built-in floating point number (64-bit) to its FP32 binary representation.

    Args:
        num (float): The floating point number to convert.

    Returns:
        str: A string representing the binary format of the floating point number.
    """
    print("fp32:", num)
    return "".join(f"{c:0>8b}" for c in struct.pack("!f", num))


def float_to_binary_fp16(num: float) -> str:
    """Converts a builtin-in floating point number to a 16-bit floating point number and returns its binary representation.

    Args:
        num (float): The floating point number to convert.

    Returns:
        str: A string representing the binary format of the 16-bit floating point number.
    """
    # Convert the number to a float16
    float16_num = np.float16(num)

    print("fp16:", float16_num)

    # Convert the float16 to bytes
    float16_bytes = float16_num.tobytes()

    # Convert the bytes to a binary string (big endian notation)
    return "".join(f"{byte:08b}" for byte in reversed(float16_bytes))


def float_to_binary_bf16(num: float) -> str:
    """Converts a floating point number to bfloat16  and returns its binary representation.

    Args:
        num (float): The floating point number to convert.

    Returns:
        str: A string representing the binary format of the bfloat16 floating point number.
    """
    # Create a tensor with the given number
    a = torch.Tensor([num])

    # Convert the tensor to bfloat16
    bf = a.bfloat16()

    print("bf16", bf)

    # Convert the bfloat16 tensor to bytes
    bf_bytes = bytes(bf.untyped_storage())

    # Convert the bytes to a binary string (big endian notation)
    return "".join(f"{byte:08b}" for byte in reversed(bf_bytes))


def float_to_binary_fp8_e4m3(num: float) -> str:
    """Converts a  floating point number to float8 (e4m3) and returns its binary representation.

    Args:
        num (float): The floating point number to convert.

    Returns:
        str: A string representing the binary format of the float8 (e4m3) floating point number.
    """
    # Create a tensor with the given number
    a = torch.Tensor([num])

    # Convert the tensor to float8 (e4m3)
    bf = a.to(torch.float8_e4m3fn)

    print("fp8_e4m3", bf)

    # Convert the float8 tensor to bytes
    bf_bytes = bytes(bf.untyped_storage())

    # Convert the bytes to a binary string
    return "".join(f"{byte:08b}" for byte in bf_bytes)


def float_to_binary_fp8_e5m2(num: float) -> str:
    """Converts a floating point number to float8 (e5m2)  and returns its binary representation.

    Args:
        num (float): The floating point number to convert.

    Returns:
        str: A string representing the binary format of the float8 (e5m2) floating point number.
    """
    # Create a tensor with the given number
    a = torch.Tensor([num])

    # Convert the tensor to float8 (e5m2)
    bf = a.to(torch.float8_e5m2)

    print("fp8_e5m2", bf)

    # Convert the float8 tensor to bytes
    bf_bytes = bytes(bf.untyped_storage())

    # Convert the bytes to a binary string
    return "".join(f"{byte:08b}" for byte in bf_bytes)


def float_to_binary_int(num: float, bit_length: int = 8) -> str:
    """Converts a floating point number to its binary representation as an integer.

    Args:
        num (float): The floating point number to convert.
        bit_length (int, optional): The bit length of the binary representation. Defaults to 8.

    Returns:
        str: A string representing the binary format of the integer part of the floating point number.
    """
    return np.binary_repr(round(num), width=bit_length)


num = -8.875074538462327 - 2**-7 - 2**-8
float_to_binary_fp32(num)
# float_to_binary_fp16(num)
# float_to_binary_bf16(num)
# float_to_binary_fp8_e4m3(num)
# float_to_binary_fp8_e5m2(num)
# float_to_binary_int(num, bit_length=8)

In [None]:
-num - 2**3 - 2**-1 - 2**-2 - 2**-3 - 2**-7 - 2**-8


(int("1111011100011101", base=2) - 2**16) / 2**8

In [None]:
import numpy as np

s = "1100100001111100"
b = int(s, base=2).to_bytes(2, "little")
print(b)
c = np.frombuffer(b, dtype=np.float16, count=1)
print(c)

In [None]:
# Binary string
binary_string = "11000001000011111000101100101011"

# Convert the binary string to an integer
binary_int = int(binary_string, 2)

# Convert the integer to bytes (4 bytes for float32)
binary_bytes = binary_int.to_bytes(4, byteorder="big")

# Unpack the bytes to a float
float_value = struct.unpack(">f", binary_bytes)[0]

print(float_value)