# Cấu hình để chạy trên Colab kết nối với Drive

Cấu trúc thư mục Drive: 

    drive/MyDrive/AppliedML/data
        /images # chứa ảnh để huấn luyện
        /test_images # chứa ảnh để test

In [1]:
import torch
# Report whether this runtime exposes a CUDA device (the value is shown as the cell output).
torch.cuda.is_available()

False

In [2]:
from google.colab import drive
# Mount Google Drive into the Colab file system at /content/drive.
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
import os
# Switch to the project folder on the mounted Drive. The absolute path (Drive is
# mounted at /content/drive) keeps this cell safe to re-run: the former relative
# "./drive/..." path only resolved while the working directory was still /content.
os.chdir("/content/drive/MyDrive/AppliedML")

# Chuẩn bị dữ liệu để huấn luyện

In [4]:
import os
from PIL import Image
import matplotlib.pyplot as plt

import numpy as np
import torch
import torch.nn as nn
import torchvision.transforms as transforms


In [5]:
# Folder with the images used for training; the cell output is the number of files in it.
data_path = './data/images'
len(os.listdir(data_path))
# size of image: 64x64

13165

In [6]:
# Label encoding: 0 = smile (file name starts with 'positive'), 1 = not smile.
# The file list is sorted so that files[i] and labels[i] always refer to the same image.
files = sorted(os.listdir(data_path))
labels = np.array([int(not name.startswith('positive')) for name in files])
# Cell output: number of samples and the per-class counts.
len(labels), np.bincount(labels)

(13165, array([3690, 9475]))

In [7]:
# Chia data thành train/validation với tỷ lệ kiểm thử val = 0.2
from sklearn.model_selection import train_test_split

# Stratified 80/20 train/validation split. The seed is fixed so that the split is
# identical on every run: training resumes from a saved checkpoint, and an unseeded
# split would let images from an earlier validation set end up in the training set.
train_X, val_X, train_Y, val_Y = train_test_split(
    files, labels, test_size=0.2, stratify=labels, random_state=42
)

# Per-class weight = (size of the largest class) / (size of this class), computed on
# the training split only, so the rarer class gets a weight greater than 1.
n_labels = np.bincount(train_Y)
class_weight = n_labels.max() / n_labels

In [None]:
# The data is imbalanced, so class_weight is added to the loss function
# to increase the contribution of the under-represented class.
class_weight

array([2.56840322, 1.        ])

In [8]:
from torch.utils.data import Dataset, DataLoader

# Default folder the dataset reads image files from when no explicit `root` is given.
# NOTE(review): this points at the test images, while the train/validation file names
# used with this class were listed from './data/images' — confirm which one is intended.
data_path = './data/test_images'


class SmileDataset(Dataset):
    """Grayscale image dataset that caches every transformed image in memory.

    Args:
        image_name: sequence of image file names, relative to the image folder.
        labels: sequence of labels, aligned with ``image_name``.
        transform: callable applied once to the loaded grayscale PIL image; its
            result is what gets cached.
        augment: optional callable applied to the cached image on every access,
            so random augmentations differ between epochs.
        root: optional folder containing the images. When ``None`` (default) the
            module-level ``data_path`` is looked up at access time, as before.
    """

    def __init__(self, image_name, labels, transform, augment=None, root=None):
        super().__init__()
        self.image_name = image_name
        self.labels = labels
        self.transform = transform
        self.augment = augment
        self.root = root
        # The data set is fairly small, so transformed images are cached to load faster.
        self.cache = {}

    def __len__(self):
        return len(self.image_name)

    def __getitem__(self, index):
        """Return ``(image, label)`` for ``index``, loading from disk only on a cache miss."""
        label = self.labels[index]
        image = self.cache.get(index)
        # Identity check: `== None` on a cached array/tensor is an element-wise
        # comparison and is either ambiguous or raises when used as a condition.
        if image is None:
            root = data_path if self.root is None else self.root
            # The context manager closes the file handle once the pixels are converted.
            with Image.open(os.path.join(root, self.image_name[index])) as raw:
                image = self.transform(raw.convert("L"))  # single-channel grayscale
            self.cache[index] = image

        # Augment a fresh result each time; the cached image itself stays untouched.
        if self.augment is not None:
            return self.augment(image), label
        return image, label
    
