In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import _LRScheduler
import torch.utils.data as data

import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torchvision.datasets import ImageFolder

from sklearn import decomposition
from sklearn import manifold
from sklearn.metrics import confusion_matrix
from sklearn.metrics import ConfusionMatrixDisplay
from tqdm.notebook import tqdm, trange
import matplotlib.pyplot as plt
import numpy as np
import matplotlib.pyplot as plt

import copy
import random
import time

In [None]:
# NOTE(review): leftover debugging switch. PyTorch documents anomaly detection
# as a debug-only mode because the extra checks slow execution down; consider
# turning it off once the autograd problem under investigation is resolved.
# The `import torch` below repeats the one in the first cell.
import torch
torch.autograd.set_detect_anomaly(True)

In [None]:
SEED = 23

# Seed every random number generator the notebook relies on with the same value.
for _seed_fn in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed):
    _seed_fn(SEED)

# Request deterministic cuDNN kernels.
torch.backends.cudnn.deterministic = True

# Build model

In [None]:
class Alexnet(nn.Module):
    """AlexNet-style convolutional classifier.

    Five convolutions with three max-pools, followed by three fully connected
    layers. The first fully connected layer expects 9216 features, i.e. the
    256 x 6 x 6 map produced from a 227 x 227 input.

    Dropout is applied to the input of the first two fully connected layers
    (the same placement as torchvision's AlexNet classifier). It is active
    only in training mode and adds no parameters, so state_dict keys are
    unchanged.

    Args:
        num_classes: Size of the output layer. Defaults to 2.
        dropout: Drop probability used before ``linear1`` and ``linear2``.
            Defaults to 0.5.
    """

    def __init__(self, num_classes=2, dropout=0.5):
        super().__init__()
        # Input (227*227*3)
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=96, kernel_size=(11, 11), stride=4, padding=0)
        self.conv2 = nn.Conv2d(in_channels=96, out_channels=256, kernel_size=(5, 5), padding=2)
        self.conv3 = nn.Conv2d(in_channels=256, out_channels=384, kernel_size=(3, 3), padding=1)
        self.conv4 = nn.Conv2d(in_channels=384, out_channels=384, kernel_size=(3, 3), padding=1)
        self.conv5 = nn.Conv2d(in_channels=384, out_channels=256, kernel_size=(3, 3), padding=1)
        self.pool = nn.MaxPool2d(kernel_size=(3, 3), stride=2)
        self.relu = nn.ReLU()
        self.linear1 = nn.Linear(in_features=9216, out_features=4096)
        self.linear2 = nn.Linear(in_features=4096, out_features=1000)
        self.linear3 = nn.Linear(in_features=1000, out_features=num_classes)
        # Deliberately NOT in-place: an in-place dropout would overwrite
        # activations that autograd still needs for the backward pass.
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x):
        """Return raw class scores (no softmax) for a batch of images."""
        x = self.relu(self.conv1(x))
        x = self.pool(x)
        x = self.relu(self.conv2(x))
        x = self.pool(x)
        x = self.relu(self.conv3(x))
        x = self.relu(self.conv4(x))
        x = self.relu(self.conv5(x))
        x = self.pool(x)
        # Flatten everything except the batch dimension.
        x = x.reshape(x.shape[0], -1)
        x = self.dropout(x)
        x = self.relu(self.linear1(x))
        x = self.dropout(x)
        x = self.relu(self.linear2(x))
        x = self.linear3(x)
        return x

In [None]:
# Instantiate the custom network; it is moved to the target device in a later cell.
model = Alexnet()

# Data Processing

In [None]:
# Target image size fed to the network.
pretrained_size = (227,227)

# Training pipeline: resize, light random augmentation, then tensor conversion.
train_trans = transforms.Compose([
    transforms.Resize(pretrained_size),             # Resize the image
    transforms.RandomRotation(5),                   # Randomly rotate the image
    transforms.RandomHorizontalFlip(0.5),           # Random horizontal flip
    transforms.RandomCrop(pretrained_size, padding=10),  # Random crop after padding
    transforms.ToTensor(),                          # Convert to a tensor
])


