In [1]:
!pip install torchinfo

Collecting torchinfo
  Downloading torchinfo-1.8.0-py3-none-any.whl.metadata (21 kB)
Downloading torchinfo-1.8.0-py3-none-any.whl (23 kB)
Installing collected packages: torchinfo
Successfully installed torchinfo-1.8.0


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as transforms
import numpy as np
from sklearn.metrics import precision_score, recall_score, f1_score
from torch.quantization import QuantStub, DeQuantStub, prepare_qat, convert

In [None]:
# -----------------------------
# 1. Define the FOMO-Based Model
# -----------------------------
class FOMONet(nn.Module):
    """Small single-channel CNN classifier wrapped in quantization stubs.

    The feature extractor is four Conv(3x3, no bias) -> BatchNorm -> ReLU
    stages with 16, 32, 64 and 128 output channels. The first three stages
    end in a 2x2 max-pool; the last one ends in an adaptive average pool to
    1x1, so the classifier always receives 128 features per sample.

    Args:
        num_classes: Number of output logits produced by the linear head.
    """

    def __init__(self, num_classes=2):
        super().__init__()
        # Entry/exit points for quantized execution once the model is converted.
        self.quant = QuantStub()
        self.dequant = DeQuantStub()

        # Channel widths: input channel count followed by each stage's output.
        widths = (1, 16, 32, 64, 128)
        final_stage = len(widths) - 2
        stack = []
        for stage, (c_in, c_out) in enumerate(zip(widths[:-1], widths[1:])):
            stack.append(nn.Conv2d(c_in, c_out, kernel_size=3, stride=1, padding=1, bias=False))
            stack.append(nn.BatchNorm2d(c_out))
            stack.append(nn.ReLU(inplace=True))
            # Intermediate stages halve the resolution; the last collapses it to 1x1.
            if stage == final_stage:
                stack.append(nn.AdaptiveAvgPool2d((1, 1)))
            else:
                stack.append(nn.MaxPool2d(2))
        self.features = nn.Sequential(*stack)

        self.classifier = nn.Linear(128, num_classes)

    def forward(self, x):
        """Return class logits for a batch `x` with one input channel."""
        feats = self.features(self.quant(x))
        # Drop the trailing 1x1 spatial dims: (N, 128, 1, 1) -> (N, 128).
        logits = self.classifier(torch.flatten(feats, 1))
        return self.dequant(logits)

In [None]:
# -----------------------------
# 2. Prepare Dataset
# -----------------------------
class PneumoniaMNIST(Dataset):
    """Map-style dataset over parallel `images` / `labels` containers.

    Indexing returns `(image, label)`. The image goes through `transform`
    when one was supplied; the label is converted to a `torch.long` tensor
    with all size-1 dimensions squeezed away.

    Args:
        images: Indexable container of images; its length is the dataset length.
        labels: Indexable container of labels, aligned with `images`.
        transform: Optional callable applied to each image on access.
    """

    def __init__(self, images, labels, transform=None):
        self.images = images
        self.labels = labels
        self.transform = transform

    def __len__(self):
        # The image container alone defines how many samples exist.
        return len(self.images)

    def __getitem__(self, idx):
        sample = self.images[idx]
        raw_label = self.labels[idx]
        if self.transform:
            sample = self.transform(sample)
        # squeeze() removes size-1 dims, so a (1,)-shaped label becomes a 0-d
        # tensor and a collated batch of labels ends up one-dimensional.
        target = torch.tensor(raw_label, dtype=torch.long).squeeze()
        return sample, target

# Load PneumoniaMNIST dataset.
# np.load on an .npz returns a lazy archive object that keeps the file open;
# reading every array inside a `with` block guarantees the handle is closed.
with np.load('PneumoniaMNIST.npz', allow_pickle=False) as npz_file:
    train_images, train_labels = npz_file['train_images'], npz_file['train_labels']
    val_images, val_labels = npz_file['val_images'], npz_file['val_labels']
    test_images, test_labels = npz_file['test_images'], npz_file['test_labels']

