In [2]:
# # Evaluation function with detailed metrics
# import torch
# import torch.nn as nn
# import torch.nn.functional as F
# import numpy as np
# from sklearn.metrics import precision_recall_fscore_support, confusion_matrix
# import logging


# class LabelSmoothingLoss(nn.Module):
#     def __init__(self, classes=46, smoothing=0.05):
#         super(LabelSmoothingLoss, self).__init__()
#         self.confidence = 1.0 - smoothing
#         self.smoothing = smoothing
#         self.cls = classes

#     def forward(self, x, target):
#         logprobs = F.log_softmax(x, dim=-1)
#         nll_loss = -logprobs.gather(dim=-1, index=target.unsqueeze(1))
#         nll_loss = nll_loss.squeeze(1)
#         smooth_loss = -logprobs.mean(dim=-1)
#         loss = self.confidence * nll_loss + self.smoothing * smooth_loss
#         return loss.mean()
    
# criterion = nn.CrossEntropyLoss(
#     label_smoothing=config.get('label_smoothing', 0.05)
# )

# def evaluate_model(model, test_loader, device, criterion, num_classes=46):
#     model.eval()
#     test_loss = 0.0
#     correct = 0
#     total = 0
#     all_preds = []
#     all_labels = []
#     with torch.no_grad():
#         for images, labels in test_loader:
#             images = images.to(device)
#             labels = labels.to(device)
#             outputs = model(images)
#             loss = criterion(outputs, labels)
#             test_loss += loss.item() * images.size(0)
#             _, predicted = outputs.max(1)
#             total += labels.size(0)
#             correct += predicted.eq(labels).sum().item()
#             all_preds.extend(predicted.cpu().numpy())
#             all_labels.extend(labels.cpu().numpy())
#     test_loss /= len(test_loader.dataset)
#     test_acc = correct / total

#     precision, recall, f1, _ = precision_recall_fscore_support(
#         all_labels, all_preds, average=None, labels=range(num_classes), zero_division=0
#     )
#     conf_matrix = confusion_matrix(all_labels, all_preds, labels=range(num_classes))

#     logger.info(f"Test Loss: {test_loss:.4f} - Test Acc: {test_acc:.4f}")
#     return test_loss, test_acc, precision, recall, f1, conf_matrix






# ---------------- Imports ----------------
import torch
import torch.nn as nn
from torchvision import transforms
from PIL import Image
import time
import os


# ---------------- Model Definition ----------------
class ResidualBlock(nn.Module):
    """Two 3x3 conv + batch-norm stages wrapped by a skip connection.

    Args:
        in_channels: Channel count of the incoming feature map.
        out_channels: Channel count produced by both convolutions.
        stride: Stride of the first convolution and, when a projection
            shortcut is needed, of the 1x1 shortcut convolution.
        dropout_rate: Probability handed to the ``nn.Dropout`` that is
            applied after each of the two stages.
    """

    def __init__(self, in_channels, out_channels, stride=1, dropout_rate=0.05):
        super().__init__()
        # Main path.
        self.conv1 = nn.Conv2d(
            in_channels, out_channels,
            kernel_size=3, stride=stride, padding=1, bias=False,
        )
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(
            out_channels, out_channels,
            kernel_size=3, stride=1, padding=1, bias=False,
        )
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.dropout = nn.Dropout(dropout_rate)

        # Skip path: a 1x1 conv + batch-norm projection is only required
        # when the main path changes the spatial size or channel count.
        needs_projection = stride != 1 or in_channels != out_channels
        if needs_projection:
            self.shortcut = nn.Sequential(
                nn.Conv2d(
                    in_channels, out_channels,
                    kernel_size=1, stride=stride, bias=False,
                ),
                nn.BatchNorm2d(out_channels),
            )
        else:
            self.shortcut = nn.Identity()

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        skip = self.shortcut(x)

        y = self.conv1(x)
        y = self.bn1(y)
        y = self.relu(y)
        y = self.dropout(y)

        y = self.conv2(y)
        y = self.bn2(y)
        y = self.dropout(y)

        y += skip
        return self.relu(y)