# Base transform applied once per image before it is cached by the dataset.
transform = transforms.ToTensor()
# Use random transformations to enrich the training data. RandomApply treats the list
# as one unit: either all of the listed transforms run, or none of them do
# (NOTE(review): with RandomApply's default probability — confirm this is intended).
augment = transforms.RandomApply(transforms = [transforms.GaussianBlur(3),
                                         transforms.RandomPerspective(),
                                         transforms.RandomRotation(degrees=(0, 45)),
                                         transforms.RandomAutocontrast(),
                                         transforms.RandomHorizontalFlip(),
                                         transforms.RandomVerticalFlip()])
# Only the training set is augmented; the validation set gets the base transform alone.
train_dataset = SmileDataset(train_X, train_Y, transform, augment)
val_dataset = SmileDataset(val_X, val_Y, transform)

# Tạo mô hình để thử nghiệm

- Bao gồm các thử nghiệm với: GoogLeNet (trong code được tạo bằng tên `'lenet'`), ResNet (18, 50, 101)

In [9]:
from torchvision import models

class Model(nn.Module):
    """Pretrained torchvision backbone with a 2-class head, fed single-channel images.

    Args:
        name: which backbone to build; one of the keys of ``_BACKBONES``.

    Raises:
        ValueError: if ``name`` is not a supported backbone. (Previously an unknown
            name silently produced ``net = None`` and only failed later on.)
    """

    # Accepted `name` values mapped to the torchvision constructor to call.
    # 'lenet' is kept for backward compatibility: it has always built GoogLeNet.
    _BACKBONES = {
        'lenet': 'googlenet',
        'googlenet': 'googlenet',
        'resnet18': 'resnet18',
        'resnet50': 'resnet50',
        'resnet101': 'resnet101',
    }

    def __init__(self, name):
        super().__init__()
        self.net = self.__create_model(name)

    def __create_model(self, name):
        """Build the pretrained backbone and replace its final layer with a 2-class one."""
        if name not in self._BACKBONES:
            supported = ', '.join(sorted(self._BACKBONES))
            raise ValueError(f"Unknown model name {name!r}; expected one of: {supported}")
        model = getattr(models, self._BACKBONES[name])(pretrained=True)
        in_features = model.fc.in_features
        model.fc = nn.Linear(in_features, 2)  # 2 classes
        return model

    def forward(self, X):
        # The input is a 1-channel grayscale image while the backbones expect 3 channels,
        # so the gray values are repeated along the channel dimension (dim 1).
        padding_X = torch.repeat_interleave(X, 3, dim=1)
        return self.net(padding_X)

In [12]:
# Cấu hình các tham số học
n_epoch = 15
lr = 1e-3
batch_size = 32
global_device = 'cuda' if torch.cuda.is_available() else 'cpu'
global_device

'cuda'

In [2]:
from torch import optim

# Batches are shuffled for training only; validation keeps the dataset order.
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=batch_size)

# Build the network on the selected device and optimise all of its parameters with Adam.
lenet = Model('lenet').to(device=global_device)
optimizer = optim.Adam(lenet.parameters(), lr=lr)

# Huấn luyện mô hình

In [16]:
from sklearn.metrics import f1_score, accuracy_score

def evaluate(logits, label):
    """Score per-class outputs against the true labels.

    Args:
        logits: 2-D array of per-class scores, one row per sample; the predicted
            class of a row is the index of its largest value.
        label: true class index of every sample.

    Returns:
        Tuple ``(accuracy, f1)``; ``f1_score`` is called with its default settings.
    """
    predicted = np.argmax(logits, axis=1)
    return accuracy_score(label, predicted), f1_score(label, predicted)

def validate(model, criterion, val_loader):
    """Evaluate ``model`` on every batch of ``val_loader`` without tracking gradients.

    Accuracy and F1 are computed once over all predictions. Averaging per-batch
    values (the previous behaviour) is not equivalent: F1 cannot be averaged across
    batches, and a smaller final batch would count as much as a full one.

    Args:
        model: network to evaluate; it is switched to eval mode and left there.
        criterion: loss function, or ``None`` to skip the loss (reported as -1).
        val_loader: iterable of ``(X, Y)`` batches.

    Returns:
        Tuple ``(mean of the per-batch losses, accuracy, f1)``. All three are NaN
        when ``val_loader`` yields no batches.
    """
    model.eval()
    losses = []
    all_probs = []
    all_labels = []
    with torch.no_grad():
        for X, Y in val_loader:
            X = X.to(device=global_device)
            Y = Y.to(device=global_device)
            out = model(X)
            losses.append(criterion(out, Y).item() if criterion is not None else -1)
            # Collect on the CPU so the metrics can be computed once at the end.
            all_probs.append(torch.softmax(out, dim=1).cpu().numpy())
            all_labels.append(Y.cpu().numpy())

    if not losses:
        # Nothing to evaluate: mirror the NaN result of averaging empty lists.
        nan = float('nan')
        return nan, nan, nan

    acc, f1 = evaluate(np.concatenate(all_probs), np.concatenate(all_labels))
    return np.mean(losses), acc, f1

