# Lab5: Deep Learning on PYNQ
## Scope
In the previous lab, we learned how to map a traditional image-processing algorithm onto the FPGA using HLS.


In this lab, we will explore how to deploy a Quantised Neural Network (QNN) on our FPGAs to perform keyword spotting (KWS).


We will complete this task with:
- Dataset: Google Speech V2 (preprocessed version, 12 classes, MFCC features extracted)
- Model:   QMLP (3-bit)
- Board:   PYNQ-Z2 


Lab5 consists of three parts:
- Lab5 A: Train a quantised model and examine the difference between the float NN and the QNN.
- Lab5 B: Export the quantised model into a hardware design that can be executed on our PYNQ board.
- Lab5 C: Execute the model in a Jupyter notebook to benchmark its performance.

## Note
We encourage you to complete this lab in a FINN Docker environment, but given the limited time, you can also work in a regular conda/Python/Colab environment.


In Lab5 B, generating your own DNN IP must be done in the FINN Docker environment. Alternatively, you can use the generated files provided on Blackboard to continue with Lab5 C, or ask the TA for a link to an online Jupyter server with a configured environment to run your IP/overlay generation scripts.


For an introduction to FINN and how to set up a FINN environment, the following links may be helpful:
- Environment setup: https://github.com/CNStanLee/start_with_finn.git
- FINN official docs: https://finn.readthedocs.io/en/latest/
- FINN github repo: https://github.com/Xilinx/finn
- FINN examples repo: https://github.com/Xilinx/finn-examples




# Lab5 A: Train A Quantised Model


## Set up the basic environment

In [13]:
! nvidia-smi

/bin/bash: line 1: nvidia-smi: command not found


In [14]:
import os
from pathlib import Path
import urllib.request
import tarfile
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from brevitas.nn import QuantConv2d, QuantLinear, QuantReLU 
import torch.nn as nn

In [15]:
! pwd

/home/changhong/prj/finn_cli_fork/notebooks


In [16]:
root_path = Path("lab_new")  # replace with your root path
npz_path = root_path / "data" / "kws_12cls_mfcc_10x49.npz"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

Using device: cpu


## Import the dataset

In [17]:
data = np.load(npz_path, allow_pickle=True)

X_train = data["X_train"]    # (N_train, 1, 10, 49)
y_train = data["y_train"]    # (N_train,)
X_val   = data["X_valid"]    # (N_val, 1, 10, 49)
y_val   = data["y_valid"]
X_test  = data["X_test"]     # (N_test, 1, 10, 49)
y_test  = data["y_test"]
label_names = data["label_names"]  # ['yes','no',...,'silence','unknown']

print("X_train:", X_train.shape, "y_train:", y_train.shape)
print("X_val  :", X_val.shape,   "y_val  :", y_val.shape)
print("X_test :", X_test.shape,  "y_test :", y_test.shape)
print("labels:", label_names)

def print_label_stats(name, y):
    uniq, cnt = np.unique(y, return_counts=True)
    print(f"\n{name} label stats:")
    for u, c in zip(uniq, cnt):
        print(f"  idx={u:2d} ({label_names[u]:8s}): {c:6d}")

print_label_stats("Train", y_train)
print_label_stats("Val",   y_val)
print_label_stats("Test",  y_test)


X_train: (36769, 1, 10, 49) y_train: (36769,)
X_val  : (4503, 1, 10, 49) y_val  : (4503,)
X_test : (4874, 1, 10, 49) y_test : (4874,)
labels: ['yes' 'no' 'up' 'down' 'left' 'right' 'on' 'off' 'stop' 'go' 'silence'
 'unknown']

Train label stats:
  idx= 0 (yes     ):   3228
  idx= 1 (no      ):   3130
  idx= 2 (up      ):   2948
  idx= 3 (down    ):   3134
  idx= 4 (left    ):   3037
  idx= 5 (right   ):   3019
  idx= 6 (on      ):   3086
  idx= 7 (off     ):   2970
  idx= 8 (stop    ):   3111
  idx= 9 (go      ):   3106
  idx=10 (silence ):   3000
  idx=11 (unknown ):   3000

Val label stats:
  idx= 0 (yes     ):    397
  idx= 1 (no      ):    406
  idx= 2 (up      ):    350
  idx= 3 (down    ):    377
  idx= 4 (left    ):    352
  idx= 5 (right   ):    363
  idx= 6 (on      ):    363
  idx= 7 (off     ):    373
  idx= 8 (stop    ):    350
  idx= 9 (go      ):    372
  idx=10 (silence ):    400
  idx=11 (unknown ):    400

Test label stats:
  idx= 0 (yes     ):    419
  idx= 1 (no     

In [18]:
mean = X_train.mean()
std = X_train.std() + 1e-8

X_train_norm = (X_train - mean) / std
X_val_norm   = (X_val   - mean) / std
X_test_norm  = (X_test  - mean) / std

print("mean:", float(mean), "std:", float(std))

class KWSDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.from_numpy(X).float()   # (N, 1, 10, 49)
        self.y = torch.from_numpy(y).long()    # (N,)
    def __len__(self):
        return self.X.shape[0]
    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

train_ds = KWSDataset(X_train_norm, y_train)
val_ds   = KWSDataset(X_val_norm,   y_val)
test_ds  = KWSDataset(X_test_norm,  y_test)

batch_size = 128

train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True,  drop_last=False)
val_loader   = DataLoader(val_ds,   batch_size=batch_size, shuffle=False, drop_last=False)
test_loader  = DataLoader(test_ds,  batch_size=batch_size, shuffle=False, drop_last=False)

len(train_ds), len(val_ds), len(test_ds)


mean: -9.13790512084961 std: 73.45867157982421


(36769, 4503, 4874)
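
Before training, it can help to know what accuracy a trivial classifier would reach, so that the numbers reported later have a reference point. The sketch below is an illustration only: it hard-codes the train and validation label counts printed above (the test counts are truncated in the output, so they are not used) and computes the accuracy of always predicting the most frequent training class. The names `train_counts`, `val_counts`, and `baseline_acc` are introduced here and are not part of the lab code.

```python
# Label names and per-class counts copied from the label statistics printed above.
label_names = ["yes", "no", "up", "down", "left", "right",
               "on", "off", "stop", "go", "silence", "unknown"]
train_counts = [3228, 3130, 2948, 3134, 3037, 3019, 3086, 2970, 3111, 3106, 3000, 3000]
val_counts = [397, 406, 350, 377, 352, 363, 363, 373, 350, 372, 400, 400]

# Most frequent class in the training set.
majority_idx = max(range(len(train_counts)), key=train_counts.__getitem__)
majority_label = label_names[majority_idx]

# Accuracy on the validation set if we always predict that class.
baseline_acc = val_counts[majority_idx] / sum(val_counts)

# Accuracy of uniform random guessing over 12 classes.
chance_acc = 1 / len(label_names)

print(f"majority class     : {majority_label}")
print(f"majority baseline  : {baseline_acc * 100:.2f}%")
print(f"uniform chance     : {chance_acc * 100:.2f}%")
```

Because the classes are close to balanced, both baselines sit below 10%, so any trained model should be judged against that floor rather than against 0%.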

## Define the Float Model

In [19]:
class FloatMLP(nn.Module):
    def __init__(self, num_classes=12, hidden_dim=256, dropout_p=0.3):
        super().__init__()
        self.in_features = 1 * 10 * 49
        self.net = nn.Sequential(
            nn.Linear(self.in_features, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(p=dropout_p),
            nn.Linear(hidden_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(p=dropout_p),
            nn.Linear(hidden_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(p=dropout_p),
            nn.Linear(hidden_dim, num_classes),
        )

    def forward(self, x):
        x = x.view(x.size(0), -1)
        return self.net(x)


## Define the Quantised Model

In [20]:
from brevitas.nn import QuantIdentity

class QuantMLPKWS_Dropout(nn.Module):
    def __init__(self, num_classes=12, hidden_dim=256, dropout_p=0.2,
                 w_bit=3, a_bit=3, in_bit=8):
        super().__init__()
        self.in_features = 1 * 10 * 49

        self.input_quant = QuantIdentity(
            bit_width=in_bit,        # 8
            return_quant_tensor=False
        )

        # Layer 1: 490 -> 256
        self.fc1 = QuantLinear(
            in_features=self.in_features,
            out_features=hidden_dim,
            weight_bit_width=w_bit,   # W3
            bias=True,
            return_quant_tensor=False
        )
        self.bn1 = nn.BatchNorm1d(hidden_dim)
        self.act1 = QuantReLU(
            bit_width=a_bit,          # A3
            return_quant_tensor=False
        )
        self.drop1 = nn.Dropout(p=dropout_p)

        # Layer 2: 256 -> 256
        self.fc2 = QuantLinear(
            in_features=hidden_dim,
            out_features=hidden_dim,
            weight_bit_width=w_bit,
            bias=True,
            return_quant_tensor=False
        )
        self.bn2 = nn.BatchNorm1d(hidden_dim)
        self.act2 = QuantReLU(
            bit_width=a_bit,
            return_quant_tensor=False
        )
        self.drop2 = nn.Dropout(p=dropout_p)

        # Layer 3: 256 -> 256
        self.fc3 = QuantLinear(
            in_features=hidden_dim,
            out_features=hidden_dim,
            weight_bit_width=w_bit,
            bias=True,
            return_quant_tensor=False
        )
        self.bn3 = nn.BatchNorm1d(hidden_dim)
        self.act3 = QuantReLU(
            bit_width=a_bit,
            return_quant_tensor=False
        )
        self.drop3 = nn.Dropout(p=dropout_p)

        # Output layer: 256 -> num_classes
        self.fc_out = QuantLinear(
            in_features=hidden_dim,
            out_features=num_classes,
            weight_bit_width=w_bit,
            bias=True,
            return_quant_tensor=False
        )
        self.flatten = nn.Flatten(start_dim=1)

    def forward(self, x):
        # x: (B, 1, 10, 49)
        x = self.input_quant(x)    
        x = self.flatten(x)          

        x = self.fc1(x)
        x = self.bn1(x)
        x = self.act1(x)
        x = self.drop1(x)

        x = self.fc2(x)
        x = self.bn2(x)
        x = self.act2(x)
        x = self.drop2(x)

        x = self.fc3(x)
        x = self.bn3(x)
        x = self.act3(x)
        x = self.drop3(x)

        x = self.fc_out(x)
        return x
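
The layer sizes above make it easy to estimate how much weight storage quantisation saves. The following is a rough sketch under stated assumptions: it counts only the weights of the four linear layers (490 -> 256 -> 256 -> 256 -> 12) and ignores biases, BatchNorm parameters, quantiser scale factors, and any packing overhead on the FPGA. The helper `weight_storage_bytes` is a name made up for this illustration.

```python
# (in_features, out_features) of the four linear layers in the MLP.
layer_shapes = [(490, 256), (256, 256), (256, 256), (256, 12)]


def weight_storage_bytes(shapes, bits_per_weight):
    """Bytes needed to store every linear weight at the given bit width."""
    n = sum(n_in * n_out for n_in, n_out in shapes)
    return n * bits_per_weight / 8


n_weights = sum(n_in * n_out for n_in, n_out in layer_shapes)
float_bytes = weight_storage_bytes(layer_shapes, 32)  # float32 baseline
w3_bytes = weight_storage_bytes(layer_shapes, 3)      # w_bit = 3

print(f"linear weights : {n_weights}")
print(f"float32 storage: {float_bytes / 1024:.1f} KiB")
print(f"3-bit storage  : {w3_bytes / 1024:.1f} KiB")
print(f"reduction      : {float_bytes / w3_bytes:.1f}x")
```

Changing the bit width passed to `weight_storage_bytes` gives a quick size estimate to weigh against the accuracy you measure for that setting.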


## Train functions

In [21]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader


class Trainer:
    def __init__(
        self,
        model: nn.Module,
        train_loader: DataLoader,
        val_loader: DataLoader = None,
        test_loader: DataLoader = None,
        device: torch.device = None,
        # --- Hyperparameters ---
        lr: float = 3e-4,
        weight_decay: float = 1e-4,
        batch_size: int = 64,
        num_epochs: int = 100,
        scheduler_factor: float = 0.5,
        scheduler_patience: int = 3,
        optimizer_cls=torch.optim.Adam,
        criterion: nn.Module = None,
    ):
        """
        A simple training framework for classification tasks.

        Args:
            model: Neural network model (nn.Module)
            train_loader: DataLoader for training set
            val_loader: DataLoader for validation set
            test_loader: DataLoader for test set (optional)
            device: torch.device (if None, automatically selects cuda or cpu)
            lr: Learning rate
            weight_decay: Weight decay (L2 regularization)
            batch_size: Batch size (for reference or logging)
            num_epochs: Number of training epochs
            scheduler_factor: Factor by which LR is reduced (ReduceLROnPlateau)
            scheduler_patience: Number of epochs with no improvement before LR reduction
            optimizer_cls: Optimizer class (e.g., Adam, SGD)
            criterion: Loss function (default: CrossEntropyLoss)
        """
        self.device = device or torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = model.to(self.device)
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.test_loader = test_loader

        # --- Save hyperparameters ---
        self.lr = lr
        self.weight_decay = weight_decay
        self.batch_size = batch_size
        self.num_epochs = num_epochs
        self.scheduler_factor = scheduler_factor
        self.scheduler_patience = scheduler_patience

        # --- Training components ---
        self.criterion = criterion or nn.CrossEntropyLoss()
        self.optimizer = optimizer_cls(self.model.parameters(), lr=lr, weight_decay=weight_decay)

        # Scheduler triggered by validation accuracy
        if self.val_loader is not None:
            self.scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
                self.optimizer,
                mode="max",
                factor=scheduler_factor,
                patience=scheduler_patience,
            )
        else:
            self.scheduler = None

        # --- Bookkeeping ---
        self.best_val_acc = 0.0
        self.best_state_dict = None
        self.history = {
            "train_loss": [],
            "train_acc": [],
            "val_loss": [],
            "val_acc": [],
        }

    def _run_one_epoch(self, loader, train: bool = True):
        """
        Run one epoch of training or evaluation.
        """
        if train:
            self.model.train()
        else:
            self.model.eval()

        total_loss = 0.0
        total_correct = 0
        total_samples = 0

        context = torch.enable_grad() if train else torch.no_grad()
        with context:
            for X, y in loader:
                X, y = X.to(self.device), y.to(self.device)

                if train:
                    self.optimizer.zero_grad()

                logits = self.model(X)
                loss = self.criterion(logits, y)

                if train:
                    loss.backward()
                    self.optimizer.step()

                total_loss += loss.item() * X.size(0)
                preds = logits.argmax(dim=1)
                total_correct += (preds == y).sum().item()
                total_samples += X.size(0)

        avg_loss = total_loss / total_samples
        acc = total_correct / total_samples
        return avg_loss, acc

    def train(self):
        """
        Main training loop.
        Tracks and reports both training and validation performance.
        """
        for epoch in range(1, self.num_epochs + 1):
            train_loss, train_acc = self._run_one_epoch(self.train_loader, train=True)

            if self.val_loader is not None:
                val_loss, val_acc = self._run_one_epoch(self.val_loader, train=False)

                # Step the LR scheduler based on validation accuracy
                if self.scheduler is not None:
                    self.scheduler.step(val_acc)

                # Track best model
                if val_acc > self.best_val_acc:
                    self.best_val_acc = val_acc
                    self.best_state_dict = {
                        k: v.cpu().clone() for k, v in self.model.state_dict().items()
                    }

                # Log metrics
                self.history["train_loss"].append(train_loss)
                self.history["train_acc"].append(train_acc)
                self.history["val_loss"].append(val_loss)
                self.history["val_acc"].append(val_acc)

                print(
                    f"Epoch {epoch:02d}/{self.num_epochs} | "
                    f"Train Loss={train_loss:.4f}, Train Acc={train_acc*100:5.2f}% | "
                    f"Val Loss={val_loss:.4f}, Val Acc={val_acc*100:5.2f}%"
                )
            else:
                # No validation set
                self.history["train_loss"].append(train_loss)
                self.history["train_acc"].append(train_acc)
                print(
                    f"Epoch {epoch:02d}/{self.num_epochs} | "
                    f"Train Loss={train_loss:.4f}, Train Acc={train_acc*100:5.2f}%"
                )

        if self.val_loader is not None:
            print(f"\n[INFO] Best Validation Accuracy = {self.best_val_acc*100:.2f}%")

    def load_best_model(self):
        """
        Restore the best-performing model parameters (based on validation accuracy).
        """
        if self.best_state_dict is not None:
            self.model.load_state_dict(self.best_state_dict)
            self.model.to(self.device)
        else:
            print("[WARN] No best_state_dict found. Ensure validation was used during training.")

    def test(self, test_loader: DataLoader = None):
        """
        Evaluate the model on the test set.
        Automatically loads the best checkpoint if available.
        """
        loader = test_loader or self.test_loader
        if loader is None:
            raise ValueError("No test_loader provided.")

        # Use the best model checkpoint if available
        if self.best_state_dict is not None:
            self.load_best_model()

        test_loss, test_acc = self._run_one_epoch(loader, train=False)
        print(f"[TEST] Loss={test_loss:.4f}, Accuracy={test_acc*100:5.2f}%")
        return test_loss, test_acc
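
To see how `scheduler_factor` and `scheduler_patience` interact, the plateau rule can be imitated in a few lines of plain Python. This is a simplified sketch, not PyTorch's implementation: it assumes `mode="max"`, a relative improvement threshold of `1e-4`, and no cooldown. The function `plateau_schedule` is hypothetical, and the accuracies are the first 11 validation accuracies from the quantised (W3A3) training log shown later in this notebook, rounded as printed.

```python
def plateau_schedule(val_accs, lr, factor=0.5, patience=3, threshold=1e-4):
    """Simplified 'max'-mode plateau rule.

    The learning rate is multiplied by `factor` once more than `patience`
    consecutive epochs pass without a relative improvement over the best value.
    """
    best = float("-inf")
    bad_epochs = 0
    reduced_at = []
    for epoch, acc in enumerate(val_accs, start=1):
        if acc > best * (1 + threshold):
            best, bad_epochs = acc, 0   # new best: reset the counter
        else:
            bad_epochs += 1             # no improvement this epoch
        if bad_epochs > patience:
            lr *= factor
            bad_epochs = 0
            reduced_at.append(epoch)
    return lr, reduced_at


# Validation accuracy of epochs 1-11 from the quantised training log.
val_accs = [0.5325, 0.6418, 0.6651, 0.6693, 0.6467, 0.6795,
            0.7226, 0.6873, 0.6867, 0.7066, 0.6995]

final_lr, reduced_at = plateau_schedule(val_accs, lr=3e-4)
print("LR reduced at epochs:", reduced_at)
print("final LR:", final_lr)
```

With `patience=3`, the counter must exceed three bad epochs, so the first reduction can only happen on the fourth consecutive epoch without a new best validation accuracy.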


## Train the float model

In [22]:
model = FloatMLP(num_classes=12, hidden_dim=256, dropout_p=0.3).to(device)

trainer = Trainer(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    test_loader=test_loader,
    device=device,
    lr=3e-4,
    weight_decay=1e-4,
    batch_size=256,  # stored for reference only; the DataLoaders above use batch_size=128
    num_epochs=20,
    scheduler_factor=0.5,
    scheduler_patience=3,
)

# Train and evaluate
trainer.train()
trainer.test()


Epoch 01/20 | Train Loss=1.7571, Train Acc=40.43% | Val Loss=1.2226, Val Acc=60.09%
Epoch 02/20 | Train Loss=1.1902, Train Acc=59.60% | Val Loss=0.9372, Val Acc=69.82%
Epoch 03/20 | Train Loss=1.0075, Train Acc=65.73% | Val Loss=0.8460, Val Acc=71.93%
Epoch 04/20 | Train Loss=0.9087, Train Acc=69.01% | Val Loss=0.8037, Val Acc=73.00%
Epoch 05/20 | Train Loss=0.8369, Train Acc=71.44% | Val Loss=0.7492, Val Acc=75.39%
Epoch 06/20 | Train Loss=0.7875, Train Acc=73.15% | Val Loss=0.7412, Val Acc=75.24%
Epoch 07/20 | Train Loss=0.7490, Train Acc=74.36% | Val Loss=0.7013, Val Acc=76.73%
Epoch 08/20 | Train Loss=0.7103, Train Acc=75.73% | Val Loss=0.7055, Val Acc=76.42%
Epoch 09/20 | Train Loss=0.6800, Train Acc=76.86% | Val Loss=0.6783, Val Acc=77.37%
Epoch 10/20 | Train Loss=0.6562, Train Acc=77.59% | Val Loss=0.6741, Val Acc=77.70%
Epoch 11/20 | Train Loss=0.6345, Train Acc=78.31% | Val Loss=0.6572, Val Acc=78.57%
Epoch 12/20 | Train Loss=0.6131, Train Acc=78.96% | Val Loss=0.6724, Val Acc

(0.64573270164335, 0.7810832991382848)

## Train the quantised model

In [23]:
model = QuantMLPKWS_Dropout(
    num_classes=12,
    hidden_dim=256,
    dropout_p=0.3,   
    w_bit=3, a_bit=3
).to(device)

trainer = Trainer(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    test_loader=test_loader,
    device=device,
    lr=3e-4,
    weight_decay=1e-4,
    batch_size=256,  # stored for reference only; the DataLoaders above use batch_size=128
    num_epochs=20,
    scheduler_factor=0.5,
    scheduler_patience=3,
)

# Train and evaluate
trainer.train()
trainer.test()

# save weights
weight_dir = root_path / "weights"
weight_dir.mkdir(parents=True, exist_ok=True)
torch.save(model.state_dict(), weight_dir / "mlpw3a3_model_weights.pth")


Epoch 01/20 | Train Loss=1.8106, Train Acc=38.08% | Val Loss=1.3457, Val Acc=53.25%
Epoch 02/20 | Train Loss=1.2638, Train Acc=56.85% | Val Loss=1.0940, Val Acc=64.18%
Epoch 03/20 | Train Loss=1.0865, Train Acc=62.84% | Val Loss=1.0022, Val Acc=66.51%
Epoch 04/20 | Train Loss=0.9965, Train Acc=65.76% | Val Loss=0.9936, Val Acc=66.93%
Epoch 05/20 | Train Loss=0.9445, Train Acc=67.71% | Val Loss=1.0432, Val Acc=64.67%
Epoch 06/20 | Train Loss=0.8894, Train Acc=69.75% | Val Loss=0.9520, Val Acc=67.95%
Epoch 07/20 | Train Loss=0.8657, Train Acc=70.12% | Val Loss=0.8550, Val Acc=72.26%
Epoch 08/20 | Train Loss=0.8276, Train Acc=71.73% | Val Loss=0.9138, Val Acc=68.73%
Epoch 09/20 | Train Loss=0.8101, Train Acc=72.37% | Val Loss=0.9170, Val Acc=68.67%
Epoch 10/20 | Train Loss=0.7818, Train Acc=73.08% | Val Loss=0.8733, Val Acc=70.66%
Epoch 11/20 | Train Loss=0.7697, Train Acc=73.73% | Val Loss=0.9023, Val Acc=69.95%
Epoch 12/20 | Train Loss=0.7290, Train Acc=75.07% | Val Loss=0.8067, Val Acc

### What differs between the float and quantised results, how is the difference related to the bit width, and how would you select a bit width?
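
One way to build intuition for this question is to quantise a set of values at several bit widths and watch the rounding error. The sketch below is a toy illustration, not what Brevitas does internally: it assumes a symmetric, per-tensor uniform quantiser whose scale is set by the largest absolute value, and it uses random Gaussian numbers as stand-ins for weights. The functions `quantise` and `mean_squared_error` are names introduced only for this example.

```python
import random


def quantise(values, bit_width):
    """Symmetric uniform quantisation, returned in the original value range."""
    q_max = 2 ** (bit_width - 1) - 1                  # e.g. 3 for a 3-bit signed grid
    scale = max(abs(v) for v in values) / q_max       # step size between levels
    # Round to the nearest integer level, clamp, then map back to real values.
    return [max(-q_max, min(q_max, round(v / scale))) * scale for v in values]


def mean_squared_error(a, b):
    return sum((x - y) ** 2 for x, y in zip(a, b)) / len(a)


random.seed(0)
weights = [random.gauss(0.0, 1.0) for _ in range(2000)]  # stand-in for trained weights

errors = {bits: mean_squared_error(weights, quantise(weights, bits))
          for bits in (2, 3, 4, 8)}

for bits, err in errors.items():
    print(f"{bits}-bit: {2 ** bits - 1} levels, MSE = {err:.5f}")
```

Each extra bit roughly doubles the number of available levels, so the rounding error shrinks quickly at first and then yields diminishing returns. The trained networks behave differently from this toy case because quantisation-aware training lets the weights adapt to the grid, which is why the accuracy should be measured rather than inferred from the error alone.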

## Try other bit widths

In [24]:
your_weight_bitwidth = 4  # Example: change to 4 bits
your_activation_bitwidth = 2  # Example: change to 2 bits


model = QuantMLPKWS_Dropout(
    num_classes=12,
    hidden_dim=256,
    dropout_p=0.3,   
    w_bit=your_weight_bitwidth, a_bit=your_activation_bitwidth
).to(device)

trainer = Trainer(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    test_loader=test_loader,
    device=device,
    lr=3e-4,
    weight_decay=1e-4,
    batch_size=256,  # stored for reference only; the DataLoaders above use batch_size=128
    num_epochs=20,
    scheduler_factor=0.5,
    scheduler_patience=3,
)

# Train and evaluate
trainer.train()
trainer.test()


Epoch 01/20 | Train Loss=1.8826, Train Acc=35.09% | Val Loss=1.3459, Val Acc=56.14%
Epoch 02/20 | Train Loss=1.2879, Train Acc=55.38% | Val Loss=1.1008, Val Acc=62.27%
Epoch 03/20 | Train Loss=1.1043, Train Acc=62.17% | Val Loss=0.9790, Val Acc=67.18%
Epoch 04/20 | Train Loss=1.0087, Train Acc=65.62% | Val Loss=0.9111, Val Acc=68.47%
Epoch 05/20 | Train Loss=0.9444, Train Acc=67.57% | Val Loss=0.8859, Val Acc=71.42%
Epoch 06/20 | Train Loss=0.8852, Train Acc=69.96% | Val Loss=0.8596, Val Acc=71.17%
Epoch 07/20 | Train Loss=0.8423, Train Acc=71.48% | Val Loss=0.8585, Val Acc=71.95%
Epoch 08/20 | Train Loss=0.8132, Train Acc=72.15% | Val Loss=0.7907, Val Acc=73.44%
Epoch 09/20 | Train Loss=0.7790, Train Acc=73.50% | Val Loss=0.8288, Val Acc=72.26%
Epoch 10/20 | Train Loss=0.7612, Train Acc=74.17% | Val Loss=0.8234, Val Acc=71.97%
Epoch 11/20 | Train Loss=0.7386, Train Acc=74.80% | Val Loss=0.8163, Val Acc=72.82%
Epoch 12/20 | Train Loss=0.7269, Train Acc=75.09% | Val Loss=0.8224, Val Acc

(0.732341656034892, 0.7511284366023799)

### Report your accuracy and the corresponding bit widths.
### Note: you may fine-tune the hyperparameters; higher accuracy with a smaller model is better.

## Export your weights

In [None]:
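# `model` now holds the network from the bit-width experiment above, so name the file
# after those bit widths instead of overwriting the W3A3 weights saved earlier.
weight_dir = root_path / "weights"
weight_dir.mkdir(parents=True, exist_ok=True)
torch.save(
    model.state_dict(),
    weight_dir / f"mlpw{your_weight_bitwidth}a{your_activation_bitwidth}_model_weights.pth",
)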

## Export your ONNX graph

In [None]:
import torch
from pathlib import Path
from brevitas.export import export_qonnx 

model = QuantMLPKWS_Dropout(
    num_classes=12,
    hidden_dim=256,
    dropout_p=0.3,
    w_bit=3,
    a_bit=3
).cpu()

# --- 1. Load trained weights ---
weight_dir = root_path / "weights"
state_dict = torch.load(weight_dir / "mlpw3a3_model_weights.pth", map_location="cpu")
model.load_state_dict(state_dict)
model.eval()  # Always switch to eval mode before export

# --- 2. Prepare dummy input ---
# The dummy input shape must match the model's expected input (B, 1, 10, 49)
dummy_input = torch.randn(1, 1, 10, 49)

# --- 3. Export to QONNX (for FINN / FPGA deployment) ---
export_path = str(root_path / "exports" / "kws_mlp_w3a3_qonnx.onnx")

# Ensure the export directory exists
export_dir = Path(export_path).parent
export_dir.mkdir(parents=True, exist_ok=True)


with torch.no_grad():
    export_qonnx(
        model,
        args=dummy_input,  # sometimes use input_t=dummy_input depending on brevitas version
        export_path=export_path
    )

print("QONNX model successfully exported to:", export_path)
