In [1]:
import torch
from torch.utils.data import Dataset
import os
import cv2 

def collate_fn(samples: list[dict]) -> dict:
    """Merge per-sample dicts into a single batch dict.

    Args:
        samples: Dicts that each carry an ``'image'`` tensor and a ``'label'``
            value. All image tensors must share one shape so they can be stacked.

    Returns:
        A dict with ``'image'`` (the images stacked along a new leading
        dimension) and ``'label'`` (a 1-D tensor built from the labels).
    """
    image_batch = torch.stack([entry['image'] for entry in samples], dim=0)
    label_batch = torch.tensor([entry['label'] for entry in samples])
    return {'image': image_batch, 'label': label_batch}

class VinaFood(Dataset):
    """Image-classification dataset read from a folder-per-class directory tree.

    Every sub-directory of ``path`` is one class; the directory name is the
    label and every readable file inside it is one sample. All images are
    decoded eagerly at construction time and kept in memory; resizing and
    tensor conversion happen lazily in ``__getitem__``.

    Attributes:
        image_size: Target ``(height, width)`` of the returned images.
        label2idx: Maps a class (folder) name to its integer id.
        idx2label: Inverse of ``label2idx``.
        data: One ``{'image': BGR ndarray, 'label': class name}`` dict per sample.
    """

    def __init__(self, path: str, image_size: tuple[int, int]):
        """Load every image below ``path``.

        Args:
            path: Root directory that contains one sub-directory per class.
            image_size: ``(height, width)`` every sample is resized to.
        """
        super().__init__()

        self.image_size = image_size
        self.label2idx = {}
        self.idx2label = {}
        self.data: list[dict] = self.load_data(path)

    def load_data(self, path):
        """Scan ``path`` and return the list of decoded samples.

        Also fills ``self.label2idx`` / ``self.idx2label`` as a side effect.

        Args:
            path: Root directory that contains one sub-directory per class.

        Returns:
            A list of ``{'image': ndarray, 'label': str}`` dicts.
        """
        data = []
        print(f"Loading data from: {path}")
        # os.listdir() returns entries in arbitrary order; sorting makes the
        # label -> id mapping reproducible across runs, machines and splits.
        for folder in sorted(os.listdir(path)):
            folder_path = os.path.join(path, folder)
            # Stray files in the dataset root (README, .DS_Store, ...) are not classes.
            if not os.path.isdir(folder_path):
                continue

            label = folder
            if label not in self.label2idx:
                # Derive the next id from the mapping itself so a repeated call
                # can never hand out an id that is already in use.
                self.label2idx[label] = len(self.label2idx)
            print(f"Processing folder: {folder} (label_id: {self.label2idx[label]})")

            for image_file in sorted(os.listdir(folder_path)):
                image_path = os.path.join(folder_path, image_file)
                image = cv2.imread(image_path)
                # cv2.imread signals failure by returning None instead of
                # raising; keeping such an entry would crash later in cv2.resize.
                if image is None:
                    print(f"Skipping unreadable image: {image_path}")
                    continue
                data.append({
                    'image': image,
                    'label': label
                })

        self.idx2label = {idx: label for label, idx in self.label2idx.items()}
        return data

    def __len__(self):
        """Return the number of loaded samples."""
        return len(self.data)

    def __getitem__(self, idx: int) -> dict:
        """Return sample ``idx`` as a model-ready dict.

        Returns:
            ``{'image': float32 tensor (3, height, width) scaled to [0, 1] in
            RGB channel order, 'label': integer class id}``.
        """
        item = self.data[idx]

        # image_size is (height, width) -- the notebook builds the model input
        # shape as (3,) + image_size -- whereas cv2.resize expects (width, height).
        height, width = self.image_size
        image = cv2.resize(item['image'], (width, height))
        # OpenCV decodes to BGR; convert to the conventional RGB order.
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        # HWC uint8 -> CHW float32 in [0, 1].
        image = torch.tensor(image, dtype=torch.float32).permute(2, 0, 1) / 255.0
        return {
            'image': image,
            'label': self.label2idx[item['label']]
        }
    
    
    

In [2]:
import torch 
from torch import nn 
from torch.nn import functional as F

