In [1]:
# Mount Google Drive at /content/drive; every path configured below lives under this mount.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
import sys
from pathlib import Path
import torch
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
import h5py
import pandas as pd
# --- Locations on the mounted Google Drive ------------------------------------
PROJECT_PATH = Path('/content/drive/MyDrive/AutomaticHeartSoundClassification-main')
# Derived from PROJECT_PATH so the two locations cannot drift apart.
DATASET_PATH = PROJECT_PATH / 'data'
# Presumably the raw PhysioNet 2016 recordings (inferred from the folder name).
# It holds data only, so it is deliberately not added to sys.path.
DATARAW_PATH = Path('/content/drive/MyDrive/physionet2016')

# Make the project importable. The membership check keeps the cell idempotent:
# re-running it (or the old duplicated append) no longer stacks duplicate entries.
for _search_dir in (PROJECT_PATH, DATASET_PATH):
    if str(_search_dir) not in sys.path:
        sys.path.append(str(_search_dir))

# Use the GPU when one is visible to torch, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [3]:
# Architecture to load; used later as a key into MODEL_DIR_MAP and MODEL_CLASS_MAP.
MODEL_NAME = "CRNN"      # or "CNN"
# Run folder name (appears as the <timestamp> path component of model_best.pth).
TIMESTAMP = "0108_105831"

# Download Packages

In [4]:
import importlib
import subprocess
import sys

def check_or_install(pkg_name, import_name=None, version=None):
    """
    Ensure a Python package is importable, installing it with pip if it is not.

    If the package can be imported, its version is printed; when a specific
    ``version`` was requested and the installed one differs, a warning is
    printed as well (the installed package is left untouched).
    If the import fails, the package (pinned to ``version`` when given) is
    installed with ``pip`` using the current interpreter.

    Parameters
    ----------
    pkg_name : str
        Package name as registered on PyPI. Surrounding whitespace is ignored.
    import_name : str, optional
        Module name used for import. Defaults to ``pkg_name`` with hyphens
        replaced by underscores, since a hyphen can never appear in a module name.
    version : str, optional
        Required version. If None, any installed version is accepted.

    Returns
    -------
    None

    Raises
    ------
    subprocess.CalledProcessError
        If the ``pip install`` command exits with a non-zero status.
    """
    # A stray space (e.g. "onnxscript ") would otherwise end up in the log line
    # and in the pip requirement string.
    pkg_name = pkg_name.strip()
    name = import_name if import_name else pkg_name.replace("-", "_")

    try:
        module = importlib.import_module(name)
    except ImportError:
        install_target = pkg_name if version is None else f"{pkg_name}=={version}"

        print(f"[INSTALL] {pkg_name} not found. Installing {install_target} ...")

        # Use the running interpreter so the package lands in this kernel's environment.
        subprocess.check_call([
            sys.executable,
            "-m",
            "pip",
            "install",
            install_target
        ])
        # Let the import system notice the freshly installed files.
        importlib.invalidate_caches()
        return

    # Not every module exposes __version__; fall back to a placeholder.
    ver = getattr(module, "__version__", "unknown")
    print(f"[OK] {pkg_name} is already installed (version: {ver})")

    # Surface a version mismatch instead of silently accepting it. Reinstalling a
    # package that is already imported in a live kernel is risky, so only warn.
    if version is not None and ver != "unknown" and str(ver) != str(version):
        print(f"[WARN] {pkg_name} version {ver} is installed but {version} was requested")

In [5]:
#!pip install numpy==1.26.4

In [6]:
# Core ONNX ecosystem (recommended stable versions)
check_or_install("onnx", version="1.17.0")
check_or_install("onnxsim", import_name="onnxsim", version="0.4.33")
check_or_install("onnx-graphsurgeon", import_name="onnx_graphsurgeon")

# Runtime and model conversion tools
check_or_install("onnxruntime", import_name="onnxruntime", version="1.18.1")
check_or_install("onnx2tf", import_name="onnx2tf")
# Fixed: the package name previously carried a trailing space ("onnxscript "),
# which leaked into the log line and into the pip requirement string.
check_or_install("onnxscript", import_name="onnxscript")
check_or_install("numpy", version="1.26.4")

[OK] onnx is already installed (version: 1.17.0)
[OK] onnxsim is already installed (version: 0.4.33)
[OK] onnx-graphsurgeon is already installed (version: 0.5.8)
[OK] onnxruntime is already installed (version: 1.18.1)
[OK] onnx2tf is already installed (version: 1.28.8)
[OK] onnxscript  is already installed (version: 0.5.7)
[OK] numpy is already installed (version: 1.26.4)


In [7]:
# Each entry is (PyPI name, import name or None, pinned version or None).
# First two rows: TensorFlow and Keras (stable and compatible versions).
# Remaining rows: auxiliary packages for signal processing and edge deployment.
_TF_AND_EDGE_PACKAGES = (
    ("tensorflow", "tensorflow", "2.19.0"),
    ("tf-keras", "tf_keras", "2.19.0"),
    ("h5py", None, None),
    ("librosa", None, None),
    ("ai_edge_litert", None, "1.2.0"),
    ("sng4onnx", None, None),
)

for _pkg, _module, _pin in _TF_AND_EDGE_PACKAGES:
    check_or_install(_pkg, import_name=_module, version=_pin)

[OK] tensorflow is already installed (version: 2.19.0)
[OK] tf-keras is already installed (version: 2.19.0)
[OK] h5py is already installed (version: 3.15.1)
[OK] librosa is already installed (version: 0.11.0)
[OK] ai_edge_litert is already installed (version: 1.2.0)
[OK] sng4onnx is already installed (version: 1.0.4)


In [8]:
import onnx

# Report which ONNX build the kernel picked up and where it was loaded from.
for _label, _value in (("ONNX version:", onnx.__version__),
                       ("ONNX file:", onnx.__file__)):
    print(_label, _value)

ONNX version: 1.17.0
ONNX file: /usr/local/lib/python3.12/dist-packages/onnx/__init__.py


# Loss and Metrics

In [9]:
import torch.nn.functional as F


def nll_loss(output, target):
    """Return ``F.nll_loss`` of ``output`` against ``target`` with default settings."""
    loss = F.nll_loss(input=output, target=target)
    return loss

def ce_loss(output, target):
    """Return ``F.cross_entropy`` of ``output`` against ``target`` with default settings."""
    loss = F.cross_entropy(input=output, target=target)
    return loss

In [10]:
import torch


def accuracy(output, target):
    """
    Fraction of rows of ``output`` whose arg-max along dim 1 equals ``target``.

    :param output: score tensor; the class axis is dim 1
    :param target: ground-truth class indices, one per row of ``output``
    :return: number of matches divided by ``len(target)``
    """
    n_samples = len(target)
    with torch.no_grad():
        predicted = output.argmax(dim=1)
        assert predicted.shape[0] == n_samples
        n_correct = (predicted == target).sum().item()
    return n_correct / n_samples


def top_k_acc(output, target, k=3):
    """
    Fraction of samples whose target is among the ``k`` highest-scoring classes.

    :param output: score tensor; the class axis is dim 1
    :param target: ground-truth class indices, one per row of ``output``
    :param k: how many top-ranked classes count as a hit
    :return: number of hits divided by ``len(target)``
    """
    n_samples = len(target)
    with torch.no_grad():
        top_idx = output.topk(k, dim=1).indices
        assert top_idx.shape[0] == n_samples
        # Compare each rank column with the targets and add up the matches.
        hits = sum(torch.sum(top_idx[:, rank] == target).item() for rank in range(k))
    return hits / n_samples

In [11]:
# Model name (as used in MODEL_NAME) -> directory name under "saved/".
MODEL_DIR_MAP = dict(
    VGG11="VGG11",
    CNN="simple_cnn",
    CRNN="CRNN",
    LSTM="LSTM",
)


# Config Parser

In [12]:
import os
import logging
from pathlib import Path
from datetime import datetime
import argparse
import collections
import importlib.util

class ConfigParser:
    """
    Wraps an experiment configuration dict and prepares the run's output folders.

    On construction it applies optional overrides to the config, then creates
    ``<save_dir>/models/<name>/<run_id>`` and ``<save_dir>/log/<name>/<run_id>``.
    Items are read with ``parser[key]``; objects described by
    ``{'type': ..., 'args': {...}}`` entries are built with ``init_obj`` /
    ``init_ftn``.
    """

    def __init__(self, config_dict, resume=None, modification=None, run_id=None):
        """
        :param config_dict: configuration; must provide ``['trainer']['save_dir']`` and ``['name']``.
            Overrides are written into this dict in place.
        :param resume: optional checkpoint path, stored as ``self.resume``.
        :param modification: optional ``{"key;subkey;...": value}`` overrides.
        :param run_id: run folder name; defaults to the current time as ``MMDD_HHMMSS``.
        """
        self._config = config_dict
        self.resume = resume

        # Apply CLI modifications: walk the ';'-separated key path and set the leaf.
        if modification:
            for key_path, value in modification.items():
                keys = key_path.split(';')
                d = self._config
                for k in keys[:-1]:
                    d = d[k]
                d[keys[-1]] = value

        # Setup save and log directories
        save_dir = Path(self.config['trainer']['save_dir'])
        exper_name = self.config['name']
        if run_id is None:
            run_id = datetime.now().strftime(r'%m%d_%H%M%S')
        self._save_dir = save_dir / 'models' / exper_name / run_id
        self._log_dir = save_dir / 'log' / exper_name / run_id

        self.save_dir.mkdir(parents=True, exist_ok=True)
        self.log_dir.mkdir(parents=True, exist_ok=True)

        # Optional: setup logging
        # setup_logging(self.log_dir)

    @classmethod
    def from_args(cls, parser, options=None):
        """
        Build a ConfigParser from command-line arguments.

        :param parser: an ``argparse.ArgumentParser`` whose arguments have already
            been added; the parsed result must expose ``device``, ``resume`` and ``config``.
        :param options: list of objects with ``flags``, ``type`` and ``target``
            attributes describing extra CLI overrides.
        :raises FileNotFoundError: if the resolved config file does not exist.
        :raises ValueError: if the config file is not a ``.py`` file or does not
            define a ``config`` variable.
        """
        if options is None:
            options = []

        # NOTE(review): parse_args() reads sys.argv; inside a notebook kernel that
        # may contain launcher flags this parser does not know - confirm before
        # calling this from a notebook.
        args = parser.parse_args()

        # Restrict the visible GPUs when a device string was given.
        if args.device is not None:
            os.environ["CUDA_VISIBLE_DEVICES"] = args.device

        # When resuming, the config is read from next to the checkpoint.
        if args.resume is not None:
            resume = Path(args.resume)
            cfg_fname = resume.parent / 'config.py'  # or .json if both are in use
        else:
            resume = None
            assert args.config is not None, "Ph·∫£i ch·ªâ ƒë·ªãnh -c config.py"
            cfg_fname = Path(args.config)

        # Read the config from the .py file
        if not cfg_fname.exists():
            raise FileNotFoundError(f"Kh√¥ng t√¨m th·∫•y config file: {cfg_fname}")

        if cfg_fname.suffix == '.py':
            # Execute the file as a throw-away module and take its `config` variable.
            spec = importlib.util.spec_from_file_location("config_module", cfg_fname)
            config_module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(config_module)
            if not hasattr(config_module, 'config'):
                raise ValueError(f"File {cfg_fname} ph·∫£i ƒë·ªãnh nghƒ©a bi·∫øn 'config'")
            config_dict = config_module.config
        else:
            raise ValueError("Ch·ªâ h·ªó tr·ª£ file config .py")

        # Parse custom CLI overrides: map each supplied flag onto its config target.
        modification = {}
        for opt in options:
            for flag in opt.flags:
                arg_name = flag.lstrip('-').replace('-', '_')
                if hasattr(args, arg_name) and getattr(args, arg_name) is not None:
                    modification[opt.target] = opt.type(getattr(args, arg_name))

        return cls(config_dict, resume, modification)

    def init_obj(self, name, module, *args, **kwargs):
        """
        Instantiate the object described by ``self[name]``.

        ``self[name]['type']`` names the class (looked up in ``module``, which may
        be a dict or any object with attributes) and ``self[name]['args']``
        supplies keyword arguments; ``kwargs`` override them.

        :return: ``module.<type>(*args, **merged_kwargs)``
        """
        module_name = self[name]['type']
        module_args = dict(self[name]['args'])
        module_args.update(kwargs)
        if isinstance(module, dict):
            obj_class = module[module_name]
        else:
            obj_class = getattr(module, module_name)

        return obj_class(*args, **module_args)

    def init_ftn(self, name, module, *args, **kwargs):
        """
        Return the function described by ``self[name]`` with arguments pre-bound.

        Lookup works exactly as in ``init_obj``, but the target is NOT called:
        a ``functools.partial`` is returned so the caller can supply the remaining
        arguments later. (Previously the target was invoked immediately, which made
        this method a duplicate of ``init_obj`` and left ``partial`` unused.)

        :return: ``partial(module.<type>, *args, **merged_kwargs)``
        """
        from functools import partial
        module_name = self[name]['type']
        module_args = dict(self[name]['args'])
        module_args.update(kwargs)
        if isinstance(module, dict):
            ftn = module[module_name]
        else:
            ftn = getattr(module, module_name)

        return partial(ftn, *args, **module_args)

    def __getitem__(self, name):
        """Access top-level config entries like a dict."""
        return self.config[name]

    def get_logger(self, name, verbosity=2):
        """
        Return the named logger with its level set from ``verbosity``
        (0 = WARNING, 1 = INFO, 2 = DEBUG; any other value raises ``KeyError``).
        """
        logger = logging.getLogger(name)
        logger.setLevel({0: logging.WARNING, 1: logging.INFO, 2: logging.DEBUG}[verbosity])
        return logger

    @property
    def config(self):
        """The underlying configuration dict."""
        return self._config

    @property
    def save_dir(self):
        """Directory created for this run's model files."""
        return self._save_dir

    @property
    def log_dir(self):
        """Directory created for this run's logs."""
        return self._log_dir