# Evaluation pipeline: resize and tensor conversion only, no random augmentation.
test_trans = transforms.Compose([
    transforms.Resize(pretrained_size),
    transforms.ToTensor()])

In [None]:
# NOTE(review): absolute, machine-specific paths; adjust them (or make them
# configurable) before running the notebook on another machine.
train_folder_direc = "E:/Data/Dog_cat/training_set"
test_folder_direc = "E:/Data/Dog_cat/test_set"

# Load the datasets with ImageFolder
train_data = datasets.ImageFolder(root=train_folder_direc, transform=train_trans,)
test_data = datasets.ImageFolder(root=test_folder_direc, transform=test_trans)

In [None]:
# NOTE(review): despite its name, VALID_RATIO is the fraction of the training
# set that is KEPT for training; the remainder becomes the validation set.
VALID_RATIO = 0.9

n_train_examples = int(len(train_data) * VALID_RATIO)
n_valid_examples = len(train_data) - n_train_examples

# This rebinds train_data to the training subset, so re-running the cell would
# split the already-split subset again.
train_data, valid_data = data.random_split(train_data,
                                           [n_train_examples, n_valid_examples])

In [None]:
# Work on a deep copy of the validation subset before replacing the transform
# of its underlying dataset with the evaluation pipeline, so the object that
# train_data refers to is not modified.
valid_data = copy.deepcopy(valid_data)
valid_data.dataset.transform = test_trans

In [None]:
# Report the size of each split, one per line.
print(
    f'Number of training examples: {len(train_data)}',
    f'Number of validation examples: {len(valid_data)}',
    f'Number of testing examples: {len(test_data)}',
    sep='\n',
)

In [None]:
def normalize_image(image):
    """Min-max rescale ``image`` so its values span approximately [0, 1].

    A new tensor is returned and the argument is left unmodified. (The
    previous implementation rescaled in place, which silently altered the
    caller's data, including any tensor the argument was a view of.)

    Args:
        image: Tensor of any shape.

    Returns:
        Tensor of the same shape, computed as
        ``(image - min) / (max - min + 1e-5)``.
    """
    image_min = image.min()
    image_max = image.max()
    # The small epsilon keeps the division finite for constant images.
    return (image - image_min) / (image_max - image_min + 1e-5)


def plot_images(images, labels, classes, normalize=True):
    """Show images in a square grid, each titled with its class name.

    The grid side is ``int(sqrt(len(images)))``, so when the number of images
    is not a perfect square the trailing images are not drawn.

    Args:
        images: Sequence of image tensors; each is permuted with (1, 2, 0)
            before display, i.e. assumed to be channel-first.
        labels: Sequence of class indices, parallel to ``images``.
        classes: Sequence mapping a class index to its display name.
        normalize: When true, min-max rescale each image before plotting.
            This flag was previously accepted but ignored.
    """
    n_images = len(images)

    side = int(np.sqrt(n_images))

    fig = plt.figure(figsize=(10, 10))

    for i in range(side * side):

        ax = fig.add_subplot(side, side, i + 1)

        image = images[i]

        if normalize:
            # Normalize a copy so the caller's tensor is never modified.
            image = normalize_image(image.clone())

        ax.imshow(image.permute(1, 2, 0).cpu().numpy())
        ax.set_title(classes[labels[i]])
        ax.axis('off')

In [None]:
N_IMAGES = 25

# Fetch the first N_IMAGES (image, label) samples, then split them into two tuples.
samples = [train_data[idx] for idx in range(N_IMAGES)]
images, labels = zip(*samples)

classes = test_data.classes

plot_images(images, labels, classes)

In [None]:
BATCH_SIZE = 64

# Only the training loader shuffles; the validation and test loaders use the
# DataLoader default ordering.
train_iterator = data.DataLoader(train_data,
                                 shuffle=True,
                                 batch_size=BATCH_SIZE)

valid_iterator = data.DataLoader(valid_data,
                                 batch_size=BATCH_SIZE)

test_iterator = data.DataLoader(test_data,
                                batch_size=BATCH_SIZE)

