# PYNQTorch Project Demo

First, import dependencies and our custom operators.

In [1]:
import torch
import matplotlib
from pytorch_zynq import init_hardware as init, mmult

The basis of **AI acceleration** is matrix multiplication. PyTorch reserves the `PrivateUse1` backend slot for out-of-tree devices; registering that slot and renaming it to `zynq` lets us attach our own operator implementations to the new device name. The operator package must also declare the device's functions and behaviors. Below is the public interface exported by the `pytorch_zynq` package:
```python
from .device import register_zynq_device, is_registered
from .device import enable_full_device, disable_full_device
from .device import enable_implicit_accel, disable_implicit_accel
from .ops import mmult, register_aten_impls
from .linear import ZynqLinear
from .hardware import init as init_hardware, is_hardware_available, deinit as deinit_hardware

__all__ = [
    "register_zynq_device",
    "is_registered",
    "mmult",
    "ZynqLinear",
    "init_hardware",
    "is_hardware_available",
    "register_aten_impls",
    "deinit_hardware",
    "enable_full_device",
    "disable_full_device",
    "enable_implicit_accel",
    "disable_implicit_accel",
    
]
```

The exports above cover the device behaviors and acceleration operations supported by our dual-pipeline matrix multiplication (**GeMM**) operator. How are tensors bound to the operator? Our hardware cannot hold tensors directly, so each tensor stays in **CPU** memory but is tagged with the **zynq** device property. The operator backend then checks every operand for that property: when both tensors in an operation carry the **zynq** device property, the operation is bound to the hardware accelerator.
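
The tag-based routing described above can be sketched in plain Python. This is only an illustration of the idea, not the `pytorch_zynq` implementation: `TaggedTensor`, `accel_matmul`, and `dispatch_matmul` are hypothetical names, and the "accelerator" path simply reuses the CPU routine so the example runs anywhere.

```python
from dataclasses import dataclass


@dataclass
class TaggedTensor:
    """Hypothetical stand-in for a host-resident tensor with a device tag."""
    data: list            # nested lists, always kept in CPU memory
    device: str = "cpu"   # logical device tag: "cpu" or "zynq"


def cpu_matmul(a, b):
    """Reference matrix product on nested lists."""
    return [[sum(x * y for x, y in zip(row, col)) for col in zip(*b)] for row in a]


def accel_matmul(a, b):
    """Placeholder for the hardware call (assumption: same result as the CPU path)."""
    return cpu_matmul(a, b)


def dispatch_matmul(lhs, rhs):
    """Use the accelerator only when both operands carry the 'zynq' tag."""
    use_accel = lhs.device == "zynq" and rhs.device == "zynq"
    kernel = accel_matmul if use_accel else cpu_matmul
    out_device = "zynq" if use_accel else "cpu"
    return TaggedTensor(kernel(lhs.data, rhs.data), out_device), use_accel


a = TaggedTensor([[1, 2], [3, 4]], device="zynq")
b = TaggedTensor([[5, 6], [7, 8]], device="zynq")
c = TaggedTensor([[5, 6], [7, 8]], device="cpu")

out_accel, used_accel = dispatch_matmul(a, b)   # both tagged: accelerator path
out_mixed, used_mixed = dispatch_matmul(a, c)   # mixed tags: CPU fallback
```

Keeping the data on the host and treating the device as a tag means that a mixed-device operation can fall back to the CPU path without any copy.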

## Demo 1: General Matrix Multiplication (GeMM) Operator Benchmark

In this test, we call the `mmult` operation implemented in the operator directly. The operator is built on a dual-pipeline **INT8** matrix multiplication accelerator with a dimension of **256**. Compared with the equivalent **GeMM** executed on the **CPU**, the hardware **GeMM** delivers a large speedup, which the benchmark below measures. The test functions are defined as follows:

In [2]:
import time

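def benchmark_cpu(a, b, iters=3):
    # Only relevant on CUDA builds: flush pending GPU work before timing.
    if torch.cuda.is_available():
        torch.cuda.synchronize()
    with torch.no_grad():
        _ = torch.matmul(a, b)  # warm-up run, excluded from the timing
        t0 = time.perf_counter()
        out = None
        for _ in range(iters):
            out = torch.matmul(a, b)
        t1 = time.perf_counter()
    return (t1 - t0) / iters, out  # average seconds per call, last result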

def benchmark_fpga(a, b, iters=3):
    with torch.no_grad():
        _ = mmult(a, b)
        t0 = time.perf_counter()
        out = None
        for _ in range(iters):
            out = mmult(a, b)
        t1 = time.perf_counter()
    return (t1 - t0) / iters, out

def make_inputs(n, m, p, kind):
    if kind == "FP32":
        a = torch.randn((n, m), dtype=torch.float32)
        b = torch.randn((m, p), dtype=torch.float32)
    elif kind == "FP16":
        a = torch.randn((n, m), dtype=torch.float16)
        b = torch.randn((m, p), dtype=torch.float16)
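    elif kind == "INT8":
        # torch.randint excludes the upper bound, so 128 is needed to cover
        # the full int8 range [-128, 127]. Values are stored as int32.
        a = torch.randint(-128, 128, (n, m), dtype=torch.int32)
        b = torch.randint(-128, 128, (m, p), dtype=torch.int32)
    else:
        # Unknown kinds fall back to the INT8-range integers.
        a = torch.randint(-128, 128, (n, m), dtype=torch.int32)
        b = torch.randint(-128, 128, (m, p), dtype=torch.int32)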
    return a, b

def test1():
    n = 1024
    m = 1024
    p = 1024
    iters = 1

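    init()  # bring up the accelerator before timing
    kinds = ["FP32", "FP16", "INT8"]
    print("\n==================== MMULT BENCH ====================")
    print(f"Size: ({n} x {m}) @ ({m} x {p})")
    for kind in kinds:
        a, b = make_inputs(n, m, p, kind)
        # Floating-point inputs are truncated to int32 so that the CPU and the
        # accelerator multiply identical integer operands; `kind` therefore
        # labels how the data was generated, not the dtype being multiplied.
        a_cpu = a.to(torch.int32) if a.dtype in (torch.float32, torch.float16, torch.int16) else a
        b_cpu = b.to(torch.int32) if b.dtype in (torch.float32, torch.float16, torch.int16) else b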
        cpu_time, cpu_out = benchmark_cpu(a_cpu, b_cpu, iters)
        fpga_time, fpga_out = benchmark_fpga(a_cpu, b_cpu, iters)
        diff = (cpu_out - fpga_out).abs()
        l2_err = torch.norm(cpu_out.float() - fpga_out.float()).item()
        max_abs = diff.max().item()
        eq_ratio = (diff == 0).float().mean().item()
        speedup = cpu_time / fpga_time if fpga_time > 0 else float('inf')
        print(f"\n[{kind}] CPU avg:   {cpu_time:.6f} s")
        print(f"[{kind}] ZYNQ avg:  {fpga_time:.6f} s")
        print(f"[{kind}] Speedup:   {speedup:.2f}x")
        print(f"[{kind}] L2 error:  {l2_err:.6f}")
        print(f"[{kind}] Max diff:  {max_abs}")
        print(f"[{kind}] Exact %:   {eq_ratio*100:.2f}%")
    print("====================================================")


Now start the test:

In [3]:
test1()
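
The operands in `test1` are 1024 wide, while the accelerator is described as a 256-dimension unit. The source does not say how `mmult` bridges that gap; one common approach is tiling, where the product is split into tile-sized partial products that are accumulated into the output. The sketch below is an assumption-laden illustration in plain Python (`tile_matmul` and `naive_matmul` are hypothetical names, and a small tile size is used so that it runs quickly).

```python
import random


def naive_matmul(a, b):
    """Reference product used to check the tiled version."""
    return [[sum(x * y for x, y in zip(row, col)) for col in zip(*b)] for row in a]


def tile_matmul(a, b, tile=256):
    """Blocked matrix product: each partial product is at most tile x tile."""
    n, m, p = len(a), len(b), len(b[0])
    out = [[0] * p for _ in range(n)]
    for i0 in range(0, n, tile):
        for j0 in range(0, p, tile):
            for k0 in range(0, m, tile):
                # One tile-sized job; on hardware this would be one accelerator call.
                for i in range(i0, min(i0 + tile, n)):
                    for j in range(j0, min(j0 + tile, p)):
                        acc = 0
                        for k in range(k0, min(k0 + tile, m)):
                            acc += a[i][k] * b[k][j]
                        out[i][j] += acc  # accumulate partial sums across k-tiles
    return out


random.seed(0)
n, m, p = 5, 7, 6  # deliberately not multiples of the tile size
a = [[random.randint(-128, 127) for _ in range(m)] for _ in range(n)]
b = [[random.randint(-128, 127) for _ in range(p)] for _ in range(m)]

tiled = tile_matmul(a, b, tile=4)
reference = naive_matmul(a, b)
```

With INT8 operands and an inner dimension of 1024, the largest possible accumulated magnitude is 1024 × 128 × 128 = 2^24, so a 32-bit accumulator cannot overflow at this problem size.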

## Demo 2: CNN Forward Inference Benchmark based on Conv2D and F.linear Operations Accelerated by GeMM Operator

On the PyTorch platform, many frontend applications can be deployed. Here we use **SpeechBrain** as the frontend toolkit to run the **ASR CN AIShell Speech Recognition** model, because **SpeechBrain** makes model deployment straightforward. To make full use of the hardware acceleration, we select **PyTorch**'s `qnnpack` quantized engine and apply dynamic quantization to the model, converting the weight layers other than `clc_in` to the `int8` data format. This lets us evaluate the accelerator's performance while keeping recognition accuracy high.
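
To make the `int8` weight conversion concrete, here is a minimal sketch of symmetric per-tensor quantization in plain Python. It is a simplified scheme chosen for illustration, not necessarily the exact mapping that PyTorch's dynamic quantization applies, and `quantize_symmetric` and `dequantize` are hypothetical helper names.

```python
def quantize_symmetric(weights):
    """Map floats to int8 codes that share one scale (symmetric, per tensor)."""
    max_abs = max(abs(w) for w in weights)
    scale = max_abs / 127 if max_abs > 0 else 1.0
    # Round to the nearest code and clamp to the symmetric int8 range.
    codes = [max(-127, min(127, round(w / scale))) for w in weights]
    return codes, scale


def dequantize(codes, scale):
    """Recover approximate float values from the int8 codes."""
    return [c * scale for c in codes]


weights = [0.5, -1.0, 0.25, 0.0]
codes, scale = quantize_symmetric(weights)
restored = dequantize(codes, scale)

# Rounding to the nearest code bounds the error by half a quantization step.
max_error = max(abs(w - r) for w, r in zip(weights, restored))
```

The integer codes are what an INT8 matrix multiplier consumes; the scale is applied afterwards to bring the accumulated result back to floating point.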

Below is the test framework code:

In [4]:
import os
import speechbrain
from speechbrain.inference.ASR import EncoderDecoderASR
from pytorch_zynq import (
    register_zynq_device,
    init_hardware as init,
    deinit_hardware,
    is_hardware_available,
    enable_full_device,
    disable_full_device,
)

torch.backends.quantized.engine = 'qnnpack'
os.environ["TORCH_FORCE_NO_WEIGHTS_ONLY_LOAD"] = "yes"

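def _apply_dynamic_quant(m):
    # Layer types whose weights are converted to int8 by dynamic quantization.
    targets = {
        torch.nn.Linear,
        torch.nn.LSTM,
        torch.nn.GRU,
        torch.nn.RNNCell,
        torch.nn.GRUCell,
        torch.nn.LSTMCell,
        torch.nn.Embedding,
        torch.nn.EmbeddingBag,
    }
    # The submodule layout depends on the loaded model, so each step is guarded
    # and a failure is reported instead of aborting the run.
    try:
        torch.quantization.quantize_dynamic(
            m.mods.encoder.transformer_encoder.transformer,
            targets,
            dtype=torch.qint8,
            inplace=True,
        )
    except Exception as exc:
        print(f"[WARN] Skipping transformer quantization: {exc}")
    try:
        if hasattr(m.mods.encoder, "enc"):
            torch.quantization.quantize_dynamic(
                m.mods.encoder.enc,
                targets,
                dtype=torch.qint8,
                inplace=True,
            )
    except Exception as exc:
        print(f"[WARN] Skipping encoder quantization: {exc}")
    return m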


def run_asr(source, device_str, wav, hparams_file, do_quant):
    if device_str in ("zynq", "privateuseone"):
        enable_full_device()
    else:
        disable_full_device()
    m = EncoderDecoderASR.from_hparams(
        source=source,
        savedir=source,
        run_opts={"device": device_str},
        hparams_file=hparams_file,
    )
    try:
        engines = getattr(torch.backends.quantized, "supported_engines", [])
        if do_quant and ("qnnpack" in engines or "fbgemm" in engines) and device_str in ("zynq", "privateuseone"):
            _apply_dynamic_quant(m)
    except Exception:
        pass
    with torch.no_grad():
        t0 = time.perf_counter()
        out = m.transcribe_file(wav)
        t1 = time.perf_counter()
    return out, t1 - t0


def test2():
    source = "./ASR_CN"
    wav = "./test2.wav"
    hparams = ""

    register_zynq_device()
    init()
    hw = is_hardware_available()
    print(f"Hardware available: {hw}")

    # Resolve the hparams path: SpeechBrain joins savedir and filename internally,
    # so pass only the filename and let source be the directory.
    if hparams:
        if os.path.isabs(hparams) or os.path.sep in hparams:
            source = os.path.dirname(hparams)
            hparams_file = os.path.basename(hparams)
        else:
            hparams_file = hparams
    else:
        hparams_file = "hyperparams.yaml"
    print(f"Using source: {source}")
    print(f"Using hparams: {hparams_file}")

    # This demo always requests quantization, so warn whenever no quantized
    # engine is available on the platform.
    engines = getattr(torch.backends.quantized, "supported_engines", [])
    if "qnnpack" not in engines and "fbgemm" not in engines:
        print("[WARN] No quantized engine available on this platform; falling back to float model")
        hparams_file = "hyperparams.yaml"

    text_cpu, t_cpu = run_asr(source, "cpu", wav, hparams_file, True)
    text_fpga, t_fpga = ("", 0.0)
    if hw:
        text_fpga, t_fpga = run_asr(source, "zynq", wav, hparams_file, True)

    print("\n==================== ASR RESULTS ====================")
    print(f"CPU time:  {t_cpu:.6f}s")
    if hw:
        print(f"ZYNQ time: {t_fpga:.6f}s")
        print(f"Speedup:   {t_cpu / t_fpga:.2f}x" if t_fpga > 0 else "Speedup:   inf")
    print(f"CPU text:  {text_cpu}")
    if hw:
        print(f"ZYNQ text: {text_fpga}")
    print("====================================================")

Due to limited on-board computing power, the benchmark takes about **5 minutes** to run, so start it only when you can wait for it to finish.

In [12]:
test2()