In [2]:
import torch
from torch.utils.data import DataLoader
import torch.optim as optim
import torch.nn as nn
from torch_optimizer import Lookahead
from src.data.config import dataset, data_loader, model as model_config, optimizer as optimizer_config, scheduler as scheduler_config, training

# Import custom modules
from src.models.model import KeywordSpottingModel_with_cls
from src.data.data_loader import load_speech_commands_dataset, TFDatasetAdapter, load_bg_noise_dataset
from src.utils.utils import set_memory_GB, print_model_size, log_to_file, plot_learning_curves,EarlyStopping
from src.utils.augmentations import add_time_shift_and_align, add_silence
from src.utils.train_utils import trainig_loop


In [4]:

# Load datasets
train_ds, val_ds, test_ds, silence_ds , info = load_speech_commands_dataset(reduced=False)
bg_noise_ds = load_bg_noise_dataset()


In [13]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Device: {device}")

Device: cuda


In [6]:

# Initialize datasets with configurations
pytorch_train_dataset = TFDatasetAdapter(train_ds, bg_noise_ds, **dataset, augmentation=[lambda x: add_time_shift_and_align(x)])
pytorch_val_dataset = TFDatasetAdapter(val_ds, None, **dataset, augmentation=None)


2025-01-07 11:32:41.896014: I tensorflow/core/framework/local_rendezvous.cc:404] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence
2025-01-07 11:32:43.533445: I tensorflow/core/framework/local_rendezvous.cc:404] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence


In [7]:

# Create DataLoaders
train_loader = DataLoader(pytorch_train_dataset, **data_loader, shuffle=True)
val_loader = DataLoader(pytorch_val_dataset, **data_loader, shuffle=False)


In [8]:


# Initialize model
model = KeywordSpottingModel_with_cls(**model_config).to("cuda")

# Loss function
criterion = nn.CrossEntropyLoss().to("cuda")

# Optimizer
base_optimizer = optim.Adam(model.parameters(), lr=optimizer_config['lr'], weight_decay=optimizer_config['weight_decay'])
optimizer = Lookahead(base_optimizer, **optimizer_config['lookahead'])

# Scheduler
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, **scheduler_config['reduce_lr_on_plateau'])


In [9]:
model_size = sum(p.numel() for p in model.parameters())
print(f"Model size: {model_size}")


Model size: 333892


In [10]:

# # Training loop
# num_epochs = training['num_epochs']
# try:
#     train_accuracies, val_accuracies, train_losses, val_losses = trainig_loop(model, num_epochs, train_loader, val_loader, criterion, optimizer, scheduler)
#     plot_learning_curves(train_accuracies, val_accuracies, train_losses, val_losses, save_to_file=True)
# except Exception as err:
#     log_to_file(str(err))



In [11]:
model.load_state_dict(torch.load("best_model.pth"))

  model.load_state_dict(torch.load("best_model.pth"))


<All keys matched successfully>

In [12]:
# load test data
pytorch_test_dataset = TFDatasetAdapter(test_ds, None, **dataset, augmentation=None)
test_loader = DataLoader(pytorch_test_dataset, **data_loader, shuffle=False)

# Evaluate the model on the test set
accuracy = 0
total = 0
model.eval()

with torch.no_grad():
    for audio, labels in test_loader:
        audio, labels = audio.to("cuda"), labels.to("cuda")
        outputs = model(audio)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        accuracy += (predicted == labels).sum().item()
test_accuracy = 100 * accuracy / total
print(f'Test Accuracy: {test_accuracy}%')


Test Accuracy: 89.20245398773007%


In [19]:
from torch.quantization import quantize_qat
from tqdm import tqdm
model.train()
model.qconfig = torch.quantization.get_default_qconfig("fbgemm")  # Specify the backend
torch.quantization.prepare_qat(model, inplace=True)
import torch.quantization as quant

# Specify the quantization configuration
model.qconfig = quant.QConfig(
    activation=quant.default_observer,  # Observer for activations
    weight=quant.default_weight_observer  # Use per_tensor_affine for weights
)

# Prepare the model for quantization
quant.prepare(model, inplace=True)

# Calibrate or fine-tune the model (if needed)