# Training model

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Classification loss, created directly on the chosen device.
criterion = nn.CrossEntropyLoss().to(device)

# Move the network to the same device.
model = model.to(device)

In [None]:
def count_parameters(model):
    """Return the total element count of the model's parameters that require gradients."""
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

# Report the trainable-parameter count with thousands separators.
print(f'The model has {count_parameters(model):,} trainable parameters')

In [None]:
# Learning rate for Adam. NOTE(review): the name suggests it came from a
# learning-rate search that is not part of this notebook — confirm.
FOUND_LR = 5e-4

# A single parameter group covering the whole network. The group's 'lr' and
# the optimizer-level lr below carry the same value, so one is redundant.
params = [
          {'params': model.parameters(), 'lr': FOUND_LR },
         ]

optimizer = optim.Adam(params, lr=FOUND_LR)

In [None]:
def calculate_accuracy(y_pred, y):
    """Return the fraction of rows whose highest-scoring column equals the target.

    Args:
        y_pred: Score tensor; the arg-max is taken over dimension 1.
        y: Target class indices, one per row of ``y_pred``.

    Returns:
        Zero-dimensional float tensor: correct predictions / ``y.shape[0]``.
    """
    predicted = y_pred.argmax(dim=1, keepdim=True)
    targets = y.view_as(predicted)
    n_correct = (predicted == targets).sum()
    return n_correct.float() / y.shape[0]

In [None]:
def train(model, iterator, optimizer, criterion, device):
    """Run one training epoch and return ``(avg_loss, avg_acc)``.

    Both averages are taken per sample: each batch's loss and accuracy are
    weighted by the batch size. Dividing the sum of per-batch values by the
    number of batches (the previous behaviour) over-weights a smaller final
    batch.

    Args:
        model: Network to optimise; switched to training mode.
        iterator: Iterable yielding ``(x, y)`` batches.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss callable. Assumed to return the mean loss over the
            batch (the default reduction of ``nn.CrossEntropyLoss``).
        device: Device the batches are moved to.

    Returns:
        Tuple ``(avg_loss, avg_acc)`` of Python floats.

    Raises:
        ValueError: If ``iterator`` yields no samples.
    """
    # Running sums, weighted by batch size
    total_loss = 0.0
    total_acc = 0.0
    n_samples = 0

    # Put the model in training mode
    model.train()

    for (x, y) in tqdm(iterator, desc="Training", leave=True):

        x = x.to(device)
        y = y.to(device)

        # Reset the gradients to zero
        optimizer.zero_grad()

        y_pred = model(x)

        loss = criterion(y_pred, y)

        acc = calculate_accuracy(y_pred, y)

        # Backpropagate to compute the gradients
        loss.backward()

        # Update the model parameters
        optimizer.step()

        # Accumulate loss and accuracy, weighted by the number of samples
        batch_size = y.shape[0]
        total_loss += loss.item() * batch_size
        total_acc += acc.item() * batch_size
        n_samples += batch_size

    if n_samples == 0:
        raise ValueError("train() received an iterator that yielded no samples")

    # Per-sample averages over the whole epoch
    return total_loss / n_samples, total_acc / n_samples

In [None]:
def evaluate(model, iterator, criterion, device):
    """Evaluate the model without gradient tracking; return ``(avg_loss, avg_acc)``.

    Both averages are taken per sample: each batch's loss and accuracy are
    weighted by the batch size. Dividing the sum of per-batch values by the
    number of batches (the previous behaviour) over-weights a smaller final
    batch.

    Args:
        model: Network to evaluate; switched to evaluation mode.
        iterator: Iterable yielding ``(x, y)`` batches.
        criterion: Loss callable. Assumed to return the mean loss over the
            batch (the default reduction of ``nn.CrossEntropyLoss``).
        device: Device the batches are moved to.

    Returns:
        Tuple ``(avg_loss, avg_acc)`` of Python floats.

    Raises:
        ValueError: If ``iterator`` yields no samples.
    """
    total_loss = 0.0
    total_acc = 0.0
    n_samples = 0

    model.eval()

    with torch.no_grad():

        for (x, y) in tqdm(iterator, desc="Evaluating", leave=False):

            x = x.to(device)
            y = y.to(device)

            y_pred = model(x)

            loss = criterion(y_pred, y)

            acc = calculate_accuracy(y_pred, y)

            # Weight each batch by its size so every sample counts equally.
            batch_size = y.shape[0]
            total_loss += loss.item() * batch_size
            total_acc += acc.item() * batch_size
            n_samples += batch_size

    if n_samples == 0:
        raise ValueError("evaluate() received an iterator that yielded no samples")

    return total_loss / n_samples, total_acc / n_samples


