In [1]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score
from tqdm import tqdm

In [2]:
class VinaFoodDataLoader:
    def __init__(self, batch_size=64, num_workers=2, data_dir='/kaggle/input/vinafood21/VinaFood21', input_image_size=224):
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.data_dir = data_dir
        self.input_image_size = input_image_size

        self.transform = transforms.Compose([
            # 1. Resize image into 224 x 224
            transforms.Resize((input_image_size, input_image_size)),

            # 2. Convert into PyTorch Tensor
            transforms.ToTensor(),

            # 3. Normalize
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

    def get_train_loader(self):
        train_path = os.path.join(self.data_dir, 'train')
        train_dataset = datasets.ImageFolder(root=train_path, transform=self.transform)
        train_loader = DataLoader(dataset=train_dataset, batch_size=self.batch_size, shuffle=True, num_workers=self.num_workers)
        print(f"Loaded {len(train_dataset)} training samples from {train_path}. Found {len(train_dataset.classes)} classes")
        self.num_classes = len(train_dataset.classes)
        return train_loader

        # Mỗi thư mục con tương ứng với 1 nhãn (class) -> gán label cho từng ảnh dựa theo thứ tự thư mục con được sắp xếp alphabetically

    def get_test_loader(self):
        test_path = os.path.join(self.data_dir, 'test')
        test_dataset = datasets.ImageFolder(root=test_path, transform=self.transform)
        test_loader = DataLoader(dataset=test_dataset, batch_size=self.batch_size, num_workers=self.num_workers, shuffle=False)
        print(f"Loaded {len(test_dataset)} test samples from {test_path}")
        return test_loader


In [3]:
class BasicBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1, downsample=None):

        """
        Args:
            in_channels: Number of input channels
            out_channels: Number of output channels
            stride: Stride for the first convolutional layer
            downsample(nn.Module, optional): Shortcut connection if dimensions need to change
        """
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        # padding = 1 to maintain the same size of input x

        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Shortcut connection
        self.downsample = downsample

        # if downsample == None -> create if input != output

        if self.downsample is None and (stride != 1 or in_channels != out_channels):
            self.downsample = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )

    def forward (self, x):
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        if self.downsample is not None:
            identity = self.downsample(x)
        out += identity
        out = self.relu(out)
        return out



In [4]:
class ResNet(nn.Module):
    def __init__(self, block, layers, num_classes):
        super(ResNet, self).__init__()
        self.in_channels = 64 # Output of conv1 (input of conv2_x)

        # Initialize
        # Input: (N, 3, 224, 224)
        # Output: (N, 64, 112, 112)
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=64, kernel_size=7, stride=2, padding=3)
        # Padding = 3 được suy ra từ công thức tính output size
        self.bn1 = nn.BatchNorm2d(64)

        # 2. Max Pooling 1
        # Input: (N, 64, 112, 112)
        # Output: (N, 64, 56, 56)
        self.maxpool1 = nn.MaxPool2d(kernel_size=3, stride=2, padding=1, ceil_mode=True)

        # 3. Residual Block
        # 3.1 Conv2_x: Output: (N, 64, 56, 56)
        self.layer1 = self._make_layer(block, 64, layers[0], stride=1)
        # Max Pooling (Inter-stage 1)
        # Input: (N, 64, 56, 56) -> Output: (N, 64, 28, 28)
        self.maxpool2 = nn.MaxPool2d(kernel_size=3, stride=2, padding=0)

        # 3.2 Conv3_x: Output(N, 128, 28, 28)
        self.layer2 = self._make_layer(block, 128, layers[1], stride=1)
        # Max Pooling (Inter-stage 2)
        # Input: (N, 128, 28, 28) -> Output: (N, 128, 14, 14)
        self.maxpool3 = nn.MaxPool2d(kernel_size=3, stride=2, padding=0)


        # 3.3 Conv4_x: Output(N, 256, 14, 14)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=1)
        # Max Pooling (Inter-stage 3)
        # Input: (N, 256, 14, 14) -> Output: (N, 256, 7, 7)
        self.maxpool4 = nn.MaxPool2d(kernel_size=3, stride=2, padding=0)


         # 3.4 Conv5_x: Output(N, 512, 7, 7)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=1)

        # 4. Average Pooling
        self.avgpool = nn.AdaptiveAvgPool2d((1,1))

        # 5. FC
        self.fc = nn.Linear(512, num_classes)

        # Initialize parameters for Cons/ BN layer
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    def _make_layer(self, block, out_channels, num_blocks, stride):
        """
        Helper function to create a sequential layer  (stage - ex: conv2_x, conv3_x, ...) of residual blocks
            - Each stage include many residual similar blocks (ex: 2 blocks in ResNet-18)
        """
        downsample = None
        if stride != 1 or self.in_channels != out_channels:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )
        layers = []
        # Block đầu tiên trong stage có thể cần projection shortcut nếu số kênh thay đổi
        layers.append(block(self.in_channels, out_channels, stride, downsample))
        self.in_channels = out_channels # Cập nhật in_channels cho các block tiếp theo

        # Các block còn lại trong stage sẽ có identity shortcut (stride=1, in_channels == out_channels)
        for _ in range(1, num_blocks):
            layers.append(block(self.in_channels, out_channels, stride=1))

        return nn.Sequential(*layers)


    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.maxpool1(x)

        x = self.layer1(x) # Conv2_x
        x = self.maxpool2(x)

        x = self.layer2(x) # Conv3_x
        x = self.maxpool3(x)

        x = self.layer3(x) # Conv4_x
        x = self.maxpool4(x)

        x = self.layer4(x) # Conv5_x

        x = self.avgpool(x)
        x = torch.flatten(x, 1)

        logits = self.fc(x)
        probabilities = F.softmax(logits, dim=1)
        return logits, probabilities



