In [1]:
import torch
import numpy as np
import pandas as pd
import torch.nn as nn

In [2]:
import os, wandb
os.environ["WANDB_API_KEY"] = "9fdbffa3fe214bfa40e7ffaa6487c4c80e5a5b4f"
wandb.login()

wandb: ERROR Failed to detect the name of this notebook. You can set it manually with the WANDB_NOTEBOOK_NAME environment variable to enable code saving.
wandb: Currently logged in as: brian3698741 (brian3698741-national-tsing-hua-university) to https://api.wandb.ai. Use `wandb login --relogin` to force relogin


True

In [3]:
wandb.init(project="Human Face Emotions dataset", name="baseline", config={
    "epochs": 3,
    "batch_size": 32,
    "lr": 5e-2, 
    "num_workers": 0, 
})
config = wandb.config

In [4]:
from glob import glob

ROOT = '.\Datasets\Human Face Emotions'
classes = sorted(os.listdir(ROOT))

In [5]:
classes

['Angry', 'Fear', 'Happy', 'Sad', 'Suprise']

In [6]:
# 用glob抓資料夾底下所有同類型的檔案: ROOT\cls\*.png
samples = []
for cls in classes:
    for img_path in glob(os.path.join(ROOT, cls, '*')):
        samples.append((img_path, cls))

In [7]:
samples[:5], len(samples)

([('.\\Datasets\\Human Face Emotions\\Angry\\0.png', 'Angry'),
  ('.\\Datasets\\Human Face Emotions\\Angry\\1.png', 'Angry'),
  ('.\\Datasets\\Human Face Emotions\\Angry\\10.png', 'Angry'),
  ('.\\Datasets\\Human Face Emotions\\Angry\\10002.png', 'Angry'),
  ('.\\Datasets\\Human Face Emotions\\Angry\\10005903.png', 'Angry')],
 59099)

In [8]:
from sklearn.model_selection import train_test_split

# [「要放的東西」 for 「變數」 in 「可迭代物」]
labels = [s[1] for s in samples]
tr_samples, va_samples = train_test_split(
    samples, 
    test_size=0.2, 
    shuffle=True, 
    # stratify = labels: 讓training, validation set中的labels可以被等比例切分
    stratify=labels, 
    random_state=42
)

In [9]:
print(f'Training size: {len(tr_samples)}')
print(f'Validation size: {len(va_samples)}')

Training size: 47279
Validation size: 11820


In [10]:
print('In training set: ')
for cls in classes:
    print(f'    {cls}: {([s[1] for s in tr_samples].count(cls) / len(tr_samples) * 100):.0f}%')
print('\nIn validation set: ')
for cls in classes:
    print(f'    {cls}: {([s[1] for s in va_samples].count(cls) / len(va_samples) * 100):.0f}%')

In training set: 
    Angry: 17%
    Fear: 16%
    Happy: 31%
    Sad: 21%
    Suprise: 14%

In validation set: 
    Angry: 17%
    Fear: 16%
    Happy: 31%
    Sad: 21%
    Suprise: 14%


In [11]:
from PIL import Image
from torch.utils.data import Dataset, DataLoader

class ImageDataset(Dataset):
    def __init__(self, samples, transform=None, target_transform=None):
        self.samples = samples
        self.transform = transform
        self.target_transform = target_transform
        
    def __len__(self):
        return len(self.samples)
        
    def __getitem__(self, idx):
        img = Image.open(self.samples[idx][0]).convert('L')
        label = self.samples[idx][1]
        if self.transform:
            img = self.transform(img)
        if self.target_transform:
            label = self.target_transform(label)
        return img, label
  

In [12]:
from torchvision import transforms

# Image transform = Preprocess(ex: resize to 224x224) + Data augmentation + To Tensor + Normalize
tr_tf = transforms.Compose([
    transforms.Resize((48, 48)),
    transforms.ToTensor()
])
va_tf = transforms.Compose([
    transforms.Resize((48, 48)),
    transforms.ToTensor()
])

In [13]:
# 建label_dict讓string label可以map到一個idx
label_dict = {}
for idx, cls in enumerate(classes):
    label_dict[cls] = idx
def label_to_idx(label):
    return label_dict[label]

tr_ds = ImageDataset(tr_samples, transform=tr_tf, target_transform=label_to_idx)
va_ds = ImageDataset(va_samples, transform=va_tf, target_transform=label_to_idx)

X0, y0 = tr_ds[1]
print(X0.shape, y0)

torch.Size([1, 48, 48]) 2


In [14]:
NUM_WORKERS = 0
BATCH_SIZE = 32
EPOCHS = 5
LR = 5e-2
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'Device: {device}')

tr_dl = DataLoader(tr_ds, batch_size=config.batch_size, shuffle=True, num_workers=config.num_workers)
va_dl = DataLoader(va_ds, batch_size=config.batch_size, shuffle=False, num_workers=config.num_workers)

Device: cuda


In [15]:
print(f'Total batches of training set: {len(tr_dl)}')
print(f'Total batches of validation set: {len(va_dl)}')

Total batches of training set: 1478
Total batches of validation set: 370


In [16]:
import torch.nn.functional as F

