In [1]:
## Connect to gg driver
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
%cd /content/drive/MyDrive/AI Tutor/CV/Topic2: Object Classification/day04/Tranfer_Learning
%ls

/content/drive/.shortcut-targets-by-id/1g_hBCGxmI5lTFXyvD-igJcroxCjDzObt/AI Tutor/CV/Topic2: Object Classification/day04/Tranfer_Learning
Main.ipynb


In [3]:
PATH_DATA = '/content/drive/MyDrive/AI Tutor/CV/Topic2: Object Classification/day04/dataset'

## Install some package

In [4]:
!pip install efficientnet_pytorch
!pip install torchsummary

Collecting efficientnet_pytorch
  Downloading efficientnet_pytorch-0.7.1.tar.gz (21 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: efficientnet_pytorch
  Building wheel for efficientnet_pytorch (setup.py) ... [?25l[?25hdone
  Created wheel for efficientnet_pytorch: filename=efficientnet_pytorch-0.7.1-py3-none-any.whl size=16428 sha256=5554f2508ed686d8bf8b24c5812988adf284e9fb59f04ff403259975531e95f4
  Stored in directory: /root/.cache/pip/wheels/03/3f/e9/911b1bc46869644912bda90a56bcf7b960f20b5187feea3baf
Successfully built efficientnet_pytorch
Installing collected packages: efficientnet_pytorch
Successfully installed efficientnet_pytorch-0.7.1


In [5]:
import torch
import torchvision
from torchvision import transforms
from torch.utils.data import DataLoader
from torchvision import models
from efficientnet_pytorch import EfficientNet
import torch.nn as nn
from sklearn.metrics import confusion_matrix, f1_score
import seaborn as sns
import matplotlib.pyplot as plt
from tqdm import tqdm
import torch.optim as optim


## Function Dataloader

In [6]:


class DataPreparation:
    def __init__(self, cfg):
        self.train_dir = cfg.train_dir
        self.test_dir = cfg.test_dir
        self.batch_size = cfg.batch_size
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.num_classes = None
        self.classes_name = None
        self.classes2idx = None
        self.TRAINLOADER = None
        self.TESTLOADER = None

        self.prepare_transforms()
        self.load_data()
        self.create_data_loaders()

    def prepare_transforms(self):
        train_transforms = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])

        test_transforms = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])

        self.train_transforms = train_transforms
        self.test_transforms = test_transforms

    def load_data(self):
        train_data = torchvision.datasets.ImageFolder(root=self.train_dir, transform=self.train_transforms)
        test_data = torchvision.datasets.ImageFolder(root=self.test_dir, transform=self.test_transforms)

        self.num_classes = len(train_data.classes)
        self.classes_name = train_data.classes
        self.classes2idx = train_data.class_to_idx

        print(f"Số lượng lớp: {self.num_classes}")
        print(f"Tên lớp: {self.classes_name}")
        print(f"Ánh xạ từ tên lớp sang chỉ số: {self.classes2idx}")
        print("Number of train: ", len(train_data))
        print("Number of test: ", len(test_data))

        self.train_data = train_data
        self.test_data = test_data

    def create_data_loaders(self):
        self.trainloader = DataLoader(self.train_data, batch_size=self.batch_size, shuffle=True)
        self.testloader = DataLoader(self.test_data, batch_size=self.batch_size, shuffle=False)

## Function Model

In [7]:

def resnet18_frozen(num_classes):
    # Tải pre-trained ResNet-18
    model = models.resnet18(pretrained=True)
    model.fc = nn.Linear(model.fc.in_features, num_classes)
    for name, param in model.named_parameters():
        if 'layer4' in name or 'fc' in name:
            param.requires_grad = True
        else:
            param.requires_grad = False

    return model

def mobilenetv3_frozen(num_classes):
    # Tải pre-trained MobileNetV3
    model = models.mobilenet_v3_small(pretrained=True)
    model.classifier[3] = nn.Linear(model.classifier[3].in_features, num_classes)
    for name, param in model.named_parameters():
        if 'classifier' in name:
            param.requires_grad = True
        else:
            param.requires_grad = False

    return model



## Evaluation function and visualize