In [5]:
def resnet18(num_classes=21):
        # layer = [num_blocks_in_conv2_x, num_blocks_in_conv3_x, num_blocks_in_conv4_x, num_blocks_in_conv5_x]
        return ResNet(BasicBlock, [2, 2, 2, 2], num_classes=21)

# Vì ResNet có nhiều version nên thay vì gọi class ResNet trên thì sẽ xây dụng một class cho nhiều version ResNet -> xây dụng một hàm để gọi riêng từng version 

In [6]:
def evaluate_model(model, data_loader, device):
    model.eval()
    all_preds = []
    all_targets = []
    with torch.no_grad():
        for data, target in tqdm(data_loader, desc="Evaluating"):
            data, target = data.to(device), target.to(device)
            _, probabilities = model(data)
            predictions = probabilities.argmax(dim=1)
            all_preds.extend(predictions.cpu().numpy())
            all_targets.extend(target.cpu().numpy())

    all_preds = np.array(all_preds)
    all_targets = np.array(all_targets)
    print("Overall evaluation metrics")
    overall_accuracy = accuracy_score(all_targets, all_preds)
    print(f"Accuracy: {overall_accuracy:.4f}")

    overall_recall = recall_score(all_targets, all_preds, average='macro', zero_division=0)
    print(f"Recall: {overall_recall:.4f}")

    overall_precision = precision_score(all_targets, all_preds, average='macro', zero_division=0)
    print(f"Precision: {overall_accuracy:.4f}")

    overall_f1 = f1_score(all_targets, all_preds, average='macro', zero_division=0)
    print(f"F1: {overall_accuracy:.4f}")

    num_classes = len(np.unique(all_targets))
    per_class_results = {}

    # print(f"Per-class evaluation metrics")
    # for i in range (num_classes):
    #     class_target = (all_targets == i).astype(int)
    #     class_pred = (all_preds == i).astype(int)

    #     accuracy = accuracy_score(class_target, class_pred)
    #     precision = precision_score(class_target, class_pred, zero_division=0)
    #     recall = recall_score(class_target, class_pred, zero_division=0)
    #     f1 = f1_score(class_target, class_pred, zero_division=0)

    #     per_class_results[i] = {
    #         'accuracy': accuracy,
    #         'precision': precision,
    #         'recall': recall,
    #         'f1': f1
    #     }
    #     print(f"Class {i}: \n Accuracy: {accuracy:.4f} \n Recall: {recall:.4f} \n Precision: {precision:.4f} \n F1: {f1:.4f}")
    return {
        'overall': {
            'accuracy': overall_accuracy,
            'precision': overall_precision,
            'recall': overall_recall,
            'f1': overall_f1
        }
        # 'per_class': per_class_results
    }