class ResBlock(nn.Module):
    def __init__(self, in_channels, out_channels, use_conv1x1=False, strides=1):
        super().__init__()
        self.path1 = nn.Sequential(
            # 第一個conv2d可以用strides來降低H, W(ex: s=2 H, W / = 2)
            nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=3, stride=strides, padding=1), 
            nn.BatchNorm2d(out_channels), 
            nn.ReLU(), 
            # 第二個conv2d必須保持H, W不變 所以s=1
            nn.Conv2d(in_channels=out_channels, out_channels=out_channels, kernel_size=3, stride=1, padding=1), 
            nn.BatchNorm2d(out_channels)
        )
        # 若有需要則在shortcut加上1x1conv來調整channel數或H, W大小(用strides調整)
        if use_conv1x1 or (in_channels != out_channels or strides != 1):
            self.path2 = nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=1, stride=strides, padding=0)
        else:
            self.path2 = nn.Identity()
        
    def forward(self, X):
        out = self.path1(X)
        shortcut = self.path2(X)
        # 此處的relu只是一個function而非layer
        return F.relu(out + shortcut)

In [17]:
class ResNet18(nn.Module):
    def __init__(self, in_channels, out_num):
        super().__init__()
        self.stage0 = nn.Sequential(
            nn.Conv2d(in_channels, 64, kernel_size=7, stride=2, padding=3), 
            nn.BatchNorm2d(64), 
            nn.ReLU(), 
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1), 
        )
        self.stage1 = nn.Sequential(
            ResBlock(64, 64, strides=1),
            ResBlock(64, 64, strides=1),
        )
        self.stage2 = nn.Sequential(
            ResBlock(64, 128, strides=2),
            ResBlock(128, 128, strides=1),
        )
        self.stage3 = nn.Sequential(
            ResBlock(128, 256, strides=2),
            ResBlock(256, 256, strides=1),
        )
        self.stage4 = nn.Sequential(
            ResBlock(256, 512, strides=2),
            ResBlock(512, 512, strides=1),
        )
        self.stage5 = nn.Sequential(
            nn.AdaptiveAvgPool2d((1, 1)), 
            nn.Flatten(), 
            nn.Linear(512, out_num)
        )
    def forward(self, X):
        X = self.stage0(X)
        X = self.stage1(X)
        X = self.stage2(X)
        X = self.stage3(X)
        X = self.stage4(X)
        X = self.stage5(X)
        return X


In [18]:
class NewResNet18(nn.Module):
    def __init__(self, in_channels, out_num):
        super().__init__()
        self.stage0 = nn.Sequential(
            nn.Conv2d(in_channels, 64, kernel_size=3, stride=1, padding=1), 
            nn.BatchNorm2d(64), 
            nn.ReLU(), 
            # nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        )
        self.stage1 = nn.Sequential(
            ResBlock(64, 64, strides=1),
            ResBlock(64, 64, strides=1),
        )
        self.stage2 = nn.Sequential(
            ResBlock(64, 128, strides=2),
            ResBlock(128, 128, strides=1),
        )
        self.stage3 = nn.Sequential(
            ResBlock(128, 256, strides=2),
            ResBlock(256, 256, strides=1),
        )
        self.stage4 = nn.Sequential(
            ResBlock(256, 512, strides=2),
            ResBlock(512, 512, strides=1),
        )
        self.stage5 = nn.Sequential(
            nn.AdaptiveAvgPool2d((1, 1)), 
            nn.Flatten(), 
            nn.Linear(512, out_num)
        )
    def forward(self, X):
        X = self.stage0(X)
        X = self.stage1(X)
        X = self.stage2(X)
        X = self.stage3(X)
        X = self.stage4(X)
        X = self.stage5(X)
        return X


In [19]:
model = NewResNet18(1, 5)
model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(params=model.parameters(), lr=config.lr)

In [20]:
from tqdm import tqdm

for epoch in range(config.epochs):
    model.train()
    tr_loss = 0
    for Xb, yb in tqdm(tr_dl, desc=f"Epoch {epoch+1}/{config.epochs}"):
        Xb, yb = Xb.to(device), yb.to(device)
        preds = model(Xb)
        loss = criterion(preds, yb)
        tr_loss += loss.item()
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    tr_loss /= len(tr_dl)
    print(f"Epoch {epoch+1}: Training loss = {tr_loss:.4f}")
    
    wandb.log({"epoch": epoch, "loss": avg_loss, "train_acc": tr_acc})
    
    model.eval()
    with torch.no_grad():
        va_loss = 0
        for Xb, yb in va_dl:
            Xb, yb = Xb.to(device), yb.to(device)
            preds = model(Xb)
            loss = criterion(preds, yb)
            va_loss += loss.item()
        va_loss /= len(va_dl)
        print(f"Epoch {epoch+1}: Validation loss = {va_loss:.4f}")
        wandb.log({"val_acc": va_acc})

Epoch 1/3: 100%|███████████████████████████████████████████████████████████████████| 1478/1478 [06:34<00:00,  3.74it/s]


Epoch 1: Training loss = 1.2598


NameError: name 'avg_loss' is not defined