class ResNet18(nn.Module):
    """ResNet-18 image classifier assembled from ``ResNetBlock`` units.

    Layout: 7x7 stride-2 stem convolution -> batch norm -> ReLU -> 3x3
    stride-2 max pool -> four stages of two residual blocks each (64, 128,
    256 and 512 channels; stages 2-4 halve the resolution in their first
    block) -> global average pool -> linear classifier.

    Args:
        image_size: ``(channels, height, width)`` of the input images. Only the
            channel count determines layer shapes; the adaptive average pool
            makes the head independent of the spatial size.
        num_labels: Number of output classes (width of the final linear layer).
    """

    def __init__(self, image_size, num_labels):
        super().__init__()
        self.c, self.h, self.w = image_size
        self.conv1 = nn.Conv2d(
            in_channels=self.c,
            out_channels=64,
            kernel_size=7,
            stride=2,
            padding=3
        )
        self.bn1 = nn.BatchNorm2d(64)
        self.maxpool = nn.MaxPool2d(
            kernel_size=3,
            stride=2,
            padding=1
        )
        # Residual stages. ResNet-18 uses exactly two basic blocks per stage:
        # 1 stem conv + 4 stages * 2 blocks * 2 convs + 1 fc = 18 weighted layers.
        self.layer1 = nn.Sequential(
            ResNetBlock(64, 64, kernel_size=3, stride=1, padding=1),
            ResNetBlock(64, 64, kernel_size=3, stride=1, padding=1)
        )
        # From stage 2 on, the first block downsamples (stride 2) and widens the
        # channels, so its shortcut needs a strided 1x1 projection (conv=True).
        self.layer2 = nn.Sequential(
            ResNetBlock(64, 128, kernel_size=3, stride=2, padding=1, conv=True, padding_identity=0, stride_identity=2),
            ResNetBlock(128, 128, kernel_size=3, stride=1, padding=1)
        )
        self.layer3 = nn.Sequential(
            ResNetBlock(128, 256, kernel_size=3, stride=2, padding=1, conv=True, padding_identity=0, stride_identity=2),
            ResNetBlock(256, 256, kernel_size=3, stride=1, padding=1),
        )
        self.layer4 = nn.Sequential(
            ResNetBlock(256, 512, kernel_size=3, stride=2, padding=1, conv=True, padding_identity=0, stride_identity=2),
            ResNetBlock(512, 512, kernel_size=3, stride=1, padding=1),
        )

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_labels)

    def forward(self, x):
        """Compute class logits.

        Args:
            x: Tensor of shape ``(B, C, H, W)``.

        Returns:
            Tensor of shape ``(B, num_labels)`` holding unnormalised logits.
        """
        x = F.relu(self.bn1(self.conv1(x)))  # (B, 64, H/2, W/2)
        x = self.maxpool(x)                   # (B, 64, H/4, W/4)
        x = self.layer1(x)                    # (B, 64, H/4, W/4)
        x = self.layer2(x)                    # (B, 128, H/8, W/8)
        x = self.layer3(x)                    # (B, 256, H/16, W/16)
        x = self.layer4(x)                    # (B, 512, H/32, W/32)
        x = self.avgpool(x)                   # (B, 512, 1, 1)
        x = torch.flatten(x, 1)               # (B, 512)
        x = self.fc(x)                        # (B, num_labels)
        return x