In [7]:
class Trainer:
    def __init__(self, model, train_loader, test_loader, device, learning_rate=0.01, epochs=5, save_dir='/kaggle/working/checkpoints/'):
        self.model = model
        self.train_loader = train_loader
        self.test_loader = test_loader
        self.device = device
        self.learning_rate = learning_rate
        self.epochs = epochs
        self.save_dir = save_dir

        self.best_accuracy = 0.0

        self.criterion = nn.CrossEntropyLoss()

        self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)
        os.makedirs(self.save_dir, exist_ok=True)

    def train_epoch(self, epoch):
        self.model.train()
        total_loss = 0
        for batch_idx, (data, target) in enumerate(tqdm(self.train_loader, desc=f'Training {epoch}/{self.epochs}')):
            data, target = data.to(self.device), target.to(self.device)

            self.optimizer.zero_grad()
            _,logits = self.model(data)
            loss = self.criterion(logits, target)
            loss.backward()
            self.optimizer.step()

            total_loss += loss

        avg_loss = total_loss/len(self.train_loader)
        print(f"Epoch {epoch} Training loss: {avg_loss:.4f}")
        return avg_loss

    def train(self):
        print(f"Full training on {self.device} for {self.epochs} with {self.learning_rate}")
        for epoch in range(1, self.epochs + 1):
            self.train_epoch(epoch)

            metrics = evaluate_model(self.model, self.test_loader, self.device)
            current_accuracy = metrics['overall']['accuracy']
            if current_accuracy > self.best_accuracy:
                self.best_accuracy = current_accuracy
                model_path = os.path.join(self.save_dir, 'best_model_assigment_03.pt')
                torch.save(self.model.state_dict(), model_path)
                print(f"Save new model version with accurcay = {self.best_accuracy:.4f}")
        print("Training_finished")
        print(f"Best model version is saved at {model_path} with accuracy = {self.best_accuracy}")

In [8]:
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device}")

print("Load data")
data_loader = VinaFoodDataLoader()
train_loader = data_loader.get_train_loader()
test_loader = data_loader.get_test_loader()

num_classes = data_loader.num_classes
print(f"Number of classes: {num_classes}")


Using cuda
Load data
Loaded 10044 training samples from /kaggle/input/vinafood21/VinaFood21/train. Found 21 classes
Loaded 6682 test samples from /kaggle/input/vinafood21/VinaFood21/test
Number of classes: 21


In [11]:
print("Build model")
model = resnet18(21).to(device)
print(model)

Build model
ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3))
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (maxpool1): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=True)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, k

In [12]:
print("Start training")
trainer = Trainer(
    model=model,
    train_loader=train_loader,
    test_loader=test_loader,
    device=device,
    learning_rate=0.01,
    epochs=3,
    save_dir= '/kaggle/working/checkpoints/'
)
trainer.train()

print("Evaluate with best model")
best_model_path = '/kaggle/working/checkpoints/best_model_assignment_02.pt'
final_model = resnet18(num_clases=num_classes).to(device)
final_model.load_state_dict(torch.load(best_model_path, map_location=device))
final_metrics = evaluate_model(
    model=final_model,
    data_loader=test_loader,
    device=device
)
print("Final overall metrics")
for k, v in final_model['overall'].items():
    print(f"{k}: {v}")

# print("Final per-class metrics")
# for label, metrics in final_metrics['per_class'].items():
#     print (f"Class {label}: \n Acc: {metrics['accuracy']:.4f} \n Precision: {metrics['precision']:.4f} \n Recall: {metrics['recall']:.4f} \n F1: {metrics['f1']:.4f}")


Start training
Full training on cuda for 3 with 0.01


Training 1/3: 100%|██████████| 157/157 [01:39<00:00,  1.59it/s]


Epoch 1 Training loss: 3.0336


Evaluating: 100%|██████████| 105/105 [01:07<00:00,  1.55it/s]


Overall evaluation metrics
Accuracy: 0.0792
Recall: 0.0473
Precision: 0.0792
F1: 0.0792
Save new model version with accurcay = 0.0792


Training 2/3: 100%|██████████| 157/157 [01:21<00:00,  1.92it/s]


Epoch 2 Training loss: 3.0356


Evaluating: 100%|██████████| 105/105 [00:57<00:00,  1.81it/s]


Overall evaluation metrics
Accuracy: 0.0908
Recall: 0.0476
Precision: 0.0908
F1: 0.0908
Save new model version with accurcay = 0.0908


Training 3/3: 100%|██████████| 157/157 [01:27<00:00,  1.80it/s]


Epoch 3 Training loss: 3.0326


Evaluating: 100%|██████████| 105/105 [00:59<00:00,  1.77it/s]

Overall evaluation metrics
Accuracy: 0.0908
Recall: 0.0476
Precision: 0.0908
F1: 0.0908
Training_finished
Best model version is saved at /kaggle/working/checkpoints/best_model_assigment_03.pt with accuracy = 0.09084106554923675
Evaluate with best model





NameError: name 'GoogleNet' is not defined