In [8]:
def evaluate_model(model, test_dataloader, device, class_names):
    # Set the model in evaluation mode
    model.eval()

    all_predictions = []
    all_true_labels = []

    with torch.no_grad():
        for data in tqdm(test_dataloader, desc='eval'):
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            all_predictions.extend(predicted.cpu().tolist())
            all_true_labels.extend(labels.cpu().tolist())

    # Compute confusion matrix
    cm = confusion_matrix(all_true_labels, all_predictions)

    # Plot confusion matrix using Seaborn
    plt.figure(figsize=(10, 7))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=class_names, yticklabels=class_names)
    plt.xlabel('Predicted labels')
    plt.ylabel('True labels')
    plt.title('Confusion Matrix')
    plt.show()
    return all_predictions, all_true_labels

def visualize(trainer):
    test_F1 = trainer.test_F1
    train_F1 = trainer.train_F1
    loss_test = trainer.loss_test
    loss_train = trainer.loss_train

    # Plotting the metrics
    plt.figure(figsize=(12, 6))

    plt.subplot(1, 2, 1)
    plt.plot(loss_train, label='Train Loss')
    plt.plot(loss_test, label='Test Loss')
    plt.title('Loss over epochs')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(train_F1, label='Train F1 Score')
    plt.plot(test_F1, label='Test F1 Score')
    plt.title('F1 Score over epochs')
    plt.xlabel('Epochs')
    plt.ylabel('F1 Score')
    plt.legend()

    plt.tight_layout()
    plt.show()

## Function training


In [9]:

class Trainer:
    def __init__(self, model, train_loader, test_loader, cfg):
        self.model = model
        self.train_loader = train_loader
        self.test_loader = test_loader
        self.patience = cfg.patience
        self.max_epochs = cfg.max_epochs
        self.device = cfg.device


        self.model_best_f1 = None
        self.train_F1 = []
        self.test_F1 = []
        self.loss_train = []
        self.loss_test = []

    def train_one_epoch(self, optimizer, scheduler, criterion):
        self.model.train()
        total_loss = 0.0
        y_true = []
        y_pred = []

        for batch_idx, (data, target) in enumerate(self.train_loader):
            data, target = data.to(self.device), target.to(self.device)
            optimizer.zero_grad()
            output = self.model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

            _, predicted = torch.max(output, 1)
            y_true.extend(target.cpu().numpy())
            y_pred.extend(predicted.cpu().numpy())
        if scheduler is not None:
            scheduler.step()
        f1 = f1_score(y_true, y_pred, average='micro')
        self.train_F1.append(f1)
        self.loss_train.append(total_loss / len(self.train_loader))

    def test_one_epoch(self, criterion):
        self.model.eval()
        total_loss = 0.0
        y_true = []
        y_pred = []

        with torch.no_grad():
            for data, target in self.test_loader:
                data, target = data.to(self.device), target.to(self.device)
                output = self.model(data)
                loss = criterion(output, target)
                total_loss += loss.item()

                _, predicted = torch.max(output, 1)
                y_true.extend(target.cpu().numpy())
                y_pred.extend(predicted.cpu().numpy())

        f1 = f1_score(y_true, y_pred, average='micro')
        self.test_F1.append(f1)
        self.loss_test.append(total_loss / len(self.test_loader))

    def train(self, optimizer, scheduler, criterion):
        self.model.to(self.device)
        early_stopping_counter = 0
        best_f1 = -float('inf')

        for epoch in tqdm(range(self.max_epochs), desc='epochs'):
            self.train_one_epoch(optimizer, scheduler, criterion)
            self.test_one_epoch(criterion)

            print(f"Epoch {epoch+1}/{self.max_epochs} - Train Loss: {self.loss_train[-1]:.4f} - Test Loss: {self.loss_test[-1]:.4f} - Train F1: {self.train_F1[-1]:.4f} - Test F1: {self.test_F1[-1]:.4f}")

            if self.test_F1[-1] > best_f1:
                best_f1 = self.test_F1[-1]
                early_stopping_counter = 0
                self.model_best_f1 = self.model
            else:
                early_stopping_counter += 1

            if early_stopping_counter >= self.patience:
                print("Early stopping triggered.")
                break

        print("Training finished.")


## Tuning model

In [10]:
import os

In [11]:
class Config:
    def __init__(self):
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'  # Thiết bị sử dụng (cuda hoặc cpu)
        self.train_dir = os.path.join(PATH_DATA, 'train')  # Đường dẫn đến dữ liệu huấn luyện
        self.test_dir = os.path.join(PATH_DATA, 'test')
        self.num_classes = None
        self.class_names = None
        self.batch_size = 64
        self.max_epochs = 10
        self.patience = 3
        self.lr = 0.001
        self.beta1 = 0.95
        self.beta2 = 0.993
        self.model_name = 'mobilenet'