In [None]:
def epoch_time(start_time, end_time):
    """Split the span between two timestamps (in seconds) into whole minutes and seconds.

    Returns:
        Tuple ``(minutes, seconds)`` of ints, both truncated toward zero.
    """
    duration = end_time - start_time
    minutes = int(duration / 60)
    seconds = int(duration - 60 * minutes)
    return minutes, seconds

In [None]:
EPOCHS = 10

# Lowest validation loss seen so far; starts at infinity so the first epoch
# always writes a checkpoint.
best_valid_loss = float('inf')

for epoch in trange(EPOCHS, desc="Epochs"):

    start_time = time.monotonic()

    train_loss, train_acc = train(model, train_iterator, optimizer, criterion, device)
    valid_loss, valid_acc = evaluate(model, valid_iterator, criterion, device)

    # Keep only the weights of the epoch with the best validation loss.
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'dog_cat_model_alexnet.pt')

    # The measured epoch time covers training, validation and checkpointing.
    end_time = time.monotonic()

    epoch_mins, epoch_secs = epoch_time(start_time, end_time)

    print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%')

In [None]:
# Restore the weights saved at the best validation loss. map_location remaps
# the stored tensors onto the current device, so a checkpoint written on a GPU
# still loads on a CPU-only machine.
model.load_state_dict(torch.load('dog_cat_model_alexnet.pt', map_location=device))

In [None]:
# Score the restored model on the held-out test set.
test_loss, test_acc = evaluate(model, test_iterator, criterion, device)

print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')

# Examining the Model