# Training pipeline: light augmentation (random horizontal flip, rotation of up
# to 10 degrees) followed by tensor conversion and normalisation with
# mean 0.5 / std 0.5 on the single channel.
transform_train = transforms.Compose([
    transforms.ToPILImage(),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(10),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

# Evaluation pipeline: same conversion and normalisation, no augmentation, so
# validation and test metrics are deterministic.
transform_eval = transforms.Compose([
    transforms.ToPILImage(),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

train_dataset = PneumoniaMNIST(train_images, train_labels, transform=transform_train)
val_dataset = PneumoniaMNIST(val_images, val_labels, transform=transform_eval)
test_dataset = PneumoniaMNIST(test_images, test_labels, transform=transform_eval)

# Loader settings shared by all three splits; only the training split is shuffled.
BATCH_SIZE = 32
NUM_WORKERS = 2

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKERS)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)


In [None]:
# -----------------------------
# 3. Initialize Model and Prepare for QAT
# -----------------------------
# Use the GPU when one is visible to PyTorch, otherwise stay on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# Build the two-class network, then move its parameters onto the chosen device.
model = FOMONet(num_classes=2)
model = model.to(device)

# Print the number of parameters
def count_parameters(model):
    """Return the total element count of `model`'s trainable parameters.

    Only parameters with `requires_grad` set are counted; frozen parameters
    are skipped.
    """
    trainable = 0
    for param in model.parameters():
        if param.requires_grad:
            trainable += param.numel()
    return trainable

# Report the model size before the QAT wrappers are inserted.
total_params = count_parameters(model)
print(f"Total trainable parameters: {total_params}")

# Prepare for QAT: attach the default 'fbgemm' QAT qconfig, then let
# prepare_qat rewrite the model in place (with inplace=True it returns the
# same module object that was passed in).
# NOTE(review): no Conv/BN/ReLU fusion is performed before this call — confirm
# whether that is intentional.
model.qconfig = torch.quantization.get_default_qat_qconfig('fbgemm')
model = prepare_qat(model, inplace=True)

In [None]:
# -----------------------------
# 4. Define Training and Evaluation Functions
# -----------------------------
def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one full pass over `loader` and return `(mean_loss, accuracy)`.

    When `optimizer` is given the model is put in training mode and updated
    after every batch; otherwise it is put in eval mode and gradients are
    disabled. Both returned values are Python floats, and both are NaN when
    the loader yields no samples.
    """
    training = optimizer is not None
    model.train(training)  # train(False) is equivalent to eval()
    loss_sum, n_correct, n_seen = 0.0, 0, 0

    with torch.set_grad_enabled(training):
        for images, labels in loader:
            images, labels = images.to(device), labels.to(device)
            if training:
                optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            if training:
                loss.backward()
                optimizer.step()

            batch_size = labels.size(0)
            # criterion returns the batch mean, so weight it by the batch size
            # to obtain a correct per-sample average over uneven batches.
            loss_sum += loss.item() * batch_size
            n_correct += (outputs.argmax(dim=1) == labels).sum().item()
            n_seen += batch_size

    if n_seen == 0:
        return float("nan"), float("nan")
    return loss_sum / n_seen, n_correct / n_seen


def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=15):
    """Train `model` for `num_epochs`, validating after every epoch.

    Batches are moved to the device the model's parameters live on, so the
    function does not depend on any notebook-level `device` variable.

    Args:
        model: Module to optimise; updated in place.
        train_loader: Iterable of `(images, labels)` training batches.
        val_loader: Iterable of `(images, labels)` validation batches.
        criterion: Loss callable taking `(outputs, labels)`.
        optimizer: Optimizer stepping `model`'s parameters.
        num_epochs: Number of passes over `train_loader`.

    Returns:
        dict with keys `train_loss`, `train_acc`, `val_loss`, `val_acc`, each
        a list with one float per epoch. An entry is NaN when the
        corresponding loader was empty.
    """
    # Follow the model's own placement; fall back to CPU for parameter-free models.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    history = {"train_loss": [], "train_acc": [], "val_loss": [], "val_acc": []}

    for epoch in range(num_epochs):
        epoch_loss, epoch_acc = _run_epoch(model, train_loader, criterion, run_device, optimizer)
        print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {epoch_loss:.4f}, Train Acc: {epoch_acc:.2%}')

        # Validation loop (no optimizer -> eval mode, gradients disabled)
        val_epoch_loss, val_epoch_acc = _run_epoch(model, val_loader, criterion, run_device)
        print(f'Epoch {epoch+1}/{num_epochs}, Val Loss: {val_epoch_loss:.4f}, Val Acc: {val_epoch_acc:.2%}')

        history["train_loss"].append(epoch_loss)
        history["train_acc"].append(epoch_acc)
        history["val_loss"].append(val_epoch_loss)
        history["val_acc"].append(val_epoch_acc)

    return history


In [None]:
# -----------------------------
# 5. Train and Quantize the Model
# -----------------------------
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Train the model
train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=15)

# Convert to quantized model.
# Training may have run on the GPU, but the converted model is evaluated with
# CPU tensors and the 'fbgemm' backend selected above targets the CPU, so the
# QAT model is moved to the CPU before conversion.
model.to("cpu")
model.eval()
# inplace=False converts a copy, leaving the trained QAT model available as `model`.
quantized_model = convert(model, inplace=False)

In [None]:
# -----------------------------
# 6. Evaluate the Quantized Model
# -----------------------------
def evaluate_model(model, test_loader):
    """Evaluate `model` on `test_loader` and report classification metrics.

    Inference runs on the CPU: every batch is moved there before the forward
    pass. Loss is cross-entropy averaged per sample; precision, recall and F1
    use scikit-learn's `average='weighted'`.

    Args:
        model: Module to evaluate; switched to eval mode.
        test_loader: Iterable of `(images, labels)` batches.

    Returns:
        dict with float entries `loss`, `accuracy`, `precision`, `recall`, `f1`.

    Raises:
        ValueError: If `test_loader` yields no samples.
    """
    model.eval()
    criterion = nn.CrossEntropyLoss()
    loss_sum, n_correct, n_seen = 0.0, 0, 0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to("cpu"), labels.to("cpu")
            outputs = model(images)
            batch_size = labels.size(0)
            # criterion returns the batch mean; weight by batch size for a per-sample average.
            loss_sum += criterion(outputs, labels).item() * batch_size
            preds = outputs.argmax(dim=1)
            all_preds.extend(preds.tolist())
            all_labels.extend(labels.tolist())
            n_correct += (preds == labels).sum().item()
            n_seen += batch_size

    if n_seen == 0:
        # Without this guard the averages below would divide by zero.
        raise ValueError("test_loader produced no samples; nothing to evaluate")

    precision = precision_score(all_labels, all_preds, average='weighted')
    recall = recall_score(all_labels, all_preds, average='weighted')
    f1 = f1_score(all_labels, all_preds, average='weighted')
    test_epoch_loss = loss_sum / n_seen
    test_epoch_acc = n_correct / n_seen
    print(f"Test Loss: {test_epoch_loss:.4f}, Accuracy: {test_epoch_acc:.2%}")
    print(f"Precision: {precision:.2%}, Recall: {recall:.2%}, F1 Score: {f1:.2%}")

    return {
        "loss": test_epoch_loss,
        "accuracy": test_epoch_acc,
        "precision": float(precision),
        "recall": float(recall),
        "f1": float(f1),
    }

# Evaluation (metrics are printed by the function and kept for later use)
test_metrics = evaluate_model(quantized_model, test_loader)

In [4]:
# Destination file for the TorchScript export
torchscript_path = "fomo_quantized_model.pt"

# Compile the quantized model to TorchScript, then serialise it to disk
scripted_model = torch.jit.script(quantized_model)
torch.jit.save(scripted_model, torchscript_path)
print(f"TorchScript model saved to {torchscript_path}")


TorchScript model saved to fomo_quantized_model.pt


In [5]:
!pip install onnx

Collecting onnx
  Downloading onnx-1.17.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (16 kB)
Downloading onnx-1.17.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (16.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m16.0/16.0 MB[0m [31m66.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: onnx
Successfully installed onnx-1.17.0


In [6]:
# the original non-quantized model
# NOTE(review): this constructs a brand-new FOMONet and no weights are loaded
# into it in this cell, so the exported graph carries freshly initialised
# parameters rather than trained ones — confirm whether trained weights
# should be loaded before exporting.
non_quantized_model = FOMONet(num_classes=2).eval()

# Dummy input for ONNX export: one single-channel 64x64 sample
dummy_input = torch.randn(1, 1, 64, 64)

# Export the model to ONNX.
# Only axis 0 (batch) is declared dynamic; the remaining axes are traced from
# the dummy input above.
onnx_path = "fomo_model_non_quantized.onnx"
torch.onnx.export(
    non_quantized_model,
    dummy_input,
    onnx_path,
    export_params=True,
    opset_version=13,
    do_constant_folding=True,
    input_names=["input"],
    output_names=["output"],
    dynamic_axes={tensor_name: {0: "batch_size"} for tensor_name in ("input", "output")}
)
print(f"ONNX model saved to {onnx_path}")


ONNX model saved to fomo_model_non_quantized.onnx


In [7]:
!pip install onnxruntime

Collecting onnxruntime
  Downloading onnxruntime-1.20.1-cp310-cp310-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (4.5 kB)
Collecting coloredlogs (from onnxruntime)
  Downloading coloredlogs-15.0.1-py2.py3-none-any.whl.metadata (12 kB)
Collecting humanfriendly>=9.1 (from coloredlogs->onnxruntime)
  Downloading humanfriendly-10.0-py2.py3-none-any.whl.metadata (9.2 kB)
Downloading onnxruntime-1.20.1-cp310-cp310-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl (13.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.3/13.3 MB[0m [31m40.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading coloredlogs-15.0.1-py2.py3-none-any.whl (46 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m46.0/46.0 kB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading humanfriendly-10.0-py2.py3-none-any.whl (86 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m86.8/86.8 kB[0m [31m7.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected pack

In [12]:
import onnx
import onnxruntime as ort

# Load the ONNX model and run the ONNX checker on it
onnx_model = onnx.load(onnx_path)
onnx.checker.check_model(onnx_model)

# Test ONNX runtime with a random single-sample input; the feed key "input"
# matches the input name chosen at export time.
ort_session = ort.InferenceSession(onnx_path)
example_input = np.random.rand(1, 1, 64, 64).astype(np.float32)
ort_inputs = {"input": example_input}
ort_outs = ort_session.run(None, ort_inputs)
print(f"Inference output from ONNX model: {ort_outs}")

#  logits to probabilities
logits = torch.tensor(ort_outs[0])
# torch.softmax is used directly: `F` (torch.nn.functional) is not imported in
# any visible cell, so `F.softmax` raises NameError on a fresh kernel.
probabilities = torch.softmax(logits, dim=1)  # softmax
print(f"Probabilities: {probabilities}")

# Verify
predicted_class = probabilities.argmax(dim=1).item()
print(f"Predicted Class: {predicted_class}")



Inference output from ONNX model: [array([[ 0.01150238, -0.03886358]], dtype=float32)]
Probabilities: tensor([[0.5126, 0.4874]])
Predicted Class: 0


In [9]:
import os

# Location of the exported ONNX file on disk
onnx_model_path = "fomo_model_non_quantized.onnx"

# File size converted from bytes to MB (1 MB = 2**20 bytes here)
onnx_file_size = os.path.getsize(onnx_model_path) / 2**20
print(f"ONNX Model Size: {onnx_file_size:.2f} MB")

# Size budget, in the same MB unit as onnx_file_size.
# NOTE(review): presumably the flash capacity of the target "BLE 33" board — confirm the figure.
ble_33_flash_limit = 1

print(
    "The model fits within the BLE 33 memory limit!"
    if onnx_file_size <= ble_33_flash_limit
    else "The model exceeds the BLE 33 memory limit. Consider further optimization."
)


ONNX Model Size: 0.37 MB
The model fits within the BLE 33 memory limit!


In [14]:
import os
import numpy as np
from PIL import Image

# Load the .npz file.
# np.load on an .npz returns a lazy archive object that keeps the file open;
# reading every array inside a `with` block guarantees the handle is closed.
with np.load('PneumoniaMNIST.npz', allow_pickle=False) as npz_file:
    train_images, train_labels = npz_file['train_images'], npz_file['train_labels']
    val_images, val_labels = npz_file['val_images'], npz_file['val_labels']
    test_images, test_labels = npz_file['test_images'], npz_file['test_labels']

# Create directories for images: one folder per split, one sub-folder per class
output_dir = "EdgeImpulseDataset"
splits = ['train', 'val', 'test']
# The list position of each name is the integer label it corresponds to.
classes = ['normal', 'pneumonia']  # Adjust based on your dataset

for split in splits:
    for class_name in classes:
        # exist_ok=True keeps the cell re-runnable when the folders already exist
        os.makedirs(os.path.join(output_dir, split, class_name), exist_ok=True)

# Function to save images
def save_images(images, labels, split):
    """Write every image of one split to `<output_dir>/<split>/<class>/<split>_<i>.jpg`.

    Args:
        images: Iterable of 2-D image arrays.
        labels: Iterable of labels aligned with `images`; each label is a
            scalar or a size-1 array indexing into the module-level `classes`.
        split: Split name used for both the sub-folder and the file prefix.

    Relies on the module-level `output_dir` and `classes`, and on the target
    folders already existing.
    """
    for i, (img, label) in enumerate(zip(images, labels)):
        # .item() extracts the scalar from a size-1 array; calling int() on
        # the array itself is deprecated by NumPy and emits a warning.
        class_name = classes[int(np.asarray(label).item())]

        pixels = np.asarray(img)
        if np.issubdtype(pixels.dtype, np.floating):
            # Float images are assumed to be in [0, 1] — TODO confirm.
            # Clipping keeps out-of-range values from wrapping in the uint8 cast.
            pixels = np.clip(pixels * 255, 0, 255)
        # Integer images are written as-is: multiplying 8-bit data by 255
        # would wrap modulo 256 and corrupt the pixel values.
        pixels = pixels.astype(np.uint8)

        out_img = Image.fromarray(pixels).convert('L')  # single-channel grayscale
        filename = os.path.join(output_dir, split, class_name, f"{split}_{i}.jpg")
        out_img.save(filename)

# Export the train, val, and test images, one split after another
for split_name, split_images, split_labels in (
    ('train', train_images, train_labels),
    ('val', val_images, val_labels),
    ('test', test_images, test_labels),
):
    save_images(split_images, split_labels, split_name)

print(f"Dataset saved to {output_dir}.")


  label = int(label)  # Ensure label is an integer


Dataset saved to EdgeImpulseDataset.


In [16]:
!zip -r EdgeImpulseDataset.zip EdgeImpulseDataset


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  adding: EdgeImpulseDataset/val/pneumonia/val_267.jpg (stored 0%)
  adding: EdgeImpulseDataset/val/pneumonia/val_494.jpg (stored 0%)
  adding: EdgeImpulseDataset/val/pneumonia/val_59.jpg (stored 0%)
  adding: EdgeImpulseDataset/val/pneumonia/val_107.jpg (stored 0%)
  adding: EdgeImpulseDataset/val/pneumonia/val_375.jpg (stored 0%)
  adding: EdgeImpulseDataset/val/pneumonia/val_401.jpg (stored 0%)
  adding: EdgeImpulseDataset/val/pneumonia/val_300.jpg (stored 0%)
  adding: EdgeImpulseDataset/val/pneumonia/val_368.jpg (stored 0%)
  adding: EdgeImpulseDataset/val/pneumonia/val_113.jpg (stored 0%)
  adding: EdgeImpulseDataset/val/pneumonia/val_379.jpg (stored 0%)
  adding: EdgeImpulseDataset/val/pneumonia/val_273.jpg (stored 0%)
  adding: EdgeImpulseDataset/val/pneumonia/val_0.jpg (stored 0%)
  adding: EdgeImpulseDataset/val/pneumonia/val_56.jpg (stored 0%)
  adding: EdgeImpulseDataset/val/pneumonia/val_6.jpg (stored 0%)
  a