class HFCLayer(nn.Module):
    """Class-scoring head built from a learnable ``(num_classes, D_b)`` matrix.

    The input is summed over dim 1, multiplied element-wise with every row
    of ``V``, batch-normalised over the flattened ``num_classes * D_b``
    features, rectified, and finally summed over the last dimension so that
    one value per class remains.

    Args:
        num_classes: Number of rows of ``V`` (one output value per row).
        D_b: Number of columns of ``V``; must match the size of the input's
            last dimension for the broadcast in ``forward`` to work.
    """

    def __init__(self, num_classes, D_b):
        super().__init__()
        self.V = nn.Parameter(torch.randn(num_classes, D_b))
        self.bn = nn.BatchNorm1d(num_classes * D_b)

    def forward(self, x):
        """Return a ``(batch, num_classes)`` tensor of scores.

        Assumes ``x`` is shaped ``(batch, channels, D_b)`` -- TODO confirm
        against callers.
        """
        pooled = torch.sum(x, dim=1)
        # Broadcast (batch, 1, D_b) against (1, num_classes, D_b).
        weighted = pooled[:, None] * self.V[None]
        n_rows = weighted.shape[0]
        normed = self.bn(weighted.view(n_rows, -1))
        per_class = normed.view(n_rows, self.V.shape[0], -1)
        return per_class.relu().sum(dim=2)

class MergingLayer(nn.Module):
    """Blend several branch outputs with learnable softmax weights.

    Args:
        num_branches: Length of the learnable weight vector ``w``, which is
            initialised uniformly to ``1 / num_branches``.
    """

    def __init__(self, num_branches=3):
        super().__init__()
        uniform = torch.ones(num_branches) / num_branches
        self.w = nn.Parameter(uniform)

    def forward(self, inputs):
        """Return the softmax(``w``)-weighted sum of the tensors in ``inputs``.

        Weights and inputs are paired with ``zip``, so any surplus on either
        side is ignored; with no pairs at all the result is the integer 0.
        """
        coeffs = torch.softmax(self.w, dim=0)
        merged = 0
        for coeff, branch in zip(coeffs, inputs):
            merged = merged + coeff * branch
        return merged

class BMCNNBase(nn.Module):
    """Three-stage residual backbone that exposes every stage's output.

    Each stage is a stack of three ``ResidualBlock`` modules (1->128,
    128->256 and 256->512 channels); a 2x2 max-pool sits between
    consecutive stages.

    Args:
        dropout_rate: Dropout probability forwarded to every residual block.
    """

    def __init__(self, dropout_rate=0.05):
        super().__init__()
        self.conv_block1 = self._stage(1, 128, dropout_rate)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv_block2 = self._stage(128, 256, dropout_rate)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv_block3 = self._stage(256, 512, dropout_rate)

    @staticmethod
    def _stage(in_channels, out_channels, dropout_rate):
        """Build one stage: three stride-1 residual blocks in sequence."""
        return nn.Sequential(
            ResidualBlock(in_channels, out_channels, 1, dropout_rate),
            ResidualBlock(out_channels, out_channels, 1, dropout_rate),
            ResidualBlock(out_channels, out_channels, 1, dropout_rate),
        )

    def forward(self, x):
        """Return the feature maps of stage 1, 2 and 3 as a tuple."""
        feat1 = self.conv_block1(x)
        feat2 = self.conv_block2(self.pool1(feat1))
        feat3 = self.conv_block3(self.pool2(feat2))
        return feat1, feat2, feat3