In [12]:
# Loss
criterion = nn.CrossEntropyLoss()
# Config
cfg = Config()
data_preparation = DataPreparation(
    cfg = cfg
    )

cfg.device

Số lượng lớp: 6
Tên lớp: ['buildings', 'forest', 'glacier', 'mountain', 'sea', 'street']
Ánh xạ từ tên lớp sang chỉ số: {'buildings': 0, 'forest': 1, 'glacier': 2, 'mountain': 3, 'sea': 4, 'street': 5}
Number of train:  14034
Number of test:  3000


'cuda'

In [13]:
# Create dataloader
data_preparation.create_data_loaders()
trainloader = data_preparation.trainloader
testloader = data_preparation.testloader
cfg.num_classes = data_preparation.num_classes
cfg.class_names = data_preparation.classes_name
print("Number iteration: ", len(trainloader)/cfg.batch_size)

Number iteration:  3.4375


In [14]:
# Create Model
if cfg.model_name == 'resnet':
    model = resnet18_frozen(cfg.num_classes)
elif cfg.model_name == 'mobilenet':
    model = mobilenetv3_frozen(cfg.num_classes)


Downloading: "https://download.pytorch.org/models/mobilenet_v3_small-047dcff4.pth" to /root/.cache/torch/hub/checkpoints/mobilenet_v3_small-047dcff4.pth
100%|██████████| 9.83M/9.83M [00:00<00:00, 83.9MB/s]


In [15]:
# Create Trainer
trainer = Trainer(
    model = model,
    train_loader=trainloader,
    test_loader=testloader,
    cfg=cfg
)

In [16]:
# Optimizer
optimizer = optim.Adam(model.parameters(), lr=cfg.lr, betas=(cfg.beta1, cfg.beta2))


In [17]:
# Training
trainer.train(
    optimizer=optimizer,
    scheduler=None,
    criterion=criterion
)

epochs:  10%|█         | 1/10 [1:59:39<17:56:58, 7179.82s/it]

Epoch 1/10 - Train Loss: 0.3975 - Test Loss: 0.2937 - Train F1: 0.8545 - Test F1: 0.8913


epochs:  20%|██        | 2/10 [2:00:52<6:39:52, 2999.10s/it] 

Epoch 2/10 - Train Loss: 0.2784 - Test Loss: 0.2894 - Train F1: 0.8989 - Test F1: 0.8910


epochs:  30%|███       | 3/10 [2:02:05<3:14:01, 1663.05s/it]

Epoch 3/10 - Train Loss: 0.2438 - Test Loss: 0.3162 - Train F1: 0.9087 - Test F1: 0.8887


epochs:  30%|███       | 3/10 [2:03:18<4:47:42, 2466.07s/it]

Epoch 4/10 - Train Loss: 0.2109 - Test Loss: 0.2951 - Train F1: 0.9222 - Test F1: 0.8913
Early stopping triggered.
Training finished.





In [None]:
best_model = trainer.model_best_f1
all_predictions, all_true_labels = evaluate_model(
    model=best_model,
    test_dataloader = testloader,
    device = cfg.device,
    class_names = data_preparation.classes_name
)
f1 = f1_score(all_true_labels, all_predictions, average='weighted')
print(f1)

eval:  21%|██▏       | 10/47 [00:02<00:09,  3.75it/s]

## How to check models

In [None]:
# def mobilenetv3_frozen(num_classes):
#     # Tải pre-trained MobileNetV3
#     model = models.mobilenet_v3_large(pretrained=True)
#     model.classifier[3] = nn.Linear(model.classifier[3].in_features, num_classes)
#     for name, param in model.named_parameters():
#         if 'features.15' in name or 'classifier' in name or 'features.16' in name:
#             param.requires_grad = True
#         else:
#             param.requires_grad = False

#     return model

In [None]:
# model = EfficientNet.from_pretrained('efficientnet-b3')
# model._fc = nn.Linear(model._fc.in_features, num_classes)
# for name, param in model.named_parameters():
#     if '_blocks.25' in name or '_fc' in name:
#         param.requires_grad = True
#     else:
#         param.requires_grad = False
# return model

In [None]:
model  =  models.mobilenet_v3_small(pretrained=True)
for name, param in model.named_parameters():
    print(name, param.requires_grad)

In [None]:
model.classifier[3].in_features

In [None]:
from torchsummary import summary
summary(model.cuda(), (3, 224, 224))