# Fine-tune the model
for epoch in range(1):
    for audio, labels in tqdm(train_loader):
        audio, labels = audio.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(audio)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()


100%|████████████████████████████████████████████████████████████████████| 669/669 [13:00<00:00,  1.17s/it]


In [17]:

# Convert the model to quantized form
quant.convert(model, inplace=True)

RuntimeError: Unsupported qscheme: per_channel_affine

In [12]:

# # save model cpu map
# model.cpu()
torch.save(model.state_dict(), 'quntizeAwareModel.pth', map_location=torch.device(device))



In [13]:
model.load_state_dict(torch.load("quntizeAwareModel.pth"), strict=True, map_location=torch.device(device))

  model.load_state_dict(torch.load("quntizeAwareModel.pth"), strict=False)


<All keys matched successfully>

In [None]:
# model.to("cpu")
# load test data
pytorch_test_dataset = TFDatasetAdapter(test_ds, None, **dataset, augmentation=None)
test_loader = DataLoader(pytorch_test_dataset, **data_loader, shuffle=False)

# Evaluate the model on the test set
accuracy = 0
total = 0
model.eval()

with torch.no_grad():
    for audio, labels in test_loader:
        audio, labels = audio.to("cuda"), labels.to("cuda")
        outputs = model(audio)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        accuracy += (predicted == labels).sum().item()
test_accuracy = 100 * accuracy / total
print(f'Test Accuracy: {test_accuracy}%')


NameError: name 'model' is not defined

In [15]:
# get 1 sample from test data
audio, label = next(iter(test_loader))
audio, label = audio.to(device), label.to(device)

In [16]:
dummy_input = audio[0].unsqueeze(0)
torch.onnx.export(
    model,
    dummy_input,
    "model.onnx",
    export_params=True,
    opset_version=16,
    input_names=["input"],
    output_names=["output"],
    dynamic_axes={"input": {0: "batch_size"}, "output": {0: "batch_size"}},
)

  return 2 ** math.ceil(math.log2(len))
  if L == npo2(L):
  num_steps = int(math.log2(L))
  if Xa.size(2) == 4:


In [20]:
# batch size = 1 testloader
test_loader = DataLoader(pytorch_test_dataset, batch_size=10, shuffle=False)


In [21]:
pytorch_train_dataset.__len__()

85511

In [22]:
# Prepare calibration data
calibration_data = []
calibrate_data_loader = DataLoader(pytorch_train_dataset, batch_size=10, shuffle=True)

for audio, _ in calibrate_data_loader:
    audio_np = audio.numpy()  # Convert PyTorch tensors to NumPy arrays
    # Ensure the feature dimensions are [69, 135]
    reshaped_audio = audio_np.reshape(-1, 69, 135)
    calibration_data.extend(reshaped_audio)

    # Stop after collecting enough samples for calibration
    if len(calibration_data) >= 5000:
        break

In [23]:
from onnxruntime.quantization import CalibrationDataReader

class MyCalibrationDataReader(CalibrationDataReader):
    def __init__(self, calibration_data, input_name):
        """
        Initialize the data reader.
        :param calibration_data: List of calibration samples (e.g., images or tensors).
        :param input_name: Name of the input node in the ONNX model.
        """
        self.data = calibration_data
        self.index = 0
        self.input_name = input_name

    def get_next(self):
        """
        Retrieve the next batch of data for calibration.
        :return: Dictionary of {input_name: input_data_batch}.
        """
        if self.index < len(self.data):
            batch = self.data[self.index:self.index + 1]  # ONNX expects batched inputs
            self.index += 1
            return {self.input_name: batch}
        return None  # Signal the end of the data

In [24]:
from onnxruntime.quantization import quantize_static, QuantType, CalibrationMethod

quantize_static(
    "model.onnx",
    "quantized_model.onnx",
    weight_type=QuantType.QFLOAT8E4M3FN,
    nodes_to_exclude=["/mamba_layers.0/Slice_3", "/mamba_layers.1/Slice_3"],
    calibration_data_reader=MyCalibrationDataReader(calibration_data, "input"),
    calibrate_method = CalibrationMethod.Distribution,
    use_external_data_format=True,
)



Collecting tensor data and making histogram ...


