<a href="https://colab.research.google.com/github/Rig12332113/ML/blob/master/hw2/ml_hw2_sample_code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## ML HW2 sample code

若有問題可直接寄信至助教信箱，環境部分推薦使用前述所提到之環境，若使用自己的電腦執行，請注意環境的相容性


#### Import packages

In [None]:
import os
import random
import glob
import csv
import torch
import torch.nn as nn
import numpy as np
import pandas as pd

from torch.optim import Adam
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
from PIL import Image

In [None]:
!gdown 1drrS7gnyzUJPPiQcDWcHdIXqzjy2n3yZ
!unzip '2023HW2.zip'

Downloading...
From: https://drive.google.com/uc?id=1drrS7gnyzUJPPiQcDWcHdIXqzjy2n3yZ
To: /content/HW2.zip
100% 41.9M/41.9M [00:00<00:00, 181MB/s]
unzip:  cannot find or open 2023HW2.zip, 2023HW2.zip.zip or 2023HW2.zip.ZIP.


#### Set arguments and random seed

In [None]:
TRA_PATH = 'data/train/'
TST_PATH = 'data/test/'
LABEL_PATH = 'data/train.csv'
DEVICE_ID = 0
SEED = 1428
NUM_ECPOCH = 300

torch.cuda.set_device(DEVICE_ID)
use_gpu = torch.cuda.is_available()
device = torch.device("cuda" if use_gpu else "cpu")

torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
random.seed(SEED)
np.random.seed(SEED)

#### Process data

In [None]:
def load_train_data(img_path, label_path, valid_ratio=0.12):
    train_label = pd.read_csv(label_path)['label'].values.tolist()
    train_image = [f'{img_path}/{i+7000}.jpg' for i in range(len(train_label)-1)]

    train_data = list(zip(train_image, train_label))
    random.shuffle(train_data)

    split_len = int(len(train_data) * valid_ratio)
    train_set = train_data[split_len:]
    valid_set = train_data[:split_len]

    return train_set, valid_set

def load_test_data(img_path):
    test_set = [f'{img_path}/{i}.jpg' for i in range(0,6999)]
    return test_set

def compute_statistics(dataset):
    data = []
    for (img_path, label) in dataset:
        data.append(np.array(Image.open(img_path)))
    data = np.array(data)
    return data.mean(), data.std()

In [None]:
train_set, valid_set = load_train_data(TRA_PATH, LABEL_PATH)
test_set = load_test_data(TST_PATH)
transform = None # do augmentation there

FileNotFoundError: ignored

#### Customize dataset

In [None]:
class FaceExpressionDataset(Dataset):
    def __init__(self, data, augment=None):
        self.data = data
        self.augment = augment

    def __len__(self):
        return len(self.data)

    def normalize(self, data):
        # TODO: do normalization there
        pass

    def read_img(self, idx):
        img = Image.open(self.data[idx][0])
        if not self.augment is None:
            img = self.augment(img)
        img = torch.from_numpy(np.array(img)).float()
        img = img.unsqueeze(0).float()
        # img = self.normalize(img)
        return img

    def __getitem__(self, idx):
        img = self.read_img(idx)
        label = self.data[idx][1]
        return img, label

class TestingDataset(Dataset):
    def __init__(self, data, augment=None):
        self.data = data
        self.augment = augment

    def __len__(self):
        return len(self.data)

    def normalize(self, data):
        # TODO: do normalization there
        pass

    def read_img(self, idx):
        img = Image.open(self.data[idx])
        if not self.augment is None:
            img = self.augment(img)
        img = torch.from_numpy(np.array(img)).float()
        img = img.unsqueeze(0).float()
        # img = self.normalize(img)
        return img, self.data[idx].split('/')[-1][:-4]

    def __getitem__(self, idx):
        img, name = self.read_img(idx)

        return img, name

In [None]:
train_dataset = FaceExpressionDataset(train_set, transform)
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)