In [1]:
import time
from tqdm.auto import tqdm
# Per-epoch history of the training loss and of the validation metrics.
total_train_loss = []
total_val_loss = []
total_acc = []
total_f1 = []
checkpoint_path = './lenet.pt'
best = 0.0  # best validation F1 seen so far

# Resume the model, the optimizer state and the best score when a checkpoint exists.
if os.path.exists(checkpoint_path):
    print("Load checkpoint")
    checkpoint = torch.load(checkpoint_path, map_location=global_device)
    lenet.load_state_dict(checkpoint['model'])
    optimizer.load_state_dict(checkpoint['optimizer'])
    best = checkpoint['best']

# Cross entropy weighted per class to compensate for the class imbalance.
criterion = nn.CrossEntropyLoss(weight = torch.tensor(class_weight, dtype = torch.float, device = global_device))
for epoch in range(n_epoch):
    lenet.train()  # validate() leaves the model in eval mode, so switch back every epoch
    start = time.time()
    print(f'Epoch {epoch}')
    losses = []
    # The batch variables are deliberately not named `labels`: that name is the
    # notebook-level array holding the label of every file, and rebinding it to a
    # batch tensor would break any later re-run of the cells that use it.
    for batch_images, batch_labels in tqdm(train_loader):
        batch_images = batch_images.to(device = global_device)
        batch_labels = batch_labels.to(device = global_device)

        optimizer.zero_grad()
        out = lenet(batch_images)
        # Original rationale for the division: the 3 input channels are identical.
        # NOTE(review): this makes the reported train loss 1/3 of the scale of the
        # validation loss, which is computed without the division — confirm it is wanted.
        loss = criterion(out, batch_labels) / 3
        loss.backward()
        optimizer.step()
        losses.append(loss.item())

    total_train_loss.append(np.mean(losses))
    val_loss, val_acc, f1 = validate(lenet, criterion, val_loader)
    total_val_loss.append(val_loss)
    total_acc.append(val_acc)
    total_f1.append(f1)
    print(f'Train loss: {total_train_loss[-1]}')
    print(f'Validation: Accuracy {val_acc}, loss {val_loss}, f1 score {f1}, in {time.time() - start}s')
    # Keep the checkpoint of the epoch with the best (or an equally good) validation F1.
    if f1 >= best:
        best = f1
        torch.save({
            'model': lenet.state_dict(),
            'optimizer': optimizer.state_dict(),
            'best': best
        }, checkpoint_path)
    # No learning-rate scheduler is configured; step it here if one is added.


# Test mô hình

In [39]:
def load_model(name, device, checkpointPath):
    """Rebuild a ``Model`` and restore its weights from a checkpoint, ready for inference.

    Args:
        name: backbone name passed to ``Model``.
        device: device the model is moved to and the checkpoint is mapped onto.
        checkpointPath: path of a checkpoint whose ``'model'`` entry is a state dict.

    Returns:
        The restored model, switched to eval mode.
    """
    restored = Model(name).to(device=device)
    saved_state = torch.load(checkpointPath, map_location=device)
    restored.load_state_dict(saved_state['model'])
    return restored.eval()

In [2]:
from tqdm.auto import tqdm
# Per-class counters: how often each class was predicted, and how many of those
# predictions were correct (class 0 = smile, class 1 = not smile).
smile_correct = 0
not_smile_correct = 0
total_smile_pred = 0
total_not_smile_pred = 0
# Run on whatever device the model already lives on instead of assuming CUDA,
# so the cell also works on a CPU-only runtime.
test_device = next(model.parameters()).device
# Inference only: no autograd graph is needed.
with torch.no_grad():
    for X, Y in tqdm(test_loader):
        out = model(X.to(device = test_device))
        # argmax over the raw outputs selects the same class as argmax over their softmax.
        y_pred = torch.argmax(out, dim = 1).cpu()
        is_correct = y_pred == Y

        smile_correct += (is_correct & (y_pred == 0)).sum()
        total_smile_pred += (y_pred == 0).sum()

        not_smile_correct += (is_correct & (y_pred == 1)).sum()
        total_not_smile_pred += (y_pred == 1).sum()

smile_correct, total_smile_pred, not_smile_correct, total_not_smile_pred

(tensor(677), tensor(763), tensor(1809), tensor(1870)