: 

: 

: 

In [28]:
import onnx

# Load and check the quantized ONNX model
quantized_model_path = "quantized_model.onnx"
onnx_model = onnx.load(quantized_model_path)
onnx.checker.check_model(onnx_model)
print("Quantized ONNX model is valid!")

Quantized ONNX model is valid!


In [29]:
import onnxruntime as ort
import numpy as np

# Load the quantized model
session = ort.InferenceSession("quantized_model.onnx")

# Get model input and output names
input_name = session.get_inputs()[0].name
output_name = session.get_outputs()[0].name

# Test inference with sample data
dummy_input = audio[0].unsqueeze(0).numpy()
outputs = session.run([output_name], {input_name: dummy_input})
print("Quantized model output:", outputs)

Quantized model output: [array([[ 0.12826443,  0.25652885,  0.5130577 ,  0.38479328,  0.76958656,
         0.897851  , -0.25652885,  0.64132214,  1.4109087 , -0.12826443,
         0.897851  ,  0.25652885]], dtype=float32)]


In [30]:
from tqdm import tqdm
correct = 0
total = 0
test_loader = DataLoader(pytorch_test_dataset, batch_size=32, shuffle=False)

for audio, labels in tqdm(test_loader):
    audio = audio.numpy()
    outputs = session.run([output_name], {input_name: audio})
    predicted = np.argmax(outputs[0], axis=1)
    total += labels.shape[0]
    correct += (predicted == labels.numpy()).sum()

accuracy = 100 * correct / total
print(f"Quantized model accuracy: {accuracy:.2f}%")

100%|██████████████████████████████████████████████████████████████████████| 39/39 [01:44<00:00,  2.68s/it]

Quantized model accuracy: 5.30%





In [11]:
import pandas as pd
from utils import compute_inference_GPU_mem

# Define missing variables
configs = {'batch_size': 32}  # Example value, adjust as needed
input_dim = model_config['input_dim']  # Example value, adjust as needed
d_model = model_config['d_model']  # Example value, adjust as needed
d_state = model_config['d_state']  # Example value, adjust as needed
d_conv = model_config['d_conv']  # Example value, adjust as needed
expand = model_config['expand']  # Example value, adjust as needed

# Save model size(macs, params) and accuracy
batch_size = configs['batch_size']
macs, params = print_model_size(model, input_size=torch.randn(batch_size, input_dim, d_model-1).to("cuda"))
macs = macs / 1e9
accuracy = test_accuracy
data = {'Model': ['KeywordSpottingModel_RSM_Norm_0-1-2_order_cls_bgnoise'], 'GMACs': [macs], 'Params': [params], 'Accuracy': [accuracy]}
model_config = {'input_dim': input_dim, 'd_model': d_model, 'd_state': d_state, 'd_conv': d_conv, 'expand': expand}
data.update(model_config)
inf_GPU_mem = compute_inference_GPU_mem(model, input=torch.randn(1, input_dim, d_model-1).to("cuda"))
# Inference macs and params
inf_macs, inf_params = print_model_size(model, input_size=torch.randn(1, input_dim, d_model-1).to("cuda"))
inference_data = {'Inference CUDA Mem in MB': [inf_GPU_mem], 'Inference GMACs': [inf_macs / 1e9], 'Inference Params': [inf_params]}
data.update(inference_data)
df = pd.DataFrame(data, index=[0])
df.to_csv('results.csv', mode='a', header=False)

[INFO] Register count_linear() for <class 'torch.nn.modules.linear.Linear'>.
[INFO] Register count_convNd() for <class 'torch.nn.modules.conv.Conv1d'>.
[INFO] Register zero_ops() for <class 'torch.nn.modules.dropout.Dropout'>.

MACs: 650593536.0 Which are 0.650593536 Giga-MACs, Params: 151242.0

[INFO] Register count_linear() for <class 'torch.nn.modules.linear.Linear'>.
[INFO] Register count_convNd() for <class 'torch.nn.modules.conv.Conv1d'>.
[INFO] Register zero_ops() for <class 'torch.nn.modules.dropout.Dropout'>.

MACs: 20331048.0 Which are 0.020331048 Giga-MACs, Params: 151242.0