# Model

In [13]:
import torch.nn as nn
import numpy as np
from abc import abstractmethod


class BaseModel(nn.Module):
    """
    Base class for all models
    """
    @abstractmethod
    def forward(self, *inputs):
        """
        Forward pass logic

        :return: Model output
        """
        raise NotImplementedError

    def __str__(self):
        """
        Model prints with number of trainable parameters
        """
        # numel() always yields an int. The previous np.prod(p.size()) returned the
        # float 1.0 for 0-dim parameters, which turned the printed total into a float.
        trainable = sum(p.numel() for p in self.parameters() if p.requires_grad)
        return super().__str__() + '\nTrainable parameters: {}'.format(trainable)

In [14]:
import torch
import torch.nn as nn
import torch.nn.functional as F


# functions of initializing layers
def init_layer(layer):
    """Xavier-uniform the weight of a Linear/Conv layer and zero its bias, if it has one."""
    nn.init.xavier_uniform_(layer.weight)

    # Covers both "no bias attribute" and "bias disabled (None)".
    if getattr(layer, 'bias', None) is not None:
        layer.bias.data.fill_(0.)

def init_bn(bn):
    """Reset a batch-norm layer's affine parameters: weight to one, bias to zero."""
    for attr, value in (('bias', 0.), ('weight', 1.)):
        getattr(bn, attr).data.fill_(value)

def init_rnn(rnn):
    """init_rnn
    Initialise every parameter of a recurrent module by name:
    names containing 'bias' are set to zero, names containing 'weight'
    are Xavier-uniform initialised; anything else is left untouched.
    :param rnn: the rnn model
    """
    for pname, tensor in rnn.named_parameters():
        if 'bias' in pname:
            nn.init.constant_(tensor, 0.0)
            continue
        if 'weight' in pname:
            nn.init.xavier_uniform_(tensor)

def reset_parameters(model):
    """
    Re-initialise every supported sub-module of ``model`` in place.

    Conv2d and Linear layers go through ``init_layer``, BatchNorm2d through
    ``init_bn`` and recurrent layers through ``init_rnn``. Recurrent layers are
    matched via ``nn.RNNBase`` so that GRU (and plain RNN) modules are reset as
    well; matching only ``nn.LSTM`` silently skipped them.

    :param model: any ``nn.Module``; modules of other types are left untouched
    """
    for module in model.modules():
        if isinstance(module, (nn.Conv2d, nn.Linear)):
            init_layer(module)
        elif isinstance(module, nn.BatchNorm2d):
            init_bn(module)
        elif isinstance(module, nn.RNNBase):
            init_rnn(module)


## CNN

In [15]:
class simple_cnn(BaseModel):
    """
    Small three-stage CNN classifier.

    Each stage is conv(3x3) -> ReLU -> 2x2 max-pool -> batch-norm. The result is
    globally max-pooled, passed through two linear layers (with dropout in
    between) and a softmax over dim 1.
    """

    def __init__(self, num_classes = 2, in_channel=1):
        super().__init__()
        self.in_channel = in_channel
        # Attribute names and creation order are kept so state_dict keys stay the same.
        self.conv1 = nn.Conv2d(in_channel, 16, 3)
        self.bn1 = nn.BatchNorm2d(16)
        self.conv2 = nn.Conv2d(16, 32, 3)
        self.bn2 = nn.BatchNorm2d(32)
        self.conv3 = nn.Conv2d(32, 64, 3)
        self.bn3 = nn.BatchNorm2d(64)
        self.pool = nn.AdaptiveMaxPool2d(1)
        self.fc1 = nn.Linear(64, 32)
        self.output = nn.Linear(32, num_classes)
        self.activate = nn.Softmax(dim=1)

        self.init_weights()

    def init_weights(self):
        """Initialise conv/linear layers first, then the batch-norm layers."""
        for weighted in (self.conv1, self.conv2, self.conv3, self.fc1, self.output):
            init_layer(weighted)
        for norm in (self.bn1, self.bn2, self.bn3):
            init_bn(norm)

    def forward(self,x):
        """
        :param x: 3-D tensor; dim 1 is split into ``in_channel`` groups
        :return: softmax scores of shape (batch, num_classes)
        """
        batch, _, frames = x.size()
        feats = x.view(batch, self.in_channel, -1, frames)
        stages = ((self.conv1, self.bn1), (self.conv2, self.bn2), (self.conv3, self.bn3))
        for conv, norm in stages:
            feats = norm(F.max_pool2d(F.relu(conv(feats)), (2, 2)))
        pooled = self.pool(feats).reshape(feats.size(0), -1)
        hidden = F.dropout(self.fc1(pooled), p=0.5, training=self.training)
        return self.activate(self.output(hidden))

## Deeper CNN model, VGG like structure