class EnhancedBMCNNwHFCs(BMCNNBase):
    """Backbone plus one ``HFCLayer`` head per stage, merged into one output.

    Each backbone stage output is flattened over its spatial dimensions and
    scored by its own ``HFCLayer``; the three score tensors are combined by
    a ``MergingLayer``.

    Args:
        num_classes: Number of classes scored by every head.
        dropout_rate: Dropout probability forwarded to the backbone.
        input_size: Spatial size of the input images, either a single int
            for square inputs or a ``(height, width)`` pair. The backbone's
            two 2x2 max-pools halve the size twice, so the heads are sized
            for the full, half and quarter resolution. The default of 32
            reproduces the previously hard-coded 32*32, 16*16 and 8*8.

    Raises:
        ValueError: If either spatial dimension is smaller than 4, which
            would leave the last head with no features.
    """

    def __init__(self, num_classes=46, dropout_rate=0.05, input_size=32):
        super().__init__(dropout_rate)
        if isinstance(input_size, int):
            height = width = input_size
        else:
            height, width = input_size
        if height < 4 or width < 4:
            raise ValueError(
                f"input_size must be at least 4 in each dimension, got {input_size!r}"
            )

        # MaxPool2d(2, 2) floors odd sizes, hence integer division.
        self.hfc1 = HFCLayer(num_classes, height * width)
        self.hfc2 = HFCLayer(num_classes, (height // 2) * (width // 2))
        self.hfc3 = HFCLayer(num_classes, (height // 4) * (width // 4))
        self.merging = MergingLayer(3)

    def forward(self, x):
        """Return the merged per-class scores for a batch of images."""
        x1, x2, x3 = super().forward(x)
        # Collapse the spatial dimensions: (B, C, H, W) -> (B, C, H*W).
        logit1 = self.hfc1(x1.flatten(start_dim=2))
        logit2 = self.hfc2(x2.flatten(start_dim=2))
        logit3 = self.hfc3(x3.flatten(start_dim=2))
        return self.merging((logit1, logit2, logit3))



In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from sklearn.metrics import precision_recall_fscore_support, confusion_matrix
from torch.utils.data import DataLoader
import logging
from torchvision import datasets, transforms

# Setup logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Custom transform for data augmentation
class AddGaussianNoise(object):
    """Transform that adds Gaussian noise to a tensor.

    Args:
        mean: Offset added to every element.
        std: Factor applied to the standard-normal noise sample.
    """

    def __init__(self, mean=0., std=0.05):
        self.mean = mean
        self.std = std

    def __call__(self, tensor):
        """Return ``tensor + N(0, 1) * std + mean`` with the input's shape."""
        # Sample the noise on the input's device; a CPU-only sample cannot
        # be added to a tensor that lives on another device.
        noise = torch.randn(tensor.size(), device=tensor.device)
        return tensor + noise * self.std + self.mean

    def __repr__(self):
        return f"{self.__class__.__name__}(mean={self.mean}, std={self.std})"

# Folder that holds the evaluation images (read through ImageFolder below).
test_dir = "Test"
BATCH_SIZE = 32

# Preprocessing pipeline: single-channel grayscale, resize to 32x32,
# conversion to a tensor, then normalisation with mean 0.5 / std 0.5.
train_transform = transforms.Compose(
    [
        transforms.Grayscale(num_output_channels=1),
        transforms.Resize(size=(32, 32)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5], std=[0.5]),
    ]
)

# NOTE: despite the ``train_`` prefix, this dataset is built from ``test_dir``.
train_ds = datasets.ImageFolder(root=test_dir, transform=train_transform)

# Fixed iteration order (no shuffling) with two worker processes.
test_loader_ = DataLoader(
    dataset=train_ds,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=2,
)
# 1. Setup Criterion with Label Smoothing
# Using the built-in implementation available in torch.nn
def get_criterion(config):
    """Build the cross-entropy loss described by ``config``.

    Args:
        config: Object with a dict-style ``get`` method. Its
            ``'label_smoothing'`` entry is used, defaulting to 0.05 when
            the key is absent.

    Returns:
        An ``nn.CrossEntropyLoss`` configured with that label smoothing.
    """
    label_smoothing = config.get('label_smoothing', 0.05)
    criterion = nn.CrossEntropyLoss(label_smoothing=label_smoothing)
    return criterion

# 2. Evaluation Function (With Labels)
def evaluate_model(model, test_loader, device, criterion, num_classes=46):
    """Evaluate ``model`` on labelled data and compute per-class metrics.

    Args:
        model: Module called as ``model(images)``; its output is reduced
            with a max over dim 1 to obtain predicted class indices.
        test_loader: Iterable yielding ``(images, labels)`` tensor pairs.
        device: Device the batches are moved to before the forward pass.
        criterion: Loss called as ``criterion(outputs, labels)``. The result
            is treated as a per-batch mean and re-weighted by batch size.
        num_classes: Class ids ``0 .. num_classes - 1`` are reported in the
            per-class metrics and the confusion matrix.

    Returns:
        Tuple ``(test_loss, test_acc, precision, recall, f1, conf_matrix)``.
        ``test_loss`` is averaged over the samples actually evaluated;
        ``precision``, ``recall`` and ``f1`` are computed with
        ``average=None`` for the listed class ids.

    Raises:
        RuntimeError: If ``test_loader`` yields no samples.
    """
    model.eval()
    loss_sum = 0.0
    n_samples = 0
    pred_batches = []
    label_batches = []

    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)

            outputs = model(images)
            batch_size = images.size(0)

            # Undo the per-batch mean so that a smaller final batch is
            # weighted correctly in the overall average.
            loss_sum += criterion(outputs, labels).item() * batch_size
            n_samples += batch_size

            _, predicted = torch.max(outputs, 1)
            pred_batches.append(predicted.cpu())
            label_batches.append(labels.cpu())

    if n_samples == 0:
        raise RuntimeError("evaluate_model: test_loader yielded no samples")

    all_preds = torch.cat(pred_batches).numpy()
    all_labels = torch.cat(label_batches).numpy()

    # Normalise by the samples seen rather than len(test_loader.dataset):
    # the two differ when the loader uses a sampler or drop_last, and not
    # every iterable passed in has a ``dataset`` attribute.
    test_loss = loss_sum / n_samples
    test_acc = (all_preds == all_labels).mean()

    class_ids = list(range(num_classes))
    precision, recall, f1, _ = precision_recall_fscore_support(
        all_labels, all_preds, average=None, labels=class_ids, zero_division=0
    )
    conf_matrix = confusion_matrix(all_labels, all_preds, labels=class_ids)

    logger.info("Test Loss: %.4f - Test Acc: %.4f", test_loss, test_acc)
    return test_loss, test_acc, precision, recall, f1, conf_matrix

# 3. Pure Inference Function (Without Labels / For Production)
def run_inference(model, data_loader, device):
    """Predict class indices and softmax scores for unlabelled batches.

    Args:
        model: Module called as ``model(images)`` to obtain logits.
        data_loader: Iterable of batches. A batch may be a tensor, or a
            list/tuple whose first element is the image tensor (any further
            elements, e.g. labels, are ignored).
        device: Device the images are moved to before the forward pass.

    Returns:
        Tuple ``(predictions, probabilities)`` of NumPy arrays: the index of
        the largest logit per sample, and the softmax over dim 1 of the
        logits.
    """
    model.eval()
    pred_chunks = []
    prob_chunks = []

    with torch.no_grad():
        for batch in data_loader:
            # Accept both bare image batches and (images, ...) containers.
            inputs = batch[0] if isinstance(batch, (list, tuple)) else batch
            inputs = inputs.to(device)

            logits = model(inputs)
            scores = F.softmax(logits, dim=1)
            winners = torch.max(logits, 1)[1]

            pred_chunks.append(winners.cpu())
            prob_chunks.append(scores.cpu())

    class_ids = torch.cat(pred_chunks).numpy()
    class_probs = torch.cat(prob_chunks).numpy()
    return class_ids, class_probs

# Example Usage Implementation
if __name__ == "__main__":
    # Mock config
    config = {'label_smoothing': 0.05}
    # Resolve the device once; every later step reuses this object.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Initialize loss
    criterion = get_criterion(config)

    # Rebuild the architecture and restore the trained weights.
    MODEL_PATH = "models/best_model.pth"
    Basnet = EnhancedBMCNNwHFCs(num_classes=46, dropout_rate=0.0).to(device)
    # NOTE(review): torch.load unpickles the file, so only load checkpoints
    # from a trusted source (consider ``weights_only=True`` where the
    # installed PyTorch version and the checkpoint contents allow it).
    checkpoint = torch.load(MODEL_PATH, map_location=device)
    Basnet.load_state_dict(checkpoint["model_state_dict"])
    Basnet.eval()
    print("Model loaded successfully on device:", device)

    # The model already lives on ``device``; no second transfer is needed.
    model = Basnet
    test_loader = test_loader_

    # To evaluate accuracy and F1:
    results = evaluate_model(model, test_loader, device, criterion)

    # To just get predictions:
    preds, probs = run_inference(model, test_loader, device)

    # Print final loss and accuracy.
    print(f"Final Test Loss: {results[0]:.4f}, Final Test Accuracy: {results[1]:.4f}")


Model loaded successfully on device: cpu


INFO:__main__:Test Loss: 5.9583 - Test Acc: 0.0212