In [None]:
def get_predictions(model, iterator, device=None):
    """Run the model over ``iterator`` and collect inputs, labels and class probabilities.

    Args:
        model: Network to run; switched to evaluation mode.
        iterator: Iterable yielding ``(x, y)`` batches.
        device: Device the inputs are moved to. When omitted, the device of
            the model's first parameter is used (CPU if the model has no
            parameters), so the function no longer depends on a notebook-level
            ``device`` variable.

    Returns:
        Tuple ``(images, labels, probs)`` of CPU tensors concatenated along
        dimension 0. ``probs`` is the softmax of the model output over its
        last dimension. Every input batch is retained, so memory use grows
        with the size of the dataset.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device('cpu')

    model.eval()

    images = []
    labels = []
    probs = []

    with torch.no_grad():

        for (x, y) in tqdm(iterator):

            x = x.to(device)

            y_pred = model(x)

            y_prob = F.softmax(y_pred, dim=-1)

            images.append(x.cpu())
            labels.append(y.cpu())
            probs.append(y_prob.cpu())

    images = torch.cat(images, dim=0)
    labels = torch.cat(labels, dim=0)
    probs = torch.cat(probs, dim=0)

    return images, labels, probs

In [None]:
# Inputs, true labels and class probabilities for the whole test set.
# This rebinds `images` and `labels`, which earlier held training samples.
images, labels, probs = get_predictions(model, test_iterator)

In [None]:
# Predicted class = index of the highest probability along dimension 1.
pred_labels = torch.argmax(probs, 1)

In [None]:
def plot_confusion_matrix(labels, pred_labels, classes):
    """Draw the confusion matrix of true vs. predicted labels on a 10 x 10 inch figure.

    Args:
        labels: True class indices.
        pred_labels: Predicted class indices.
        classes: Display names used for the matrix tick labels.
    """
    fig, ax = plt.subplots(figsize=(10, 10))
    counts = confusion_matrix(labels, pred_labels)
    matrix_display = ConfusionMatrixDisplay(counts, display_labels=classes)
    matrix_display.plot(values_format='d', cmap='Blues', ax=ax)
    # Tilt the x tick labels slightly.
    plt.xticks(rotation=20)

# Confusion matrix for the test-set predictions.
plot_confusion_matrix(labels, pred_labels, classes)

In [None]:
# Element-wise mask: True where the predicted label equals the true label.
corrects = torch.eq(labels, pred_labels)

In [None]:
# Keep (image, label, probabilities) for every misclassified example.
misclassified = [
    (image, label, prob)
    for image, label, prob, correct in zip(images, labels, probs, corrects)
    if not correct
]

# Order by the model's highest class probability, most confident mistakes first.
incorrect_examples = sorted(
    misclassified,
    key=lambda example: torch.max(example[2], dim=0).values,
    reverse=True,
)

In [None]:
def plot_most_incorrect(incorrect, classes, n_images, normalize=True):
    """Plot misclassified examples in a square grid with true and predicted labels.

    The grid side is ``int(sqrt(n_images))``. If ``incorrect`` holds fewer
    examples than the grid has cells, only the available examples are drawn
    and the remaining cells are left empty (previously this raised
    ``IndexError``).

    Args:
        incorrect: Sequence of ``(image, true_label, probs)`` tuples; each
            image is permuted with (1, 2, 0) before display, i.e. assumed to
            be channel-first.
        classes: Sequence mapping a class index to its display name.
        n_images: Requested number of images; determines the grid size.
        normalize: When true, min-max rescale each image before plotting.
    """
    rows = int(np.sqrt(n_images))
    cols = int(np.sqrt(n_images))

    fig = plt.figure(figsize=(25, 20))

    # Never index past the examples that are actually available.
    n_shown = min(rows * cols, len(incorrect))

    for i in range(n_shown):

        ax = fig.add_subplot(rows, cols, i+1)

        image, true_label, probs = incorrect[i]
        image = image.permute(1, 2, 0)
        true_prob = probs[true_label]
        incorrect_prob, incorrect_label = torch.max(probs, dim=0)
        true_class = classes[true_label]
        incorrect_class = classes[incorrect_label]

        if normalize:
            # permute() returns a view of the stored image; normalize a copy
            # so the caller's data is never modified.
            image = normalize_image(image.clone())

        ax.imshow(image.cpu().numpy())
        ax.set_title(f'true label: {true_class} ({true_prob:.3f})\n'
                     f'pred label: {incorrect_class} ({incorrect_prob:.3f})')
        ax.axis('off')

    fig.subplots_adjust(hspace=0.4)

In [None]:
# 25 requested images -> a 5 x 5 grid of the most confidently wrong predictions.
N_IMAGES = 25

plot_most_incorrect(incorrect_examples, classes, N_IMAGES)

In [None]:
def get_representations(model, iterator, device=None):
    """Run the model over ``iterator`` and collect its outputs with the matching labels.

    Args:
        model: Network to run; switched to evaluation mode.
        iterator: Iterable yielding ``(x, y)`` batches.
        device: Device the inputs are moved to. When omitted, the device of
            the model's first parameter is used (CPU if the model has no
            parameters), so the function no longer depends on a notebook-level
            ``device`` variable.

    Returns:
        Tuple ``(outputs, labels)`` of CPU tensors concatenated along
        dimension 0. ``outputs`` holds whatever ``model(x)`` returns for each
        batch (its final output, not an intermediate layer).
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device('cpu')

    model.eval()

    outputs = []
    labels = []

    with torch.no_grad():

        for (x, y) in tqdm(iterator):

            x = x.to(device)

            y_pred = model(x)

            outputs.append(y_pred.cpu())
            # Keep labels on the CPU as well, matching get_predictions.
            labels.append(y.cpu())

    outputs = torch.cat(outputs, dim=0)
    labels = torch.cat(labels, dim=0)

    return outputs, labels

In [None]:
# Model outputs for the training set. NOTE: this rebinds `labels`, which held
# the test-set labels up to this point.
outputs, labels = get_representations(model, train_iterator)