In [16]:
class ConvBlock(nn.Module):
    """
    Two 3x3 convolutions (stride 1, padding 1, no bias), each followed by
    batch-norm and an activation, then a pooling step.
    """

    def __init__(self, in_channels, out_channels):
        """
        :param in_channels: channels of the incoming feature map
        :param out_channels: channels produced by both convolutions
        """
        super(ConvBlock, self).__init__()

        self.conv1 = nn.Conv2d(in_channels = in_channels,
                               out_channels = out_channels,
                               kernel_size = (3,3),
                               stride = (1,1),
                               padding = (1,1),
                               bias =False)

        self.conv2 = nn.Conv2d(in_channels = out_channels,
                               out_channels = out_channels,
                               kernel_size = (3,3),
                               stride = (1,1),
                               padding = (1,1),
                               bias = False)

        self.bn1 = nn.BatchNorm2d(out_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.init_weights()

    def init_weights(self):
        """Initialise both convolutions and both batch-norm layers."""
        init_layer(self.conv1)
        init_layer(self.conv2)
        init_bn(self.bn1)
        init_bn(self.bn2)

    def forward(self, input, pool_size=(2, 2), pool_type='max', activation = 'relu'):
        """
        :param input: 4-D feature map accepted by ``nn.Conv2d``
        :param pool_size: kernel size of the final pooling step
        :param pool_type: 'max', 'avg' or 'avg+max' (sum of both poolings)
        :param activation: activation after the second conv: 'relu' or 'sigmoid'
        :raises ValueError: for an unknown ``activation``
        :raises Exception: for an unknown ``pool_type``
        """
        # Fail fast: an unrecognised activation used to skip conv2/bn2 entirely
        # without any error, unlike an unrecognised pool_type.
        if activation not in ('relu', 'sigmoid'):
            raise ValueError('Incorrect activation: {!r}'.format(activation))

        x = input
        x = F.relu_(self.bn1(self.conv1(x)))
        if activation == 'relu':
            x = F.relu_(self.bn2(self.conv2(x)))
        else:  # 'sigmoid'
            x = torch.sigmoid(self.bn2(self.conv2(x)))

        if pool_type == 'max':
            x = F.max_pool2d(x, kernel_size=pool_size)
        elif pool_type == 'avg':
            x = F.avg_pool2d(x, kernel_size=pool_size)
        elif pool_type == 'avg+max':
            x1 = F.avg_pool2d(x, kernel_size=pool_size)
            x2 = F.max_pool2d(x, kernel_size=pool_size)
            x = x1 + x2
        else:
            raise Exception('Incorrect argument!')

        return x

class VGG_11(BaseModel):
    """
    VGG-style classifier: input batch-norm, four ConvBlocks (64/128/256/512
    channels) with dropout, global max-pooling and a linear layer followed by
    log-softmax.
    """

    def __init__(self, num_classes, in_channel):
        super().__init__()
        self.in_channel = in_channel
        # bn0 has 128 features and is applied after swapping dims 1 and 2 in forward().
        self.bn0 = nn.BatchNorm2d(128)
        self.conv1 = ConvBlock(in_channels=in_channel, out_channels=64)
        self.conv2 = ConvBlock(in_channels=64, out_channels=128)
        self.conv3 = ConvBlock(in_channels=128, out_channels=256)
        self.conv4 = ConvBlock(in_channels=256, out_channels=512)
        self.fc_final = nn.Linear(512, num_classes)
        self.init_weights()

    def init_weights(self):
        """Initialise the layers owned directly by this class (ConvBlocks self-initialise)."""
        init_bn(self.bn0)
        init_layer(self.fc_final)

    def forward(self, input):
        """
        :param input: 3-D tensor; dim 1 is split into ``in_channel`` groups
        :return: log-probabilities of shape (batch, num_classes)
        """
        batch, _, frames = input.size()
        x = input.view(batch, self.in_channel, -1, frames)
        # Swap dims 1 and 2 for bn0, then swap back.
        x = self.bn0(x.transpose(1, 2)).transpose(1, 2)

        for block in (self.conv1, self.conv2, self.conv3, self.conv4):
            x = block(x, pool_size=(2, 2), pool_type='max')
            x = F.dropout(x, p=0.2, training=self.training)

        # Global max over the two trailing dims, then drop them.
        pooled = F.max_pool2d(x, kernel_size=x.shape[2:])
        pooled = pooled.view(pooled.shape[0:2])
        return F.log_softmax(self.fc_final(pooled), dim=-1)


class VGG_13(BaseModel):
    """
    Deeper VGG-style classifier: input batch-norm, five ConvBlocks with dropout,
    then a flattened three-layer fully connected head ending in log-softmax.
    """

    def __init__(self, num_classes, in_channel):
        super().__init__()
        self.in_channel = in_channel
        self.bn0 = nn.BatchNorm2d(128)
        self.conv1 = ConvBlock(in_channels=in_channel, out_channels=64)
        self.conv2 = ConvBlock(in_channels=64, out_channels=128)
        self.conv3 = ConvBlock(in_channels=128, out_channels=256)
        self.conv4 = ConvBlock(in_channels=256, out_channels=512)
        self.conv5 = ConvBlock(in_channels=512, out_channels=512)
        # fc_1 expects exactly 512 * 4 * 10 flattened features from conv5.
        self.fc_1 = nn.Linear(512 * 4 * 10, 4096)
        self.fc_2 = nn.Linear(4096, 4096)
        self.fc_final = nn.Linear(4096, num_classes)
        self.init_weights()

    def init_weights(self):
        """Initialise the layers owned directly by this class (ConvBlocks self-initialise)."""
        init_bn(self.bn0)
        for dense in (self.fc_final, self.fc_1, self.fc_2):
            init_layer(dense)

    def forward(self, input):
        """
        :param input: 3-D tensor; dim 1 is split into ``in_channel`` groups
        :return: log-probabilities of shape (batch, num_classes)
        """
        batch, _, frames = input.size()
        x = input.view(batch, self.in_channel, -1, frames)
        # Swap dims 1 and 2 for bn0, then swap back.
        x = self.bn0(x.transpose(1, 2)).transpose(1, 2)

        for block in (self.conv1, self.conv2, self.conv3, self.conv4, self.conv5):
            x = block(x, pool_size=(2, 2), pool_type='max')
            x = F.dropout(x, p=0.2, training=self.training)

        # Flatten everything except the batch dim for the dense head.
        x = x.view(x.shape[0], -1)
        for dense in (self.fc_1, self.fc_2):
            x = F.relu_(dense(x))
            x = F.dropout(x, p=0.5, training=self.training)
        return F.log_softmax(self.fc_final(x), dim=-1)

## RNN - BiLSTM

In [17]:
class BiLSTM(BaseModel):
    """
    Bidirectional recurrent classifier (LSTM or GRU).

    The recurrent output is optionally batch-normalised, projected to
    ``num_classes`` per time step by a bias-free linear layer, and then reduced
    over the time axis according to ``pooling``.

    :param input_dim: feature size of each time step
    :param hidden_dim: hidden size per direction
    :param num_layers: number of stacked recurrent layers
    :param dropout: dropout passed to the recurrent module
    :param num_classes: output size of the linear layer
    :param pooling: 'avg', 'first', 'last', 'max', 'linear' or 'exp';
        any other value returns the un-pooled per-step outputs
    :param model: 'lstm' or 'gru' (stored as ``self.LSTM`` in both cases)
    :param BN: apply BatchNorm1d to the recurrent output when True
    """
    def __init__(self,
                 input_dim=200,
                 hidden_dim=256,
                 num_layers =2,
                 dropout=0.2,
                 num_classes=2,
                 pooling='first',
                 model='lstm',
                 BN=False):
        super(BiLSTM,self).__init__()
        if model == 'lstm':
            self.LSTM = nn.LSTM(input_size=input_dim,
                                hidden_size =hidden_dim,
                                num_layers =num_layers,
                                batch_first =True,
                                dropout=dropout,
                                bidirectional=True)
        elif model == 'gru':
            # Kept under the attribute name "LSTM" so state_dict keys do not change.
            self.LSTM = nn.GRU(input_size=input_dim,
                               hidden_size =hidden_dim,
                               num_layers =num_layers,
                               batch_first =True,
                               dropout=dropout,
                               bidirectional=True)
        init_rnn(self.LSTM)
        self.BN = BN
        if self.BN:
            # Bidirectional output concatenates both directions: 2 * hidden_dim features.
            self.BatchNorm=nn.BatchNorm1d(hidden_dim*2)

        self.layer_out = nn.Linear(hidden_dim*2,num_classes,bias=False)
        self.pooling=pooling

    def forward(self,x):
        """
        :param x: 3-D tensor whose last two dims are swapped before the recurrent layer
        :return: pooled class scores (un-pooled per-step scores for an unknown ``pooling``)
        """
        x = x.transpose(2,1) #[Batchsize x Time_frames x Mel_bin]
        x,_ = self.LSTM(x)
        if self.BN:
            # BatchNorm1d normalises dim 1, so move features there and back.
            x = self.BatchNorm(x.transpose(1,2))
            x = x.transpose(1,2)
        dim = 1  # time axis (batch_first=True)
        x = self.layer_out(x)
        if self.pooling == 'avg':
            x = x.mean(dim)   #average pooling
        elif self.pooling == 'first':
            x = x.select(dim,0)  #first time step
        elif self.pooling == 'last':
            x = x.select(dim,-1)  #last time step
        elif self.pooling == 'max':
            x = x.max(dim)[0]
        elif  self.pooling == 'linear':
            # Fixed: the result used to be computed and discarded.
            x = (x**2).sum(dim) / x.sum(dim)
        elif self.pooling == 'exp':
            # Fixed: the result used to be computed and discarded.
            x = (x.exp() * x).sum(dim) / x.exp().sum(dim)

        return x

## CRNN

In [18]:
class CNN_Encoder(nn.Module):
    """Stack of four ConvBlocks (32/64/128/256 channels), each followed by dropout (p=0.2)."""

    def __init__(self, in_channels):
        super().__init__()

        self.conv1 = ConvBlock(in_channels=in_channels, out_channels=32)
        self.conv2 = ConvBlock(in_channels=32, out_channels=64)
        self.conv3 = ConvBlock(in_channels=64, out_channels=128)
        self.conv4 = ConvBlock(in_channels=128, out_channels=256)

    def forward(self, input):
        """Run ``input`` through the four blocks, pooling 2x2 in each, and return the feature map."""
        feats = input
        for block in (self.conv1, self.conv2, self.conv3, self.conv4):
            feats = block(feats, pool_size=(2, 2))
            feats = F.dropout(feats, p=0.2, training=self.training)
        return feats

class crnn(BaseModel):
    """
    Convolutional-recurrent classifier: a CNN_Encoder produces 256-channel
    features, dim -2 is max-pooled away, and a BiLSTM head classifies the
    remaining sequence.
    """

    def __init__(self, in_channel,
                 hidden_dim=128,
                 num_layers = 2,
                 dropout = 0.2,
                 pooling = 'first',
                 model='lstm',
                 BN = False,
                 num_classes=2):
        super().__init__()

        self.in_channel = in_channel
        # No input batch-norm in this variant; the attribute is kept as a placeholder.
        self.bn0 = None

        self.cnn = CNN_Encoder(in_channels=in_channel)
        # input_dim=256 matches the channel count of the encoder's last block.
        self.bilstm = BiLSTM(
            input_dim=256,
            hidden_dim=hidden_dim,
            num_layers=num_layers,
            dropout=dropout,
            num_classes=num_classes,
            pooling=pooling,
            model=model,
            BN=BN,
        )

    def forward(self,input):
        """
        :param input: 3-D tensor (batch, mel_bins, num_frames); ``mel_bins`` is
            split evenly across ``in_channel`` channels
        :return: output of the BiLSTM head
        """
        batch, mel_bins, frames = input.size()
        x = input.view(batch, self.in_channel, mel_bins // self.in_channel, frames)

        x = self.cnn(x)
        # Collapse dim -2 to size 1 (keeping the last dim), then drop it: [B x C x Time_frames]
        x = F.adaptive_max_pool2d(x, (1, None)).squeeze(-2)

        return self.bilstm(x)

# Load Model

In [19]:
from pathlib import Path

def get_model_best_path(project_path, model_name, timestamp):
    """
    Return the path to model_best.pth following the standard layout:
    saved/<model_dir>/models/Physionet_<model_dir>/<timestamp>/model_best.pth

    ``model_dir`` is looked up in MODEL_DIR_MAP; an unknown ``model_name``
    raises ValueError. The path is only built, not checked for existence.
    """
    if model_name not in MODEL_DIR_MAP:
        raise ValueError(f"Unsupported model name: {model_name}")

    model_dir = MODEL_DIR_MAP[model_name]
    run_dir = project_path.joinpath(
        "saved", model_dir, "models", f"Physionet_{model_dir}", timestamp
    )
    return run_dir / "model_best.pth"


In [20]:
# Resolve the checkpoint location for the configured model and run.
model_path = get_model_best_path(PROJECT_PATH, MODEL_NAME, TIMESTAMP)
print(model_path)

/content/drive/MyDrive/AutomaticHeartSoundClassification-main/saved/CRNN/models/Physionet_CRNN/0108_105831/model_best.pth


In [21]:
if not model_path.exists():
    print("‚ùå File does NOT exist at the specified path.")
    for hint in (
        "   Possible reasons:",
        "   - Training did not complete or model_best.pth was not saved",
        "   - Wrong timestamp (check your actual training time)",
        "   - Model saved elsewhere (e.g., in TFLITE folder)",
    ):
        print(hint)
    print()
    print("Searching for all .pth files in the project for suggestions...")

    # Look for any checkpoint anywhere under the project to suggest alternatives.
    pth_files = sorted(PROJECT_PATH.rglob("*.pth"))
    if not pth_files:
        print("   No .pth files found in the entire project!")
    else:
        print("\nFound .pth files:")
        for p in pth_files[:10]:  # list at most 10 candidates
            rel_size = p.stat().st_size / (1024**2)
            print(f"   ‚Üí {p.relative_to(PROJECT_PATH)}  ({rel_size:.2f} MB)")
        if len(pth_files) > 10:
            print(f"   ... and {len(pth_files)-10} more files")
else:
    size_bytes = model_path.stat().st_size

    # Human-readable size: plain bytes below 1 KiB, otherwise the largest unit
    # (KB, MB, GB) that applies; GB is the catch-all for anything bigger.
    if size_bytes < 1024:
        size_str = f"{size_bytes} Bytes"
    else:
        for power, unit in ((1, "KB"), (2, "MB"), (3, "GB")):
            if unit == "GB" or size_bytes < 1024 ** (power + 1):
                size_str = f"{size_bytes / (1024 ** power):.2f} {unit}"
                break

    print("‚úÖ File exists!")
    print(f"   Size: {size_str} ({size_bytes:,} bytes)")

‚úÖ File exists!
   Size: 58.37 MB (61,207,937 bytes)


In [22]:
# Recurrent-head settings shared by the CRNN and LSTM entries (adjust if needed).
_RNN_DEFAULTS = {
    "hidden_dim": 256,
    "num_layers": 2,
    "dropout": 0.2,
    "pooling": "first",
    "model": "lstm",
    "BN": False,
}

# Model name -> class plus the constructor arguments used when rebuilding it.
MODEL_CLASS_MAP = {
    "VGG11": {"class": VGG_11, "in_channel": 3, "num_classes": 2},
    "CNN": {"class": simple_cnn, "in_channel": 3, "num_classes": 2},
    "CRNN": {
        "class": crnn,
        "in_channel": 1,
        "num_classes": 2,
        # Each entry gets its own copy so editing one cannot affect the other.
        "extra_kwargs": dict(_RNN_DEFAULTS),
    },
    "LSTM": {
        "class": BiLSTM,
        "in_channel": None,  # in_channel is not used by this model
        "num_classes": 2,
        # input_dim: usually the number of mel bins after pooling the mel dim.
        "extra_kwargs": {"input_dim": 128, **_RNN_DEFAULTS},
    },
}

In [23]:
def build_model_for_convert(model_name: str, cfg: dict):
    """
    Instantiate ``cfg["class"]`` for conversion and switch it to eval mode.

    ``cfg`` must contain "class", "model" (model hyper-parameters) and "input"
    (input description). Which keys are read depends on ``model_name``:
    "CRNN" and "LSTM" have dedicated argument sets; every other name is built
    from ``in_channel`` and ``num_classes`` only.

    :return: the constructed model, after ``model.eval()`` has been called
    """
    ModelClass = cfg["class"]
    model_cfg = cfg["model"]
    input_cfg = cfg["input"]

    if model_name == "CRNN":
        ctor_kwargs = {
            "in_channel": input_cfg["in_channel"],
            "hidden_dim": model_cfg["hidden_dim"],
            "num_layers": model_cfg["num_layers"],
            "dropout": model_cfg["dropout"],
            "pooling": model_cfg["pooling"],
            "model": model_cfg["rnn_type"],
            "BN": model_cfg["BN"],
            "num_classes": model_cfg["num_classes"],
        }
    elif model_name == "LSTM":
        ctor_kwargs = {
            "input_dim": input_cfg["mel_bins"],
            "hidden_dim": model_cfg["hidden_dim"],
            "num_layers": model_cfg["num_layers"],
            "dropout": model_cfg["dropout"],
            "num_classes": model_cfg["num_classes"],
        }
    else:
        # CNN / VGG
        ctor_kwargs = {
            "in_channel": input_cfg["in_channel"],
            "num_classes": model_cfg["num_classes"],
        }

    model = ModelClass(**ctor_kwargs)
    model.eval()
    return model


In [61]:
# =========================
# CHECK MODEL NAME
# =========================
if MODEL_NAME not in MODEL_CLASS_MAP:
    raise ValueError(
        f"Model {MODEL_NAME} kh√¥ng ƒë∆∞·ª£c h·ªó tr·ª£. "
        f"C√°c model h·ªó tr·ª£: {list(MODEL_CLASS_MAP.keys())}"
    )

cfg = MODEL_CLASS_MAP[MODEL_NAME]
ModelClass = cfg["class"]

# =========================
# BUILD MODEL (BY ARCHITECTURE)
# =========================
# CRNN takes in_channel plus the extra recurrent kwargs; LSTM (the BiLSTM class)
# takes only the extra kwargs; VGG11 / CNN take only in_channel.
_takes_in_channel = MODEL_NAME in ("CRNN", "VGG11", "CNN")
_takes_extra = MODEL_NAME in ("CRNN", "LSTM")
if not (_takes_in_channel or _takes_extra):
    raise ValueError("C·∫•u h√¨nh model kh√¥ng h·ª£p l·ªá")

_ctor_kwargs = {}
if _takes_in_channel:
    _ctor_kwargs["in_channel"] = cfg["in_channel"]
_ctor_kwargs["num_classes"] = cfg["num_classes"]
_extra_kwargs = cfg.get("extra_kwargs", {}) if _takes_extra else {}

# Two separate ** expansions so a key present in both still raises TypeError.
model = ModelClass(**_ctor_kwargs, **_extra_kwargs)

model.to(device)
model.eval()

crnn(
  (cnn): CNN_Encoder(
    (conv1): ConvBlock(
      (conv1): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (conv2): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (bn2): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (conv2): ConvBlock(
      (conv1): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (conv3): ConvBlock(
      (conv1): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (conv2): Conv2d(128, 128, kernel_size=(3, 3),

In [62]:
# =========================
# LOAD WEIGHTS (FINAL - STRICT & SAFE)
# =========================

# SECURITY NOTE: weights_only=False lets torch.load unpickle arbitrary Python
# objects, i.e. it can execute code embedded in the file. Only load checkpoints
# you produced yourself. NOTE(review): try weights_only=True if the checkpoint
# contains nothing but tensors and plain containers.
checkpoint = torch.load(model_path, map_location=device, weights_only=False)

# ---- extract state_dict ----
# Accept {"state_dict": ...}, {"model": ...}, a bare state dict, or a pickled module.
if isinstance(checkpoint, dict):
    state_dict = checkpoint.get("state_dict", checkpoint.get("model", checkpoint))
else:
    state_dict = checkpoint.state_dict() if hasattr(checkpoint, "state_dict") else checkpoint

# ---- remove DataParallel prefix ----
# Only when the dict is non-empty and every key carries the "module." prefix.
if state_dict and all(k.startswith("module.") for k in state_dict.keys()):
    from collections import OrderedDict
    state_dict = OrderedDict((k[len("module."):], v) for k, v in state_dict.items())

# ---- remove BiLSTM BatchNorm (BN=False) ----
if MODEL_NAME in ["CRNN", "LSTM"]:
    # crnn keeps its recurrent head in `.bilstm`; for "LSTM" the model itself is
    # the BiLSTM. The previous check required `.bilstm`, so it never ran for "LSTM".
    rnn_head = getattr(model, "bilstm", model)
    if getattr(rnn_head, "BN", None) is False:
        # Deleted in place so any metadata attached to the dict object is kept.
        for k in [k for k in state_dict.keys() if "BatchNorm" in k]:
            del state_dict[k]

# ---- remove CRNN input BatchNorm (bn0) ----
# crnn sets bn0 = None, so bn0.* entries in a checkpoint have no matching parameter.
if MODEL_NAME == "CRNN":
    for k in [k for k in state_dict.keys() if k.startswith("bn0.")]:
        del state_dict[k]

# ---- load strictly ----
# strict=True: every remaining key must match the model exactly.
model.load_state_dict(state_dict, strict=True)
model.to(device)
model.eval()

print(f"‚úÖ ƒê√£ load model {MODEL_NAME} th√†nh c√¥ng t·ª´: {model_path}")


‚úÖ ƒê√£ load model CRNN th√†nh c√¥ng t·ª´: /content/drive/MyDrive/AutomaticHeartSoundClassification-main/saved/CRNN/models/Physionet_CRNN/0108_105831/model_best.pth


# Pruning

In [63]:
import torch.nn.utils.prune as prune

# Collect the weight tensor of every Conv2d / Linear layer in the model.
parameters_to_prune = [
    (layer, 'weight')
    for layer in model.modules()
    if isinstance(layer, (torch.nn.Conv2d, torch.nn.Linear))
]
# Optional: biases could be added as (layer, 'bias') for layers where bias is not None.

# Prune 50% of the collected weights by L1 magnitude, ranked across all layers together.
prune.global_unstructured(
    parameters_to_prune,
    pruning_method=prune.L1Unstructured,
    amount=0.5,
)

print("‚úÖ ƒê√£ apply pruning (v·ªõi mask)")

‚úÖ ƒê√£ apply pruning (v·ªõi mask)


In [64]:
# Input files read from the dataset folder.
features_file = DATASET_PATH.joinpath("logmel_features.h5")
label_file = DATASET_PATH.joinpath("label.csv")

In [65]:
import librosa
import random
import numpy as np
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

# Added to the standard deviation so constant input does not divide by zero.
EPS = 1e-8

def standard_normal_variate(data):
    """Z-score normalisation using the mean and std of the whole feature array."""
    centered = data - np.mean(data)
    scale = np.std(data) + EPS
    return centered / scale

class LogMelDataset(Dataset):
    def __init__(self, features_h5_path, label_csv_path, keys=None,
                 delta=True, norm=True, duration=5, hop_length=15, training=True):
        self.features_h5 = h5py.File(features_h5_path, 'r')
        self.labels_df = pd.read_csv(label_csv_path)

        if keys is None:
            self.keys = list(self.features_h5.keys())
        else:
            self.keys = keys

        # Map key (filename stem) ‚Üí label
        self.key_to_label = dict(zip(self.labels_df.iloc[:, 0].astype(str), self.labels_df.iloc[:, 1]))

        self.delta = delta
        self.norm = norm
        self.duration = duration
        self.hop_length = hop_length
        self.training = training

        # ƒê·ªô d√†i c·ªë ƒë·ªãnh: 5 gi√¢y v·ªõi hop 15ms ‚Üí 5000 / 15 = 333.333 ‚Üí l·∫•y 333 frames
        self.fixed_length = int(self.duration * 1000 / self.hop_length)  # 333

    def __len__(self):
        return len(self.keys)

    def __getitem__(self, idx):
        key = self.keys[idx]
        feature = self.features_h5[key][()]  # shape: (mel_bins, time_frames), th∆∞·ªùng l√† (128, T)

        # Chu·∫©n h√≥a Z-score n·∫øu b·∫≠t
        if self.norm:
            feature = standard_normal_variate(feature)

        # ƒê·∫£m b·∫£o lu√¥n l√† 3D: (C, mel_bins, T)
        if feature.ndim == 2:
            feature = feature[np.newaxis, :, :]  # (1, 128, T)

        channels, mel_bins, num_frames = feature.shape

        # Th√™m delta + delta-delta n·∫øu b·∫≠t
        if self.delta:
            orig = feature[0]  # (128, T)
            delta1 = librosa.feature.delta(orig)
            delta2 = librosa.feature.delta(delta1)
            feature = np.stack([orig, delta1, delta2], axis=0)  # (3, 128, T)
            channels = 3

        # Crop ho·∫∑c pad ƒë·ªÉ ƒë·∫°t ƒë√∫ng fixed_length (333 frames)
        if num_frames >= self.fixed_length:
            if self.training:
                start = random.randint(0, num_frames - self.fixed_length)
            else:
                start = (num_frames - self.fixed_length) // 2
            feature = feature[:, :, start:start + self.fixed_length]
        else:
            pad_width = self.fixed_length - num_frames
            feature = np.pad(feature, ((0, 0), (0, 0), (0, pad_width)), mode='wrap')

        # B√¢y gi·ªù feature ch·∫Øc ch·∫Øn l√† (channels, 128, 333)
        feature_tensor = torch.from_numpy(feature).float()

        # Quan tr·ªçng: reshape ƒë·ªÉ ph√π h·ª£p v·ªõi model VGG_11 g·ªëc
        if channels > 1:  # delta=True ‚Üí gh√©p 3 channel th√†nh 384 mel bins
            feature_tensor = feature_tensor.view(channels * mel_bins, self.fixed_length)  # (384, 333)
        else:
            feature_tensor = feature_tensor.squeeze(0)  # (128, 333)

        label = torch.tensor(self.key_to_label.get(key, 0), dtype=torch.long)
        return feature_tensor, label

    def close(self):
        if hasattr(self, 'features_h5'):
            self.features_h5.close()

In [66]:
# ================== SPLIT TRAIN/VAL KEYS ==================
# Read every sample key from the .h5 file.
with h5py.File(features_file, 'r') as hf:
    all_keys = list(hf.keys())

# Build the stratification labels in the SAME order as `all_keys`.
# The CSV row order is not guaranteed to match the HDF5 key order, and
# `train_test_split` pairs `stratify[i]` with `all_keys[i]` by position.
split_labels_df = pd.read_csv(label_file)
split_key_to_label = dict(
    zip(split_labels_df.iloc[:, 0].astype(str), split_labels_df.iloc[:, 1])
)

unlabeled_keys = [key for key in all_keys if key not in split_key_to_label]
if unlabeled_keys:
    # Keys and the CSV's first column do not line up; keep the previous
    # positional behaviour, but say so instead of silently mis-stratifying.
    print(
        f"WARNING: {len(unlabeled_keys)} HDF5 keys are missing from the label CSV; "
        "falling back to CSV row order for stratification."
    )
    labels_all = split_labels_df.iloc[:, 1].values
else:
    labels_all = pd.Series(all_keys).map(split_key_to_label).values

# 80/20 split that preserves the class ratio (the dataset is imbalanced).
train_keys, val_keys = train_test_split(
    all_keys,
    test_size=0.2,
    random_state=42,
    stratify=labels_all
)

print(f"Total samples: {len(all_keys)}")
print(f"Train samples: {len(train_keys)}, Val samples: {len(val_keys)}")

Total samples: 3240
Train samples: 2592, Val samples: 648


In [67]:
# Feature settings shared by both splits; delta must stay on because the
# model was trained with delta features.
dataset_kwargs = dict(delta=True, norm=True, duration=5, hop_length=15)

# Loader settings shared by both splits (2 workers to avoid the worker-count warning).
loader_kwargs = dict(batch_size=32, num_workers=2, pin_memory=True)

train_dataset = LogMelDataset(
    features_file, label_file, keys=train_keys, training=True, **dataset_kwargs
)
val_dataset = LogMelDataset(
    features_file, label_file, keys=val_keys, training=False, **dataset_kwargs
)

# Only the training split is shuffled.
train_loader = DataLoader(train_dataset, shuffle=True, **loader_kwargs)
val_loader = DataLoader(val_dataset, shuffle=False, **loader_kwargs)

print(f"Train samples: {len(train_dataset)}, Val samples: {len(val_dataset)}")
print("M·ªói sample c√≥ shape c·ªë ƒë·ªãnh ‚Üí kh√¥ng c·∫ßn padding trong collate")

Train samples: 2592, Val samples: 648
M·ªói sample c√≥ shape c·ªë ƒë·ªãnh ‚Üí kh√¥ng c·∫ßn padding trong collate


In [68]:
# Disabled post-pruning fine-tuning loop. The whole cell is one bare string
# literal, so nothing below executes; the notebook only echoes the text.
# NOTE(review): while this stays disabled, the checkpoint written later under
# the name "*_pruned_finetuned.pth" holds pruned but NOT fine-tuned weights.
'''
model.train()
optimizer = torch.optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=1e-5)
criterion = torch.nn.NLLLoss()  # v√¨ model output log_softmax

num_finetune_epochs = 10

for epoch in range(num_finetune_epochs):
    model.train()
    running_loss = 0.0

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    avg_loss = running_loss / len(train_loader)
    print(f"Fine-tune Epoch {epoch+1}/{num_finetune_epochs} - Avg Loss: {avg_loss:.6f}")

    # ƒê√°nh gi√° tr√™n validation (t√πy ch·ªçn)
    if (epoch + 1) % 5 == 0 or epoch == num_finetune_epochs - 1:
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                pred = torch.argmax(outputs, dim=1)
                correct += (pred == labels).sum().item()
                total += labels.size(0)
        acc = correct / total if total > 0 else 0
        print(f"   >>> Val Accuracy sau epoch {epoch+1}: {acc:.4f}")
        model.train()

# ƒê√≥ng dataset ƒë·ªÉ gi·∫£i ph√≥ng file handle
train_dataset.close()
val_dataset.close()

model.eval()
print("Fine-tune sau pruning ho√†n t·∫•t")
'''

'\nmodel.train()\noptimizer = torch.optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=1e-5)\ncriterion = torch.nn.NLLLoss()  # v√¨ model output log_softmax\n\nnum_finetune_epochs = 10\n\nfor epoch in range(num_finetune_epochs):\n    model.train()\n    running_loss = 0.0\n\n    for inputs, labels in train_loader:\n        inputs, labels = inputs.to(device), labels.to(device)\n\n        optimizer.zero_grad()\n        outputs = model(inputs)\n        loss = criterion(outputs, labels)\n        loss.backward()\n        optimizer.step()\n\n        running_loss += loss.item()\n\n    avg_loss = running_loss / len(train_loader)\n    print(f"Fine-tune Epoch {epoch+1}/{num_finetune_epochs} - Avg Loss: {avg_loss:.6f}")\n\n    # ƒê√°nh gi√° tr√™n validation (t√πy ch·ªçn)\n    if (epoch + 1) % 5 == 0 or epoch == num_finetune_epochs - 1:\n        model.eval()\n        correct = 0\n        total = 0\n        with torch.no_grad():\n            for inputs, labels in val_loader:\n     

In [69]:
# ================== MAKE PRUNING PERMANENT ==================
import torch.nn.utils.prune as prune  # in case it has not been imported yet

for module, name in parameters_to_prune:
    # prune.remove() raises ValueError when no pruning is attached, which
    # happens when this cell is run a second time; the guard makes it re-runnable.
    if prune.is_pruned(module):
        prune.remove(module, name)

print("‚úÖ ƒê√£ remove pruning mask ‚Üí pruning permanent")

# ================== SAVE THE PRUNED MODEL ==================
PRUNED_PATH = PROJECT_PATH / "TFLITE" / f"{MODEL_NAME.lower()}_{TIMESTAMP}_pruned_finetuned.pth"
# The TFLITE folder is otherwise only created by a later cell; create it here
# so saving does not depend on execution order.
PRUNED_PATH.parent.mkdir(parents=True, exist_ok=True)
torch.save(model.state_dict(), PRUNED_PATH)
print(f"‚úÖ Model pruned + finetuned l∆∞u t·∫°i: {PRUNED_PATH}")

# Release the training set's HDF5 handle.
train_dataset.close()

‚úÖ ƒê√£ remove pruning mask ‚Üí pruning permanent
‚úÖ Model pruned + finetuned l∆∞u t·∫°i: /content/drive/MyDrive/AutomaticHeartSoundClassification-main/TFLITE/crnn_0108_105831_pruned_finetuned.pth


## Define input shape & Dummy input

In [70]:
# ================== INPUT SHAPE & DUMMY INPUT ==================
# The dataset now yields fixed-size samples, so the export input is (batch, 384, 333).
dummy_shape = (1, 384, 333)
dummy_input = torch.randn(*dummy_shape).to(device)

# Chuyển sang ONNX

In [71]:
# Export target: <project>/TFLITE/<model>_<timestamp>_final.onnx (folder created on demand).
onnx_export_dir = PROJECT_PATH / "TFLITE"
onnx_export_dir.mkdir(parents=True, exist_ok=True)
ONNX_PATH = onnx_export_dir / f"{MODEL_NAME.lower()}_{TIMESTAMP}_final.onnx"
print(ONNX_PATH)

/content/drive/MyDrive/AutomaticHeartSoundClassification-main/TFLITE/crnn_0108_105831_final.onnx


In [78]:
print("üîß Flattening LSTM parameters...")

# Flatten only when the model actually exposes `bilstm.LSTM`.
has_lstm = hasattr(model, 'bilstm') and hasattr(model.bilstm, 'LSTM')
if not has_lstm:
    print("‚ö†Ô∏è  No LSTM found\n")
else:
    model.bilstm.LSTM.flatten_parameters()
    print("‚úÖ LSTM flattened\n")

üîß Flattening LSTM parameters...
‚úÖ LSTM flattened



In [79]:
# Keep a handle on the current forward so it can be put back after the export.
original_forward = getattr(model, "forward")

def export_forward_simple_cnn(input):
    """Export-time forward for the plain CNN.

    Takes a 3-D ``(B, mel_bins, num_frames)`` input, splits the mel axis into
    ``model.in_channel`` channels, runs three conv -> batch-norm -> ReLU ->
    2x2 max-pool stages, then ``model.pool``, ``fc1`` (ReLU) and
    ``model.output``. The result of ``model.output`` is returned as is.
    """
    batch, _, frames = input.size()

    x = input.view(batch, model.in_channel, -1, frames)

    stages = (
        (model.conv1, model.bn1),
        (model.conv2, model.bn2),
        (model.conv3, model.bn3),
    )
    for conv, bn in stages:
        x = F.max_pool2d(F.relu(bn(conv(x))), 2)

    flat = model.pool(x).view(batch, -1)

    hidden = F.relu(model.fc1(flat))
    return model.output(hidden)   # any softmax can be dropped on the edge device

def export_forward_vgg(input):
    """Export-time forward for the VGG model.

    Takes a 3-D ``(B, mel_bins, num_frames)`` input, splits the mel axis into
    ``model.in_channel`` channels, applies ``bn0`` with axes 1 and 2 swapped,
    runs ``conv1`` with 2x2 max pooling, global-max-pools to 1x1 and feeds the
    flattened result to ``fc_final``.
    """
    B, mel_bins, num_frames = input.size()
    x = input.view(B, model.in_channel, -1, num_frames)

    # bn0 is applied with axes 1 and 2 swapped, then the layout is restored.
    x = x.transpose(1, 2)
    x = model.bn0(x)
    x = x.transpose(1, 2)

    x = model.conv1(x, pool_size=(2, 2), pool_type='max')

    # Functional form of the global max pool: no dependency on an `nn` alias
    # (which no visible cell imports) and no module construction on every call.
    x = F.adaptive_max_pool2d(x, (1, 1))
    x = x.view(B, -1)
    output = model.fc_final(x)
    return output

def export_forward_crnn(input):
    """ONNX-compatible forward pass for the CRNN.

    Takes a 3-D ``(B, mel_bins, num_frames)`` input, splits the mel axis into
    ``model.in_channel`` channels, runs the CNN encoder, collapses the
    frequency axis with a fixed-size max pool (in place of
    ``F.adaptive_max_pool2d(x, (1, None))``) and feeds the sequence to
    ``model.bilstm``.
    """
    B, mel_bins, num_frames = input.size()

    # Reshape to (B, C, mel_bins_per_channel, T).
    x = input.view(B, model.in_channel, mel_bins // model.in_channel, num_frames)

    # CNN encoder
    x = model.cnn(x)

    # Take the pooling height from the encoder output itself rather than from
    # a notebook-global that has to be computed in some other cell. int()
    # turns it into a plain constant, so the exported pool has a fixed kernel
    # (the export only leaves the batch axis dynamic).
    freq_bins = int(x.shape[2])
    x = torch.nn.functional.max_pool2d(
        x,
        kernel_size=(freq_bins, 1)
    )
    x = x.squeeze(2)  # Remove the (now size-1) frequency dimension

    # BiLSTM
    output = model.bilstm(x)

    return output



# Pick the export-time forward that matches the configured architecture.
export_forwards = {
    "CNN": export_forward_simple_cnn,
    "VGG11": export_forward_vgg,
    "CRNN": export_forward_crnn,
}
if MODEL_NAME not in export_forwards:
    raise ValueError("Unsupported model")
model.forward = export_forwards[MODEL_NAME]

In [36]:
# Earlier version of the ONNX export, kept only as a bare string literal:
# nothing below executes.
'''
try:
    torch.onnx.export(
        model,
        dummy_input,
        str(ONNX_PATH),
        export_params=True,
        opset_version=17,
        do_constant_folding=True,
        input_names=["input"],
        output_names=["output"],
        dynamic_axes={
            "input": {0: "batch_size"},
            "output": {0: "batch_size"}
        },
        dynamo=False
        # verbose=False
    )
    print(f"‚úÖ Export ONNX th√†nh c√¥ng: {ONNX_PATH}")
except Exception as e:
    print("L·ªói export:", e)
finally:
    model.forward = original_forward
    print("ƒê√£ kh√¥i ph·ª•c forward g·ªëc")

# Ki·ªÉm tra file
import os
if os.path.exists(ONNX_PATH):
    print(f"Size: {os.path.getsize(ONNX_PATH) / (1024**2):.2f} MB")
'''

‚úÖ Export ONNX th√†nh c√¥ng: /content/drive/MyDrive/AutomaticHeartSoundClassification-main/TFLITE/crnn_0108_105831_final.onnx
ƒê√£ kh√¥i ph·ª•c forward g·ªëc
Size: 14.52 MB


In [80]:
# Export the (forward-patched) model to ONNX. Whatever happens, the original
# forward is put back afterwards; a failed export is reported and re-raised.
try:
    print(f"üì¶ Exporting to: {ONNX_PATH.name}\n")

    export_options = dict(
        export_params=True,
        opset_version=17,
        do_constant_folding=True,
        input_names=["input"],
        output_names=["output"],
        # Only the batch axis is left dynamic.
        dynamic_axes={
            "input": {0: "batch_size"},
            "output": {0: "batch_size"},
        },
        dynamo=False,
        training=torch.onnx.TrainingMode.EVAL,
        keep_initializers_as_inputs=False,
        verbose=False,
    )
    torch.onnx.export(model, dummy_input, str(ONNX_PATH), **export_options)

    print(f"‚úÖ ONNX export successful!\n")

except Exception as export_error:
    import traceback
    print(f"‚ùå Export failed: {export_error}\n")
    traceback.print_exc()
    raise

finally:
    model.forward = original_forward
    print("‚úÖ Restored original forward\n")

üì¶ Exporting to: crnn_0108_105831_final.onnx

‚úÖ ONNX export successful!

‚úÖ Restored original forward



In [81]:
# Sanity-check the exported ONNX file: report its size, run the ONNX checker,
# list the inputs of every LSTM node, then compare ONNX Runtime output with
# the PyTorch model on `dummy_input`. Failures are printed, not re-raised.
import os
if os.path.exists(ONNX_PATH):
    file_size_mb = os.path.getsize(ONNX_PATH) / (1024**2)
    print(f"File size: {file_size_mb:.2f} MB")

    # Load and verify with ONNX
    import onnx
    try:
        onnx_model = onnx.load(str(ONNX_PATH))
        onnx.checker.check_model(onnx_model)
        print("ONNX model is valid")

        # Check LSTM nodes
        lstm_nodes = [n for n in onnx_model.graph.node if n.op_type == "LSTM"]
        print(f"\nFound {len(lstm_nodes)} LSTM node(s)")

        for i, lstm_node in enumerate(lstm_nodes):
            print(f"\nLSTM #{i+1}: {lstm_node.name}")
            print(f"  Number of inputs: {len(lstm_node.input)}")
            for j, inp in enumerate(lstm_node.input):
                if inp:  # Only print non-empty inputs (an empty name is an omitted optional input)
                    print(f"    [{j}] {inp}")

        # Verify with ONNX Runtime
        try:
            import onnxruntime as ort
            sess = ort.InferenceSession(str(ONNX_PATH), providers=['CPUExecutionProvider'])

            # Test inference
            test_input = dummy_input.cpu().numpy()
            ort_output = sess.run(None, {'input': test_input})[0]

            # Compare with PyTorch. `model` must still be the PyTorch module
            # here; later cells rebind `model` to an ONNX proto.
            with torch.no_grad():
                torch_output = model(dummy_input).cpu().numpy()

            max_diff = abs(torch_output - ort_output).max()
            print(f"\nüîç Output comparison:")
            print(f"  PyTorch output shape: {torch_output.shape}")
            print(f"  ONNX output shape: {ort_output.shape}")
            print(f"  Max difference: {max_diff:.6f}")

            # Outputs count as matching when the largest absolute difference is below 1e-4.
            if max_diff < 1e-4:
                print("Outputs match! ONNX export is correct.")
            else:
                print(f" Outputs differ by {max_diff}")

        except ImportError:
            print("\nonnxruntime not installed, skipping runtime verification")
        except Exception as e:
            print(f"\n ONNX Runtime verification failed: {e}")

    except Exception as e:
        print(f"ONNX verification failed: {e}")
        import traceback
        traceback.print_exc()
else:
    print("ONNX file not found!")

# Printed unconditionally, including when verification above failed.
print("\n" + "="*60)
print("ONNX EXPORT COMPLETED")

File size: 14.52 MB
ONNX model is valid

Found 2 LSTM node(s)

LSTM #1: /bilstm/LSTM/LSTM
  Number of inputs: 7
    [0] /bilstm/LSTM/Transpose_output_0
    [1] onnx::LSTM_487
    [2] onnx::LSTM_488
    [3] onnx::LSTM_486
    [5] /bilstm/LSTM/Slice_output_0
    [6] /bilstm/LSTM/Slice_1_output_0

LSTM #2: /bilstm/LSTM/LSTM_1
  Number of inputs: 7
    [0] /bilstm/LSTM/Reshape_output_0
    [1] onnx::LSTM_530
    [2] onnx::LSTM_531
    [3] onnx::LSTM_529
    [5] /bilstm/LSTM/Slice_2_output_0
    [6] /bilstm/LSTM/Slice_3_output_0

üîç Output comparison:
  PyTorch output shape: (1, 2)
  ONNX output shape: (1, 2)
  Max difference: 0.000000
Outputs match! ONNX export is correct.

ONNX EXPORT COMPLETED


In [82]:
DATASET_PATH = Path('/content/drive/MyDrive/AutomaticHeartSoundClassification-main/data')
sys.path.append(str(DATASET_PATH))

features_file = DATASET_PATH / "logmel_features.h5"
label_file = DATASET_PATH / "label.csv"

# Load the label table to know how many samples it lists.
labels = pd.read_csv(label_file)
num_samples = len(labels)

def representative_data_gen(num_calib_samples=300):
    """Yield ``[array]`` calibration samples read from ``features_file``.

    Each yielded array is one stored feature matrix with a leading batch axis,
    cast to float32. Keys are visited in file order and wrap around when the
    file holds fewer than ``num_calib_samples`` entries.

    NOTE(review): the matrices are yielded exactly as stored, with no
    normalisation, delta stacking or cropping. Confirm this matches the input
    the converted model expects before using this generator for calibration.
    """
    with h5py.File(features_file, 'r') as hf:
        # Build the key list once; rebuilding it on every iteration made the
        # loop quadratic in the number of keys.
        keys = list(hf.keys())
        if not keys:
            return
        for i in range(num_calib_samples):
            # Wrap on the number of keys actually in the file (not on the CSV
            # row count), so the index can never run past the key list.
            key = keys[i % len(keys)]
            data = hf[key][()]
            data = np.expand_dims(data, axis=0)  # Add batch dim
            data = data.astype(np.float32)
            yield [data]

print("‚úÖ ƒê√£ chu·∫©n b·ªã representative dataset t·ª´ logmel_features.h5")

‚úÖ ƒê√£ chu·∫©n b·ªã representative dataset t·ª´ logmel_features.h5


# CONVERT ONNX -> TFLITE

## ONNX

In [83]:
import onnx
# Report the IR version stored in the exported ONNX file.
m = onnx.load(ONNX_PATH)
print(f"IR version: {m.ir_version}")

IR version: 8


In [84]:
import onnx

from onnxsim import simplify

# Simplify the exported graph with onnxsim.
# A separate name is used for the ONNX proto so the PyTorch `model` is not
# overwritten (earlier cells still call it as a torch module when re-run).
onnx_model = onnx.load(str(ONNX_PATH))
model_simp, check = simplify(onnx_model)

# `check` reports whether onnxsim could validate the simplified model against
# the original; do not write out a model that failed that validation.
if not check:
    raise RuntimeError("onnxsim could not validate the simplified ONNX model")

# Write the result back to ONNX_PATH, which is the file every later cell loads.
# Previously it went to a same-named file in the current working directory,
# which no later cell reads.
onnx.save(model_simp, str(ONNX_PATH))

In [85]:
import onnxruntime as ort
# Smoke test: can ONNX Runtime build a session from the exported file?
try:
    session = ort.InferenceSession(str(ONNX_PATH))
except Exception as load_error:
    print(f"‚ùå Model v·∫´n l·ªói c·∫•u tr√∫c: {load_error}")
else:
    print("‚úÖ Model ONNX h·ª£p l·ªá v√† s·∫µn s√†ng chuy·ªÉn ƒë·ªïi TFLite.")

‚úÖ Model ONNX h·ª£p l·ªá v√† s·∫µn s√†ng chuy·ªÉn ƒë·ªïi TFLite.


In [86]:
import h5py
import numpy as np
import librosa
from pathlib import Path

PROJECT_PATH = Path('/content/drive/MyDrive/AutomaticHeartSoundClassification-main')
DATASET_PATH = PROJECT_PATH / "data"
features_file = DATASET_PATH / "logmel_features.h5"

DURATION = 5
HOP_LENGTH_MS = 15
FIXED_LENGTH = int(DURATION * 1000 / HOP_LENGTH_MS)  # 333
DELTA = True
NORM = True
NUM_CALIB_SAMPLES = 300  # number of samples written to calib_data.npy

def standard_normal_variate(data):
    """Z-score normalise ``data`` with the mean/std of the whole array."""
    mean = np.mean(data)
    std = np.std(data) + 1e-8
    return (data - mean) / std

calib_data_list = []

with h5py.File(features_file, 'r') as hf:
    all_keys = list(hf.keys())
    num_samples = len(all_keys)
    print(f"Total samples: {num_samples}")
    if num_samples == 0:
        raise ValueError(f"No samples found in {features_file}")

    # Pick keys evenly spread over the whole file instead of the first 300.
    # Keys come back in name order, so the first 300 presumably all belong to
    # one recording subset (TODO confirm), which would bias INT8 calibration.
    calib_indices = np.linspace(0, num_samples - 1, NUM_CALIB_SAMPLES).astype(int)

    for i, key_idx in enumerate(calib_indices):
        key = all_keys[int(key_idx)]
        feature = hf[key][()]

        if NORM:
            feature = standard_normal_variate(feature)

        if feature.ndim == 2:
            feature = feature[np.newaxis, :, :]

        if DELTA:
            orig = feature[0]
            delta1 = librosa.feature.delta(orig)
            delta2 = librosa.feature.delta(delta1)
            feature = np.stack([orig, delta1, delta2], axis=0)

        channels, mel_bins, num_frames = feature.shape

        # Centre crop, or wrap-pad when the clip is shorter than FIXED_LENGTH.
        if num_frames >= FIXED_LENGTH:
            start = num_frames // 2 - FIXED_LENGTH // 2
            feature = feature[:, :, start:start + FIXED_LENGTH]
        else:
            pad_width = FIXED_LENGTH - num_frames
            feature = np.pad(feature, ((0,0), (0,0), (0, pad_width)), mode='wrap')

        if channels > 1:
            feature = feature.reshape(channels * mel_bins, FIXED_LENGTH)  # (384, 333)

        feature = feature.astype(np.float32)
        feature = np.expand_dims(feature, axis=0)  # (1, 384, 333)
        calib_data_list.append(feature)

        if (i + 1) % 50 == 0:
            print(f"Prepared {i + 1}/{NUM_CALIB_SAMPLES} calibration samples")

# Concatenating along axis 0 merges the per-sample batch axes, so the saved
# array is (NUM_CALIB_SAMPLES, 384, 333), not (N, 1, 384, 333).
calibration_data = np.concatenate(calib_data_list, axis=0)
np.save("calib_data.npy", calibration_data)
print("‚úÖ ƒê√£ l∆∞u file calib_data.npy (shape:", calibration_data.shape, ")")

Total samples: 3240
Prepared 50/300 calibration samples
Prepared 100/300 calibration samples
Prepared 150/300 calibration samples
Prepared 200/300 calibration samples
Prepared 250/300 calibration samples
Prepared 300/300 calibration samples
‚úÖ ƒê√£ l∆∞u file calib_data.npy (shape: (300, 384, 333) )


In [42]:
import onnx
from pathlib import Path

PROJECT_PATH = Path('/content/drive/MyDrive/AutomaticHeartSoundClassification-main')
TFLITE_DIR = PROJECT_PATH.joinpath("TFLITE")

model_prefix = f"{MODEL_NAME.lower()}_{TIMESTAMP}"

# Accept either "<prefix>_final.onnx" or "<prefix>.onnx", in that order.
onnx_files = [
    candidate
    for suffix in ("_final.onnx", ".onnx")
    for candidate in TFLITE_DIR.glob(f"{model_prefix}{suffix}")
]

if len(onnx_files) == 0:
    raise FileNotFoundError(f"Kh√¥ng t√¨m th·∫•y ONNX cho {model_prefix}")

# Use the most recently modified file, in case the export was run several times.
onnx_path = max(onnx_files, key=lambda candidate: candidate.stat().st_mtime)

print(f"ƒêang d√πng ONNX: {onnx_path.name}")

# Look up the name of the graph's first input.
model = onnx.load(str(onnx_path))
input_name = model.graph.input[0].name
print(f"T√™n input OP trong ONNX: '{input_name}'")

ƒêang d√πng ONNX: crnn_0108_105831_final.onnx
T√™n input OP trong ONNX: 'input'


In [87]:
from pathlib import Path
import onnx2tf

# =========================
# 1. PATH CONFIGURATION
# =========================

# Folder that receives the converted TFLite files.
# (A stray `output_folder_path=str(OUTPUT_TFLITE_DIR),` statement used to
# follow this line; it only bound an unused one-element tuple and is removed.)
OUTPUT_TFLITE_DIR = TFLITE_DIR / f"{MODEL_NAME.lower()}_tflite_int8_{TIMESTAMP}"

# Accept either "<prefix>_final.onnx" or "<prefix>.onnx".
onnx_files = (
    list(TFLITE_DIR.glob(f"{MODEL_NAME.lower()}_{TIMESTAMP}_final.onnx")) +
    list(TFLITE_DIR.glob(f"{MODEL_NAME.lower()}_{TIMESTAMP}.onnx"))
)

if not onnx_files:
    raise FileNotFoundError(f"Kh√¥ng t√¨m th·∫•y file ONNX trong {TFLITE_DIR}")

# Most recently modified candidate (same choice as sorting newest-first and
# taking the first element).
onnx_path = max(onnx_files, key=lambda p: p.stat().st_mtime)

print(f"ƒêang d√πng ONNX: {onnx_path.name}")

ƒêang d√πng ONNX: crnn_0108_105831_final.onnx


In [88]:
# Optional parameter-replacement file expected next to the converter output.
json_path = OUTPUT_TFLITE_DIR.joinpath(f"{onnx_path.stem}_auto.json")

print(f"Ki·ªÉm tra file c·∫•u h√¨nh: {json_path}")

Ki·ªÉm tra file c·∫•u h√¨nh: /content/drive/MyDrive/AutomaticHeartSoundClassification-main/TFLITE/crnn_tflite_int8_0108_105831/crnn_0108_105831_final_auto.json


In [45]:

# 2. ONNX -> TFLITE INT8
# CNN and VGG
# Disabled variant of the conversion call that reads `onnx_path` directly.
# The whole call is a bare string literal, so nothing below executes.
'''
onnx2tf.convert(
    input_onnx_file_path=str(onnx_path),
    output_folder_path=(OUTPUT_TFLITE_DIR),

    param_replacement_file=str(json_path) if json_path.exists() else None,

    # --- TFLite settings ---
    copy_onnx_input_output_names_to_tflite=True,
    output_integer_quantized_tflite=True,
    quant_type="per-tensor",   # c√≥ th·ªÉ ƒë·ªïi sang "per-channel"

    # --- FIX SHAPE (R·∫§T QUAN TR·ªåNG) ---
    batch_size=1,
    overwrite_input_shape=[
        "input:1,384,333"   # (B, C*Mel, Time)
    ],

    # --- INT8 CALIBRATION DATA ---
    custom_input_op_name_np_data_path=[
        [
            "input",            # t√™n input trong ONNX
            "calib_data.npy",   # (300, 384, 333)
            0.0,                # mean
            1.0                 # std
        ]
    ],

    non_verbose=False
)

print("Chuy·ªÉn ƒë·ªïi th√†nh c√¥ng sang TFLite INT8!")
print("Output n·∫±m trong th∆∞ m·ª•c: vgg11_tflite_int8_final/")
print(
    f"Output n·∫±m trong th∆∞ m·ª•c: "
    f"{MODEL_NAME.lower()}_tflite_int8_{TIMESTAMP}/"
)
'''

'\nonnx2tf.convert(\n    input_onnx_file_path=str(onnx_path),\n    output_folder_path=(OUTPUT_TFLITE_DIR),\n\n    param_replacement_file=str(json_path) if json_path.exists() else None,\n\n    # --- TFLite settings ---\n    copy_onnx_input_output_names_to_tflite=True,\n    output_integer_quantized_tflite=True,\n    quant_type="per-tensor",   # c√≥ th·ªÉ ƒë·ªïi sang "per-channel"\n\n    # --- FIX SHAPE (R·∫§T QUAN TR·ªåNG) ---\n    batch_size=1,\n    overwrite_input_shape=[\n        "input:1,384,333"   # (B, C*Mel, Time)\n    ],\n\n    # --- INT8 CALIBRATION DATA ---\n    custom_input_op_name_np_data_path=[\n        [\n            "input",            # t√™n input trong ONNX\n            "calib_data.npy",   # (300, 384, 333)\n            0.0,                # mean\n            1.0                 # std\n        ]\n    ],\n\n    non_verbose=False\n)\n\nprint("Chuy·ªÉn ƒë·ªïi th√†nh c√¥ng sang TFLite INT8!")\nprint("Output n·∫±m trong th∆∞ m·ª•c: vgg11_tflite_int8_final/")\nprint(\n    f"Ou

In [91]:
import onnx
from pathlib import Path

# ============================================================
# LOAD ONNX MODEL
# ============================================================
# This cell loads the exported ONNX file, truncates every LSTM node to its
# first five inputs (dropping initial_h / initial_c), saves the result under a
# new file name, reloads it to confirm the change, and runs one random input
# through ONNX Runtime.
# NOTE(review): an LSTM without initial_h/initial_c starts from zero states,
# so this only preserves the model's output if the removed tensors were all
# zeros - TODO confirm for this export.
ONNX_PATH = PROJECT_PATH / "TFLITE" / f"{MODEL_NAME.lower()}_{TIMESTAMP}_final.onnx"
MODIFIED_ONNX_PATH = PROJECT_PATH / "TFLITE" / f"{MODEL_NAME.lower()}_{TIMESTAMP}_no_init_state.onnx"

print("="*60)
print("REMOVING LSTM INITIAL STATES FROM ONNX")
print("="*60)
print(f"\nLoading: {ONNX_PATH.name}")

# NOTE: this rebinds the notebook-global `model` to an ONNX ModelProto.
model = onnx.load(str(ONNX_PATH))

# ============================================================
# FIND AND MODIFY LSTM NODES
# ============================================================
lstm_nodes_found = 0
lstm_nodes_modified = 0

for node in model.graph.node:
    if node.op_type == "LSTM":
        lstm_nodes_found += 1

        print(f"\nüìç Found LSTM: {node.name}")
        print(f"   Inputs before: {len(node.input)}")

        # Show all inputs (an empty name marks an omitted optional input)
        for i, inp in enumerate(node.input):
            if inp:
                print(f"     [{i}] {inp}")
            else:
                print(f"     [{i}] (empty)")

        # LSTM standard inputs are:
        # [0] X - input tensor
        # [1] W - weight tensor
        # [2] R - recurrence weight tensor
        # [3] B - bias tensor
        # [4] sequence_lens - (optional, can be empty)
        # [5] initial_h - (REMOVE THIS)
        # [6] initial_c - (REMOVE THIS)

        # Keep only first 5 inputs (X, W, R, B, sequence_lens)
        original_inputs = list(node.input)

        # If there are more than 5 inputs, remove the extra ones
        if len(original_inputs) > 5:
            # Keep inputs 0-4, remove 5 and 6 (initial states).
            # Slice assignment edits the node's input list in place.
            node.input[:] = original_inputs[:5]
            lstm_nodes_modified += 1

            print(f"   ‚úÖ Modified to {len(node.input)} inputs")
            print(f"   Removed inputs: {original_inputs[5:]}")
        else:
            print(f"   ‚ÑπÔ∏è  Already has {len(node.input)} inputs (no modification needed)")

print(f"\nüìä Summary:")
print(f"   LSTM nodes found: {lstm_nodes_found}")
print(f"   LSTM nodes modified: {lstm_nodes_modified}")

# ============================================================
# SAVE MODIFIED ONNX
# ============================================================
print(f"\nüíæ Saving modified model to: {MODIFIED_ONNX_PATH.name}")

onnx.save(model, str(MODIFIED_ONNX_PATH))

# Validate the in-memory model. NOTE(review): this runs after the file has
# already been written, and a failure is only printed, so an invalid model
# can still end up on disk.
try:
    onnx.checker.check_model(model)
    print("‚úÖ Modified ONNX model is valid\n")
except Exception as e:
    print(f"‚ùå Model validation failed: {e}\n")

# ============================================================
# VERIFY MODIFICATIONS
# ============================================================
print("="*60)
print("VERIFICATION")
print("="*60)

# Reload from disk and check what was actually saved
model_verify = onnx.load(str(MODIFIED_ONNX_PATH))

for node in model_verify.graph.node:
    if node.op_type == "LSTM":
        # Count only the non-empty input names.
        num_inputs = len([inp for inp in node.input if inp])
        print(f"\nLSTM: {node.name}")
        print(f"  Total inputs: {num_inputs}")

        for i, inp in enumerate(node.input):
            if inp:
                print(f"    [{i}] {inp}")

        if num_inputs <= 5:
            print(f"  ‚úÖ SUCCESS! No initial states")
        else:
            print(f"  ‚ö†Ô∏è  Still has {num_inputs} inputs")

# ============================================================
# TEST WITH ONNX RUNTIME
# ============================================================
# Runs one random input through the modified file; this checks that the model
# loads and executes, not that its output is numerically unchanged.
try:
    import onnxruntime as ort

    print("\nüß™ Testing with ONNX Runtime...")

    sess = ort.InferenceSession(
        str(MODIFIED_ONNX_PATH),
        providers=['CPUExecutionProvider']
    )

    # Test inference
    import numpy as np
    test_input = np.random.randn(1, 384, 333).astype(np.float32)
    output = sess.run(None, {'input': test_input})[0]

    print(f"‚úÖ ONNX Runtime test successful")
    print(f"   Input shape: {test_input.shape}")
    print(f"   Output shape: {output.shape}")

except ImportError:
    print("\n‚ö†Ô∏è  onnxruntime not installed, skipping runtime test")
except Exception as e:
    print(f"\n‚ùå ONNX Runtime test failed: {e}")
    import traceback
    traceback.print_exc()

print("\n" + "="*60)
print("‚úÖ ONNX MODIFICATION COMPLETE")
print("="*60)
print(f"\nUse this file for TFLite conversion:")
print(f"  {MODIFIED_ONNX_PATH.name}")

REMOVING LSTM INITIAL STATES FROM ONNX

Loading: crnn_0108_105831_final.onnx

üìç Found LSTM: /bilstm/LSTM/LSTM
   Inputs before: 7
     [0] /bilstm/LSTM/Transpose_output_0
     [1] onnx::LSTM_487
     [2] onnx::LSTM_488
     [3] onnx::LSTM_486
     [4] (empty)
     [5] /bilstm/LSTM/Slice_output_0
     [6] /bilstm/LSTM/Slice_output_0
   ‚úÖ Modified to 5 inputs
   Removed inputs: ['/bilstm/LSTM/Slice_output_0', '/bilstm/LSTM/Slice_output_0']

üìç Found LSTM: /bilstm/LSTM/LSTM_1
   Inputs before: 7
     [0] /bilstm/LSTM/Reshape_output_0
     [1] onnx::LSTM_530
     [2] onnx::LSTM_531
     [3] onnx::LSTM_529
     [4] (empty)
     [5] /bilstm/LSTM/Slice_output_0
     [6] /bilstm/LSTM/Slice_output_0
   ‚úÖ Modified to 5 inputs
   Removed inputs: ['/bilstm/LSTM/Slice_output_0', '/bilstm/LSTM/Slice_output_0']

üìä Summary:
   LSTM nodes found: 2
   LSTM nodes modified: 2

üíæ Saving modified model to: crnn_0108_105831_no_init_state.onnx
‚úÖ Modified ONNX model is valid

VERIFICATION

LST

In [95]:
import onnx
from pathlib import Path

# Compact variant of the initial-state removal: read the exported model, cut
# every LSTM node down to its first five inputs, write the result to a new file.
tflite_dir = PROJECT_PATH / "TFLITE"
ONNX_PATH = tflite_dir / f"{MODEL_NAME.lower()}_{TIMESTAMP}_final.onnx"
MODIFIED_ONNX_PATH = tflite_dir / f"{MODEL_NAME.lower()}_{TIMESTAMP}_no_init_state.onnx"

print("ƒêang x√≥a initial states t·ª´ LSTM...\n")

model = onnx.load(str(ONNX_PATH))

for node in model.graph.node:
    if node.op_type != "LSTM":
        continue

    print(f"LSTM: {node.name}")
    print(f"  Inputs tr∆∞·ªõc: {len(node.input)}")

    original_inputs = list(node.input)
    if len(original_inputs) <= 5:
        continue

    # Inputs 5 and 6 are initial_h / initial_c; keep only 0-4.
    node.input[:] = original_inputs[:5]
    print(f"  Inputs sau: {len(node.input)}")
    print(f"  ‚úÖ ƒê√£ x√≥a: {original_inputs[5:]}\n")

onnx.save(model, str(MODIFIED_ONNX_PATH))
print(f"‚úÖ L∆∞u: {MODIFIED_ONNX_PATH.name}")

ƒêang x√≥a initial states t·ª´ LSTM...

LSTM: /bilstm/LSTM/LSTM
  Inputs tr∆∞·ªõc: 7
  Inputs sau: 5
  ‚úÖ ƒê√£ x√≥a: ['/bilstm/LSTM/Slice_output_0', '/bilstm/LSTM/Slice_output_0']

LSTM: /bilstm/LSTM/LSTM_1
  Inputs tr∆∞·ªõc: 7
  Inputs sau: 5
  ‚úÖ ƒê√£ x√≥a: ['/bilstm/LSTM/Slice_output_0', '/bilstm/LSTM/Slice_output_0']

‚úÖ L∆∞u: crnn_0108_105831_no_init_state.onnx


In [101]:
# Convert the LSTM-patched ONNX model to an INT8 TFLite model with onnx2tf.
# NOTE(review): the recorded run of this cell ended with
# "TypeError: 'NoneType' object is not subscriptable"; the traceback is
# truncated in the saved output, so the cause is not identified here.
onnx2tf.convert(
    input_onnx_file_path=str(MODIFIED_ONNX_PATH),
    # Pass a plain string, like the input path above, instead of a Path object.
    output_folder_path=str(OUTPUT_TFLITE_DIR),

    # Only passed when the JSON file actually exists.
    param_replacement_file=str(json_path) if json_path.exists() else None,

    # --- TFLite settings ---
    copy_onnx_input_output_names_to_tflite=True,
    output_integer_quantized_tflite=True,
    quant_type="per-tensor",   # can be switched to "per-channel"

    # --- FIXED INPUT SHAPE (very important) ---
    batch_size=1,
    overwrite_input_shape=[
        "input:1,384,333"   # (B, C*Mel, Time)
    ],

    # --- INT8 CALIBRATION DATA ---
    # NOTE(review): confirm which axis layout onnx2tf expects for this array.
    custom_input_op_name_np_data_path=[
        [
            "input",            # input name in the ONNX graph
            "calib_data.npy",   # (300, 384, 333)
            0.0,                # mean
            1.0                 # std
        ]
    ],

    non_verbose=False
)

print("Chuy·ªÉn ƒë·ªïi th√†nh c√¥ng sang TFLite INT8!")
# The hard-coded "vgg11_tflite_int8_final/" message was dropped: it named a
# folder this run does not write to. The line below reports the real one.
print(
    f"Output n·∫±m trong th∆∞ m·ª•c: "
    f"{MODEL_NAME.lower()}_tflite_int8_{TIMESTAMP}/"
)


Simplifying...
Finish! Here is the difference:
‚îè‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚î≥‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚î≥‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îì
‚îÉ            ‚îÉ Original Model ‚îÉ Simplified Model ‚îÉ
‚î°‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚ïá‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚ïá‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚î©
‚îÇ Constant   ‚îÇ 27             ‚îÇ 27               ‚îÇ
‚îÇ Conv       ‚îÇ 8              ‚îÇ 8                ‚îÇ
‚îÇ Gather     ‚îÇ 1              ‚îÇ 1                ‚îÇ
‚îÇ LSTM       ‚îÇ 2              ‚îÇ 2                ‚îÇ
‚îÇ MatMul     ‚îÇ 1              ‚îÇ 1                ‚îÇ
‚îÇ MaxPool    ‚îÇ 5              ‚îÇ 5                ‚îÇ
‚îÇ Relu       ‚îÇ 8              ‚îÇ 8                ‚îÇ
‚îÇ Reshape    ‚îÇ 3              ‚îÇ 3                ‚îÇ
‚îÇ Squeeze    ‚îÇ 1              ‚îÇ 1                ‚îÇ
‚îÇ Transpose  ‚îÇ 4              ‚îÇ 4                ‚îÇ
‚

TypeError: 'NoneType' object is not subscriptable

# Test Model

In [102]:
# Folder holding the converted TFLite files for this model and timestamp.
OUTPUT_TFLITE_DIR = TFLITE_DIR.joinpath(f"{MODEL_NAME.lower()}_tflite_int8_{TIMESTAMP}")

In [None]:
from pathlib import Path

# Print the name of every entry (file or sub-directory) found directly inside
# the conversion output folder. iterdir() raises if the folder does not exist.
output_dir = Path(OUTPUT_TFLITE_DIR)
print("C√°c file/th∆∞ m·ª•c trong output:")
for item in output_dir.iterdir():
    print(item.name)

In [None]:
import tensorflow as tf
from pathlib import Path


print(f"ƒêang ki·ªÉm tra th∆∞ m·ª•c: {OUTPUT_TFLITE_DIR}\n")

# Fail fast when the conversion output folder is missing.
if not OUTPUT_TFLITE_DIR.exists():
    raise FileNotFoundError(
        f"Th∆∞ m·ª•c kh√¥ng t·ªìn t·∫°i: {OUTPUT_TFLITE_DIR}\n"
        "H√£y ki·ªÉm tra l·∫°i t√™n th∆∞ m·ª•c ho·∫∑c ch·∫°y l·∫°i ph·∫ßn chuy·ªÉn ƒë·ªïi ONNX ‚Üí TFLite."
    )

# Gather every .tflite file located directly inside the folder.
tflite_files = [*OUTPUT_TFLITE_DIR.glob("*.tflite")]

if len(tflite_files) == 0:
    raise FileNotFoundError(f"Kh√¥ng t√¨m th·∫•y file .tflite n√†o trong {OUTPUT_TFLITE_DIR}")

# Header of the file-name / file-size listing printed further down.
print("=== DANH S√ÅCH FILE TFLITE & K√çCH TH∆Ø·ªöC ===")
def format_size(size_bytes):
    """Render a byte count as a human-readable string with two decimals.

    The value is divided by 1024 until it drops below 1024 or the largest
    listed unit (GB) has been passed, in which case it is reported in TB.

    Parameters
    ----------
    size_bytes : int or float
        Size in bytes.

    Returns
    -------
    str
        Text such as ``"1.50 KB"``.
    """
    units = ('B', 'KB', 'MB', 'GB')
    value = size_bytes
    position = 0
    while position < len(units):
        if value < 1024:
            return f"{value:.2f} {units[position]}"
        value = value / 1024
        position += 1
    # Anything that survived four divisions is expressed in terabytes.
    return f"{value:.2f} TB"

# Work on a name-sorted copy so that both the listing and the model choice
# below are reproducible (glob order depends on the file system).
sorted_tflite_files = sorted(tflite_files)

for file_path in sorted_tflite_files:
    size = file_path.stat().st_size
    print(f"{file_path.name:<55} {format_size(size)}")

print()  # blank line

# Choose the model to inspect: files whose name contains "integer", "int8" or
# "quant" are preferred; if none matches, fall back to the first file by name.
int8_candidates = [
    f for f in sorted_tflite_files
    if any(keyword in f.name.lower() for keyword in ["integer", "int8", "quant"])
]
# Stable sort: move the plain integer-quant export to the front and keep name
# order otherwise. NOTE(review): assumes the converter names that file
# "*_integer_quant.tflite" - confirm against the listing printed above. If no
# such file exists the order is simply alphabetical.
int8_candidates.sort(key=lambda f: not f.name.lower().endswith("_integer_quant.tflite"))
int8_file = int8_candidates[0] if int8_candidates else sorted_tflite_files[0]

print(f"ƒêang test v·ªõi: {int8_file.name} ({format_size(int8_file.stat().st_size)})\n")

# Load the model and allocate its tensors so the I/O metadata can be read.
interpreter = tf.lite.Interpreter(model_path=str(int8_file))
interpreter.allocate_tensors()

input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

print("=== TH√îNG TIN INPUT ===")
for i, detail in enumerate(input_details):
    print(f"Input {i}:")
    print(f"  Name: {detail['name']}")
    print(f"  Shape: {detail['shape']}")
    print(f"  Dtype: {detail['dtype']}")
    print(f"  Quantization: {detail['quantization']}")

print("\n=== TH√îNG TIN OUTPUT ===")
for detail in output_details:
    print(f"Output shape: {detail['shape']}")

In [None]:
import h5py
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import librosa
from pathlib import Path

# Project root on the mounted Drive and the data folder beneath it.
PROJECT_PATH = Path('/content/drive/MyDrive/AutomaticHeartSoundClassification-main')
DATASET_PATH = PROJECT_PATH.joinpath("data")

# Inputs of the dataset class: HDF5 feature store and CSV label table.
features_file = DATASET_PATH.joinpath("logmel_features.h5")
label_file = DATASET_PATH.joinpath("label.csv")

# ================== LOGMEL DATASET ==================
# Small constant added to the standard deviation to avoid division by zero.
EPS = 1e-8

def standard_normal_variate(data):
    """Standardise ``data`` using its global mean and standard deviation.

    The statistics are computed over all elements (no axis is given), and
    ``EPS`` is added to the standard deviation before dividing.

    Parameters
    ----------
    data : array-like accepted by numpy
        Values to normalise.

    Returns
    -------
    numpy.ndarray
        ``(data - mean) / (std + EPS)``.
    """
    centered = data - np.mean(data)
    scale = np.std(data) + EPS
    return centered / scale

class LogMelDataset(Dataset):
    """Fixed-length log-mel feature dataset backed by an HDF5 file.

    Every item is a ``(feature, label)`` pair. The feature is read from the
    HDF5 file under ``keys[idx]``, optionally normalised, optionally extended
    with delta and delta-delta channels, centre-cropped or wrap-padded to
    ``fixed_length`` frames and, when it has more than one channel, flattened
    to ``(channels * mel_bins, fixed_length)``.

    The HDF5 file is opened lazily, the first time a process reads from it,
    instead of in ``__init__``. A handle opened in the parent process must not
    be shared with forked ``DataLoader`` workers, so each process opens its
    own read-only handle. As a consequence a wrong ``features_h5_path`` is
    only reported on first access when ``keys`` is supplied.

    Parameters
    ----------
    features_h5_path : path-like
        HDF5 file holding one dataset per sample key.
    label_csv_path : path-like
        CSV whose first column is the sample key and second column the label.
    keys : list of str, optional
        Sample keys to expose. Defaults to every key in the HDF5 file.
    delta : bool
        If True, stack the first channel with its delta and delta-delta.
    norm : bool
        If True, apply ``standard_normal_variate`` to the raw feature.
    duration, hop_length : number
        Used only to derive ``fixed_length = int(duration * 1000 / hop_length)``
        (presumably seconds and milliseconds - TODO confirm units).
    training : bool
        Stored on the instance but not read by this class; cropping is always
        centred.
    """

    def __init__(self, features_h5_path, label_csv_path, keys=None,
                 delta=True, norm=True, duration=5, hop_length=15, training=False):
        self._features_h5_path = features_h5_path
        self._h5_file = None   # opened on demand by the `features_h5` property
        self._h5_pid = None    # pid of the process that opened `_h5_file`
        self.labels_df = pd.read_csv(label_csv_path)

        if keys is None:
            # Only the key names are needed here, so do not keep the file open.
            with h5py.File(features_h5_path, 'r') as h5_file:
                self.keys = list(h5_file.keys())
        else:
            self.keys = keys

        # Map sample key (first CSV column, as str) -> label (second column).
        self.key_to_label = dict(zip(self.labels_df.iloc[:, 0].astype(str), self.labels_df.iloc[:, 1]))

        self.delta = delta
        self.norm = norm
        self.duration = duration
        self.hop_length = hop_length
        self.training = training
        # With the defaults (5, 15) this evaluates to 333 frames.
        self.fixed_length = int(self.duration * 1000 / self.hop_length)

    @property
    def features_h5(self):
        """Read-only HDF5 handle owned by the current process (opened lazily)."""
        import os

        current_pid = os.getpid()
        # Re-open when nothing is open yet or when the cached handle was
        # inherited from another process (e.g. a forked DataLoader worker).
        if self._h5_file is None or self._h5_pid != current_pid:
            self._h5_file = h5py.File(self._features_h5_path, 'r')
            self._h5_pid = current_pid
        return self._h5_file

    def __len__(self):
        """Number of sample keys exposed by the dataset."""
        return len(self.keys)

    def __getitem__(self, idx):
        """Return ``(feature_tensor, label)`` for the sample at position ``idx``.

        ``feature_tensor`` is a float tensor; ``label`` is a long tensor.
        """
        key = self.keys[idx]
        feature = self.features_h5[key][()]  # assumed (mel_bins, time_frames) - TODO confirm

        if self.norm:
            feature = standard_normal_variate(feature)

        if feature.ndim == 2:
            feature = feature[np.newaxis, :, :]  # (1, mel, T)

        channels, mel_bins, num_frames = feature.shape

        if self.delta:
            # Deltas are computed on the first channel over the full,
            # un-cropped time axis.
            orig = feature[0]
            delta1 = librosa.feature.delta(orig)
            delta2 = librosa.feature.delta(delta1)
            feature = np.stack([orig, delta1, delta2], axis=0)  # (3, mel, T)
            channels = 3

        # Centre-crop long inputs, wrap-pad short ones, to `fixed_length` frames.
        if num_frames >= self.fixed_length:
            start = (num_frames - self.fixed_length) // 2
            feature = feature[:, :, start:start + self.fixed_length]
        else:
            pad_width = self.fixed_length - num_frames
            feature = np.pad(feature, ((0, 0), (0, 0), (0, pad_width)), mode='wrap')

        # Fold the channel axis into the mel axis, e.g. 3 x 128 -> 384 rows.
        if channels > 1:
            feature = feature.reshape(channels * mel_bins, self.fixed_length)

        feature_tensor = torch.from_numpy(feature).float()

        # NOTE(review): keys missing from the label CSV silently get label 0.
        label = torch.tensor(self.key_to_label.get(key, 0), dtype=torch.long)
        return feature_tensor, label

    def close(self):
        """Close the HDF5 handle if one is open; safe to call repeatedly."""
        if self._h5_file is not None:
            self._h5_file.close()
            self._h5_file = None
            self._h5_pid = None

In [None]:
# Read every sample key stored in the feature file.
with h5py.File(features_file, 'r') as hf:
    all_keys = [*hf.keys()]

print(f"T·ªïng s·ªë m·∫´u: {len(all_keys)}")

# Dataset over all samples, used for the final evaluation pass.
full_dataset = LogMelDataset(
    features_h5_path=features_file,
    label_csv_path=label_file,
    keys=all_keys,
    delta=True, norm=True,
    duration=5, hop_length=15,
    training=False,  # the dataset stores this flag; cropping is always centred
)

# Sequential (non-shuffled) batches of 32 loaded by two worker processes.
full_loader = DataLoader(
    dataset=full_dataset, batch_size=32, shuffle=False,
    num_workers=2, pin_memory=True,
)

print("‚úÖ ƒê√£ t·∫°o DataLoader th√†nh c√¥ng!")

In [None]:
import tensorflow as tf
import numpy as np

def test_tflite_accuracy(tflite_path, data_loader):
    """Run a TFLite model over ``data_loader`` and report top-1 accuracy.

    Samples are fed one at a time (a leading batch axis of 1 is added). Each
    2-D sample is transposed when, and only when, the model expects the
    reversed layout. Inputs are cast to the dtype the model declares; for
    integer inputs with quantization parameters the float features are first
    quantized with the model's ``(scale, zero_point)``.

    Parameters
    ----------
    tflite_path : pathlib.Path
        Path of the ``.tflite`` file.
    data_loader : iterable
        Yields ``(features, labels)`` batches of torch tensors.

    Returns
    -------
    float
        Fraction of samples whose arg-max prediction equals the label.

    Raises
    ------
    ValueError
        If ``data_loader`` yields no samples.
    """
    # Load the TFLite model.
    interpreter = tf.lite.Interpreter(model_path=str(tflite_path))
    interpreter.allocate_tensors()

    input_detail = interpreter.get_input_details()[0]
    output_detail = interpreter.get_output_details()[0]

    input_shape = input_detail['shape']  # e.g. [1, 333, 384] or [1, 384, 333]
    expected_sample_shape = tuple(int(dim) for dim in input_shape[1:])

    # Loop-invariant input conversion parameters.
    input_dtype = input_detail['dtype']
    input_scale, input_zero_point = input_detail['quantization']
    quantize_input = bool(np.issubdtype(input_dtype, np.integer) and input_scale != 0)
    int_range = np.iinfo(input_dtype) if quantize_input else None

    print(f"Testing model: {tflite_path.name}")
    print(f"Expected input shape: {input_shape}")

    correct = 0
    total = 0

    for features, labels in data_loader:
        for feature, label in zip(features, labels):
            x = feature.numpy()

            # Swap the two axes only if that is what makes the sample match
            # the layout expected by the model.
            if (x.ndim == 2 and x.shape != expected_sample_shape
                    and x.shape[::-1] == expected_sample_shape):
                x = x.transpose(1, 0)

            if quantize_input:
                # real_value = (quantized - zero_point) * scale, inverted here.
                x = np.round(x / input_scale + input_zero_point)
                x = np.clip(x, int_range.min, int_range.max)

            x = np.expand_dims(x, axis=0).astype(input_dtype)

            interpreter.set_tensor(input_detail['index'], x)
            interpreter.invoke()
            output = interpreter.get_tensor(output_detail['index'])

            # A quantized output needs no dequantization for arg-max: the
            # mapping to real values is monotonic.
            pred = np.argmax(output, axis=1)[0]
            true_label = label.item()

            if pred == true_label:
                correct += 1
            total += 1

    if total == 0:
        raise ValueError("data_loader yielded no samples; accuracy is undefined")

    accuracy = correct / total
    print(f"=> Accuracy tr√™n {total} m·∫´u: {accuracy*100:.2f}% ({correct}/{total})")
    print("-" * 60)
    return accuracy

In [None]:
from pathlib import Path

# Locate the INT8 TFLite export. Each glob is sorted so the result does not
# depend on file-system order; "integer" matches keep priority over the other
# "quant" matches, and dict.fromkeys drops files matched by both patterns.
integer_matches = sorted(OUTPUT_TFLITE_DIR.glob("*integer*.tflite"))
quant_matches = sorted(OUTPUT_TFLITE_DIR.glob("*quant*.tflite"))
tflite_files = list(dict.fromkeys(integer_matches + quant_matches))
if not tflite_files:
    raise FileNotFoundError("Kh√¥ng t√¨m th·∫•y file TFLite INT8!")

# Prefer the plain integer-quant export; otherwise take the first candidate.
# NOTE(review): assumes the converter names that file "*_integer_quant.tflite"
# - confirm against the output directory listing. min() returns the first
# best match, so candidate order is preserved on ties.
int8_tflite_path = min(
    tflite_files,
    key=lambda path: not path.name.lower().endswith("_integer_quant.tflite"),
)
print(f"ƒêang test v·ªõi: {int8_tflite_path.name}")

# Run the evaluation and always release the dataset's HDF5 handle, even when
# the evaluation raises.
try:
    test_tflite_accuracy(int8_tflite_path, full_loader)
finally:
    full_dataset.close()