valid_dataset = FaceExpressionDataset(valid_set)
valid_loader = DataLoader(valid_dataset, batch_size=128, shuffle=False)

test_dataset = TestingDataset(test_set)
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False)

#### Define module class

In [None]:
class FaceExpressionNet(nn.Module):
    def __init__(self):
        super(FaceExpressionNet, self).__init__()
        # TODO
        self.conv = nn.Sequential(
            nn.Conv2d(1, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64, eps=1e-05, affine=True),
            nn.LeakyReLU(negative_slope=0.05),
            nn.MaxPool2d((2, 2)),
        )
        self.fc = nn.Sequential(
            nn.Linear(64 * 32 * 32, 7),
        )

    def forward(self, x):
        #image size (64,64)
        x = self.conv(x) #(32,32)
        x = x.flatten(start_dim=1)
        x = self.fc(x)
        return x

#### Define training and testing process

In [None]:
def train(train_loader, model, loss_fn, use_gpu=True):
    model.train()
    train_loss = []
    train_acc = []
    for (img, label) in train_loader:
        if use_gpu:
            img = img.to(device)
            label = label.to(device)
        optimizer.zero_grad()
        output = model(img)
        loss = loss_fn(output, label)
        loss.backward()
        optimizer.step()
        with torch.no_grad():
            predict = torch.argmax(output, dim=-1)
            acc = np.mean((label == predict).cpu().numpy())
            train_acc.append(acc)
            train_loss.append(loss.item())
    print("Epoch: {}, train Loss: {:.4f}, train Acc: {:.4f}".format(epoch + 1, np.mean(train_loss), np.mean(train_acc)))

def valid(valid_loader, model, loss_fn, use_gpu=True):
    model.eval()
    with torch.no_grad():
        valid_loss = []
        valid_acc = []
        for idx, (img, label) in enumerate(valid_loader):
            if use_gpu:
                img = img.to(device)
                label = label.to(device)
            output = model(img)
            loss = loss_fn(output, label)
            predict = torch.argmax(output, dim=-1)
            acc = (label == predict).cpu().tolist()
            valid_loss.append(loss.item())
            valid_acc += acc

        valid_acc = np.mean(valid_acc)
        valid_loss = np.mean(valid_loss)
        print("Epoch: {}, valid Loss: {:.4f}, valid Acc: {:.4f}".format(epoch + 1, valid_loss, valid_acc))
    return valid_acc

def save_checkpoint(valid_acc, acc_record, epoch, prefix='model'):
    # you can define the condition to save model :)
    if valid_acc >= np.mean(acc_record[-5:]):
        checkpoint_path = f'{prefix}.pth'
        torch.save(model.state_dict(), checkpoint_path)
        print('model saved to %s' % checkpoint_path)

def better(acc_record):
    if max(acc_record) == acc_record[-1]: return True
    return False

In [None]:
if __name__ == '__main__':
    model = FaceExpressionNet()
    if use_gpu:
        model.to(device)

    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    loss_fn = nn.CrossEntropyLoss()

    acc_record = []

    for epoch in range(NUM_ECPOCH):
        train(train_loader, model, loss_fn, use_gpu)
        valid_acc = valid(valid_loader, model, loss_fn, use_gpu=True)
        acc_record.append(valid_acc)


        if better(acc_record):
            save_checkpoint(valid_acc, acc_record, epoch, prefix='model')

        print('########################################################')

In [None]:
def test(test_loader, model, file_name='predict.csv'):
    with torch.no_grad():
        predict_result = []
        predict_name = []
        for img, name in test_loader:
            if use_gpu:
                img = img.to(device)
            output = model(img)
            predict = torch.argmax(output, dim=-1).tolist()
            predict_result += predict
            predict_name += name

    with open(file_name, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['id', 'label'])
        for id, r in zip(predict_name, predict_result):
            writer.writerow([id, r])

In [None]:
del model
model = FaceExpressionNet()
model.load_state_dict(torch.load('model.pth'))
model = model.cuda()
test(test_loader, model)