In [1]:
from torch import nn, optim
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader
from tqdm import tqdm
import torch
import time
import matplotlib.pyplot as plt

In [2]:
BATCH_SIZE = 32
EPOCH = 8
LR = 0.0001
new_model_train = True
device = "cuda" if torch.cuda.is_available() else "cpu"
criterion = nn.CrossEntropyLoss()
model_type = "PreTrained_ResNet50_SavemodelTest"
dataset = "dataset"
save_model_path = f"result/{model_type}{dataset}.pt"
save_history_path = f"result/{model_type}history{dataset}.pt"

In [3]:
device

'cpu'

In [4]:
train_dir = 'dataset/train'
valid_dir = 'dataset/valid'
test_dir = 'dataset/test'

train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

valid_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

test_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

train_DS = datasets.ImageFolder(train_dir, transform=train_transform)
valid_DS = datasets.ImageFolder(valid_dir, transform=valid_transform)
test_DS = datasets.ImageFolder(test_dir, transform=test_transform)

train_DL = DataLoader(train_DS, batch_size=BATCH_SIZE, shuffle=True)
valid_DL = DataLoader(valid_DS, batch_size=BATCH_SIZE, shuffle=True)
test_DL = DataLoader(test_DS, batch_size=BATCH_SIZE, shuffle=True)

In [5]:
class ResNet(nn.Module):
    def __init__(self, num_classes=6):
        super().__init__()
        self.resnet = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
        self.resnet = nn.Sequential(*list(self.resnet.children())[:-1])

        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(2048, 1024),
            nn.BatchNorm1d(1024, momentum=0.05),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(1024, 512),
            nn.BatchNorm1d(512, momentum=0.05),
            nn.ReLU(),
            nn.Dropout(0.4),
            nn.Linear(512, num_classes),
        )

    def forward(self, x):
        x = self.resnet(x)
        x = self.classifier(x)
        return x

In [None]:
def loss_epoch(model, DL, criterion, optimizer = None):
    N = len(DL.dataset)
    rloss = 0; rcorrect = 0
    for x_batch, y_batch in tqdm(DL, leave=False):
        x_batch = x_batch.to(device)
        y_batch = y_batch.to(device)
        y_hat = model(x_batch)
        loss = criterion(y_hat, y_batch)
        if optimizer is not None:
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        loss_b = loss.item() * x_batch.shape[0]
        rloss += loss_b
        pred = torch.argmax(y_hat, dim=1)
        corrects_b = torch.sum(pred == y_batch).item()
        rcorrect += corrects_b
    loss_e = rloss/N
    accruracy_e = rcorrect/N * 100
    return loss_e, accruracy_e, rcorrect

In [None]:
def Train(model, train_DL, val_DL, criterion, optimizer,
          EPOCH, BATCH_SIZE, save_model_path, save_history_path):
    loss_history = {"train" : [], "val":[]}
    acc_history = {"train" : [], "val":[]}
    best_loss = 9999
    for ep in range(EPOCH):
        epoch_start = time.time()
        current_lr = optimizer.param_groups[0]["lr"]
        print(f"Epoch : {ep+1}, current_LR = {current_lr}")

        model.train()
        train_loss, train_acc, _ = loss_epoch(model, train_DL, criterion, optimizer)
        loss_history["train"] += [train_loss]
        acc_history["train"] += [train_acc]

        model.eval()
        with torch.no_grad():
            val_loss, val_acc, _ = loss_epoch(model, val_DL, criterion)
            loss_history["val"] += [val_loss]
            acc_history["val"] += [val_acc]

            if val_loss < best_loss:
                best_loss = val_loss
                torch.save({"model" : model,
                            "ep" : ep,
                            "optimizer" : optimizer}, save_model_path)
        print(f"train loss: {round(train_loss,5)}, "
              f"val loss: {round(val_loss,5)} \n"
              f"train acc: {round(train_acc,1)} %, "
              f"val acc: {round(val_acc,1)} %, time: {round(time.time()-epoch_start)} s")
        print("-"*20)

    torch.save({"loss_history" : loss_history,
                "acc_history" : acc_history,
                "EPOCH": EPOCH,
                "BATCH_SIZE": BATCH_SIZE}, save_history_path)

    return loss_history

In [None]:
def Test(model, test_DL, criterion):
    model.eval()
    with torch.no_grad():
        test_loss, test_acc, rcorrect = loss_epoch(model, test_DL, criterion)
    print()
    print(f"Test loss : {round(test_loss,5)}")
    print(f"Test accuracy : {rcorrect}/{len(test_DL.dataset)} ({round(test_acc,1)} %)")
    return round(test_acc,1)

In [None]:
def Test_plot(model, test_DL):
    model.eval()
    with torch.no_grad():
        x_batch, y_batch = next(iter(test_DL))
        x_batch = x_batch.to(device)
        y_hat = model(x_batch)
        pred = y_hat.argmax(dim=1)

    x_batch = x_batch.to("cpu")

    plt.figure(figsize=(10,4))
    for idx in range(6):
        plt.subplot(2,3, idx+1, xticks=[], yticks=[])
        plt.imshow(x_batch[idx].permute(1,2,0).squeeze(), cmap="gray")
        pred_class = test_DL.dataset.classes[pred[idx]]
        true_class = test_DL.dataset.classes[y_batch[idx]]
        plt.title(f"{pred_class} ({true_class})", color = "g" if pred_class == true_class else "r")

In [None]:
if new_model_train:
    optimizer = optim.Adam(model.parameters(), lr = LR)
    loss_history = Train(model, train_DL, valid_DL, criterion, optimizer, EPOCH,
                         BATCH_SIZE, save_model_path, save_history_path)
    loss_history = list(loss_history.values())[0]
    plt.plot(range(1, EPOCH+1), loss_history)
    plt.xlabel('Epoch')
    plt.ylabel('loss')
    plt.title("Train Loss")
    plt.grid()
else:
    optimizer = optim.Adam(load_model.parameters(), lr = LR)
    loss_history = Train(load_model, train_DL, valid_DL, criterion, optimizer, EPOCH,
                        BATCH_SIZE, save_model_path, save_history_path)
    loss_history = list(loss_history.values())[0]
    plt.plot(range(1, EPOCH+1), loss_history)
    plt.xlabel('Epoch')
    plt.ylabel('loss')
    plt.title("Train Loss")
    plt.grid()

In [None]:
load_model = torch.load(save_model_path)["model"]