class ResNetBlock(nn.Module):
    """Basic residual block: two conv + batch-norm layers plus a skip connection.

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels produced by both convolutions.
        kernel_size: Kernel size shared by both convolutions.
        stride: Stride of the first convolution (the second always uses 1).
        padding: Padding shared by both convolutions.
        conv: When true, the skip path is a 1x1 convolution followed by batch
            norm instead of the identity. Needed whenever the main path changes
            the channel count or the spatial size.
        padding_identity: Padding of the 1x1 skip convolution.
        stride_identity: Stride of the 1x1 skip convolution.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride, padding, conv=False, padding_identity=0, stride_identity=1):
        super().__init__()
        # Main path, part 1: may downsample through `stride`.
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size, stride=stride, padding=padding)
        self.bn1 = nn.BatchNorm2d(out_channels)

        # Main path, part 2: stride is fixed to 1 so the resolution is kept.
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size, stride=1, padding=padding)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Skip path: a learned projection when requested, otherwise nothing.
        self.use_shortcut = conv
        if conv:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, 1, stride=stride_identity, padding=padding_identity),
                nn.BatchNorm2d(out_channels),
            )
        else:
            self.shortcut = None

    def forward(self, x):
        """Return ``relu(main_path(x) + skip(x))``."""
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        # Project the input only when a projection was configured.
        residual = self.shortcut(x) if self.use_shortcut else x

        out += residual
        return F.relu(out)

In [3]:
from torch.utils.data import DataLoader
from torch import nn 
import torch
import numpy as np 
from sklearn.metrics import precision_score, recall_score, f1_score
from vinafood_dataset import VinaFood, collate_fn
from ResNet_18 import ResNet18

# Train on the GPU when one is available; otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
# Spatial size (height, width) every image is resized to by the dataset.
image_size = (224, 224)

train_dataset = VinaFood(
    path=r"D:\NguyenTienDat_23520262\Nam_3\DL\BT2\VinaFood21\train",
    image_size=image_size
)

# test_dataset = VinaFood(
#     path=r"D:\NguyenTienDat_23520262\Nam_3\DL\BT2\VinaFood21\test",
#     image_size=image_size
# )

train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=32,
    shuffle=True,
    collate_fn=collate_fn
)

# test_loader = DataLoader(
#     dataset=test_dataset,
#     batch_size=32,
#     collate_fn=collate_fn
# )

# The model wants (channels, height, width). Keep it under its own name rather
# than rebinding `image_size`, so that name always means the 2-D spatial size.
input_shape = (3, ) + image_size   # (3, 224, 224)
model = ResNet18(num_labels=len(train_dataset.idx2label), image_size=input_shape).to(device)
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

def evaluate(model, dataloader):
    """Score ``model`` on ``dataloader`` with macro-averaged metrics.

    Puts the model in eval mode (it is not switched back), runs it without
    gradient tracking on the module-level ``device`` and compares the argmax
    predictions with the true labels.

    Args:
        model: Callable mapping an image batch to per-class scores.
        dataloader: Iterable of batches with ``"image"`` and ``"label"`` tensors.

    Returns:
        A dict with ``"recall"``, ``"precision"`` and ``"f1"``. If computing the
        metrics raises, the error is printed and all three are ``0.0``.
    """
    model.eval()
    y_pred, y_true = [], []
    with torch.no_grad():
        for batch in dataloader:
            images = batch["image"].to(device)
            targets = batch["label"].to(device)
            # Predicted class = index of the highest score on the last axis.
            predicted = model(images).argmax(dim=-1)

            y_pred.extend(predicted.cpu().numpy())
            y_true.extend(targets.cpu().numpy())

    # Debug aid: reveals e.g. a model that collapsed onto a single class.
    print(f"Unique predictions: {np.unique(y_pred)}")
    print(f"Unique true labels: {np.unique(y_true)}")

    scorers = {
        "recall": recall_score,
        "precision": precision_score,
        "f1": f1_score,
    }
    try:
        return {
            name: scorer(y_true, y_pred, average="macro", zero_division=0)
            for name, scorer in scorers.items()
        }
    except Exception as e:
        # Best effort: report the problem and keep the training loop alive.
        print(f"Error in metrics calculation: {e}")
        return {name: 0.0 for name in scorers}

EPOCHS = 10
for epoch in range(EPOCHS):
    print(f"Epoch: {epoch+1}")

    model.train()
    losses = []  # per-batch losses of the current epoch
    for batch_idx, item in enumerate(train_loader):
        image, label = item["image"].to(device), item["label"].to(device)
        # Diagnostics are printed only for the first batch of every epoch.
        is_first_batch = batch_idx == 0

        if is_first_batch:
            print(f"\nImage shape: {image.shape}")
            print(f"Label shape: {label.shape}")
            print(f"Label values: {label.cpu().numpy()}")

        # Forward pass
        output = model(image)

        if is_first_batch:
            print(f"Output shape: {output.shape}")
            print(f"Output sample: \n{output[0].detach().cpu().numpy()}\n")

        # CrossEntropyLoss requires integer (long) class indices as targets.
        loss = loss_fn(output, label.long())
        losses.append(loss.item())

        # Backward pass: clear stale gradients, back-propagate, update weights.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    print(f"Loss: {np.mean(losses)}")
    # Metrics are computed on the training data; no held-out loader is active.
    metrics = evaluate(model, train_loader)
    for metric, value in metrics.items():
        print(f"{metric}: {value}")


Loading data from: D:\NguyenTienDat_23520262\Nam_3\DL\BT2\VinaFood21\train
Processing folder: banh-can (label_id: 0)
Processing folder: banh-hoi (label_id: 1)
Processing folder: banh-mi-chao (label_id: 2)
Processing folder: banh-tet (label_id: 3)
Processing folder: banh-trang-tron (label_id: 4)
Processing folder: banh-u (label_id: 5)
Processing folder: banh-uot (label_id: 6)
Processing folder: bap-nuong (label_id: 7)
Processing folder: bo-kho (label_id: 8)
Processing folder: bo-la-lot (label_id: 9)
Processing folder: bot-chien (label_id: 10)
Processing folder: ca-ri (label_id: 11)
Processing folder: canh-kho-qua (label_id: 12)
Processing folder: canh-khoai-mo (label_id: 13)
Processing folder: ga-nuong (label_id: 14)
Processing folder: goi-ga (label_id: 15)
Processing folder: ha-cao (label_id: 16)
Processing folder: hoanh-thanh-nuoc (label_id: 17)
Processing folder: pha-lau (label_id: 18)
Processing folder: tau-hu (label_id: 19)
Processing folder: thit-kho-trung (label_id: 20)
Epoch: 1