In [None]:
def plot_representations(data, labels, classes, n_images=None):
    """Scatter-plot the first two columns of ``data``, coloured by label.

    Args:
        data: Two-dimensional array-like; columns 0 and 1 are plotted.
        labels: Class index per row, used as the point colour.
        classes: Names shown in the legend.
        n_images: Optional cap; only the first ``n_images`` rows are plotted.
    """
    if n_images is not None:
        data, labels = data[:n_images], labels[:n_images]

    fig, ax = plt.subplots(figsize=(10, 10))
    points = ax.scatter(data[:, 0], data[:, 1], c=labels, cmap='tab10')
    # Only the handles are needed; the legend text comes from `classes`.
    legend_handles, _ = points.legend_elements()
    ax.legend(handles=legend_handles, labels=classes)

In [None]:
def get_tsne(data, n_components=2, n_images=None):
    """Embed ``data`` with t-SNE and return the transformed array.

    Args:
        data: Samples to embed, one per row.
        n_components: Dimensionality of the embedding.
        n_images: Optional cap; only the first ``n_images`` rows are embedded.
    """
    subset = data if n_images is None else data[:n_images]
    # Fixed random_state, as in the rest of the notebook's seeding.
    reducer = manifold.TSNE(n_components=n_components, random_state=0)
    return reducer.fit_transform(subset)

In [None]:
# Upper bound on how many samples are embedded and plotted.
N_IMAGES = 5_000

# Embed the collected model outputs with t-SNE, then plot them coloured by label.
output_tsne_data = get_tsne(outputs, n_images=N_IMAGES)
plot_representations(output_tsne_data, labels, classes, n_images=N_IMAGES)

In [None]:
# NOTE(review): import placed mid-notebook; consider moving it to the import
# cell at the top so all dependencies are visible in one place.
import torchvision.models as models

# torchvision's AlexNet with a 2-class output layer. No pretrained weights are
# requested here.
alexnet = models.alexnet(num_classes=2)

In [None]:
# Show the layer structure of the torchvision model.
print(alexnet)

In [None]:
# NOTE: this cell rebinds FOUND_LR, params and optimizer, which an earlier cell
# set up for the custom model; from here on the optimizer updates `alexnet`.
FOUND_LR = 5e-4

# Single parameter group; the group's 'lr' and the optimizer-level lr below
# carry the same value, so one is redundant.
params = [
          {'params': alexnet.parameters(), 'lr': FOUND_LR },
         ]

optimizer = optim.Adam(params, lr=FOUND_LR)

In [None]:
# NOTE(review): this cell repeats the device/criterion setup and the training
# loop used for the custom model, differing only in the model and checkpoint
# file name; a shared helper function would remove the duplication.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

criterion = nn.CrossEntropyLoss()

alexnet = alexnet.to(device)
criterion = criterion.to(device)
EPOCHS = 10

# Lowest validation loss seen so far; starts at infinity so the first epoch
# always writes a checkpoint.
best_valid_loss = float('inf')

for epoch in trange(EPOCHS, desc="Epochs"):

    start_time = time.monotonic()

    train_loss, train_acc = train(alexnet, train_iterator, optimizer, criterion, device)
    valid_loss, valid_acc = evaluate(alexnet, valid_iterator, criterion, device)

    # Keep only the weights of the epoch with the best validation loss.
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(alexnet.state_dict(), 'dog_cat_model_alexnet_1.pt')

    # The measured epoch time covers training, validation and checkpointing.
    end_time = time.monotonic()

    epoch_mins, epoch_secs = epoch_time(start_time, end_time)

    print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%')

In [None]:
# Restore the weights saved at the best validation loss. map_location remaps
# the stored tensors onto the current device, so a checkpoint written on a GPU
# still loads on a CPU-only machine.
alexnet.load_state_dict(torch.load('dog_cat_model_alexnet_1.pt', map_location=device))

In [None]:
# Score the restored torchvision model on the held-out test set.
# This rebinds test_loss / test_acc from the custom model's evaluation.
test_loss, test_acc = evaluate(alexnet, test_iterator, criterion, device)

print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')