In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as f
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
from torchvision.utils import save_image
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [None]:
torch.manual_seed(1)
IMAGE_SIZE = 128

device = 'cuda' if torch.cuda.is_available() else 'cpu'
if device == 'cuda': torch.cuda.manual_seed_all(1)

print(device)

In [None]:
original_datasets = ImageFolder(
    root='../data/flower_photos',
    transform=transforms.Compose(
        [transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)), transforms.ToTensor()]    # transform할 내용이 2가지 이상일 때
    )
)

original_loader = DataLoader(
    dataset=original_datasets,
    batch_size=2313,
    shuffle=True,
    num_workers=1   # 데이터 로드 시 사용할 병렬 프로세스 수
)

print(original_datasets)

In [None]:
for x, y in original_loader:
    print(x.size(), y.size())
    print(y)    # 폴더 순서대로 자동 인덱싱 해서 targets를 생성함
    break

In [None]:
imgs, labels = next(iter(original_loader))
plt.figure(figsize=(8, 10))

def plot(x, y):
    for i in range(len(x)):
        plt.subplot(5, 5, i+1)
        plt.title(y[i].item())
        plt.imshow(x[i].permute(1, 2, 0))
        plt.axis('off')
    plt.show()

plot(imgs[:25], labels[:25])

In [None]:
img_transform = transforms.Compose(
    [
        transforms.Resize((256, 256)),
        transforms.CenterCrop((IMAGE_SIZE, IMAGE_SIZE)),
        transforms.ToTensor()
    ]
)
transforms_dataset = ImageFolder(
    root='../data/flower_photos', transform=img_transform
)
transforms_loader = DataLoader(
    transforms_dataset,
    batch_size=1000,
    shuffle=True
)

In [None]:
transform_img, transform_label = next(iter(transforms_loader))
print(transform_img.shape, transform_label.shape)

In [None]:
plot(transform_img[:25], transform_label[:25])

In [None]:
x_trans = torch.cat([x, transform_img], dim=0)
y_trans = torch.cat([y, transform_label], dim=0)
print(x_trans.size(), y_trans.size())

In [None]:
def create_loader(transform):
    transform_dataset = ImageFolder(
        root='../data/flower_photos',
        transform=transform
    )
    transforms_loader = DataLoader(
        dataset=transform_dataset,
        batch_size=1000,
        shuffle=True
    )

    transform_img, transform_label = next(iter(transforms_loader))

    return transform_img, transform_label

In [None]:
img_transform = transforms.Compose(
    [
        transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
        transforms.ColorJitter(
            brightness=(0.7, 0.9),
            contrast=(1, 1),
            saturation=(0.7, 0.9),
            hue=(-0.2, 0.2)   # 이건 색깔값 자체를 바꾸는 기능이라 잘 안씀
        ),
        transforms.ToTensor()
    ]
)
color_tf_image, color_tf_label = create_loader(img_transform)
plot(color_tf_image[:25], color_tf_label[:25])

In [None]:
x_trans2 = torch.cat([x_trans, color_tf_image], dim=0)
y_trans2 = torch.cat([y_trans, color_tf_label], dim=0)
print(x_trans2.size(), y_trans2.size())

In [None]:
img_transform = transforms.Compose(
    [
        transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
        transforms.RandomHorizontalFlip(p=0.8), # 이미지 뒤집기
        transforms.ToTensor()
    ]
)

flip_tf_image, flip_tf_label = create_loader(img_transform)

x_trans3 = torch.cat([x_trans2, flip_tf_image], dim=0)
y_trans3 = torch.cat([y_trans2, flip_tf_label], dim=0)

print(x_trans3.size(), y_trans3.size())

In [None]:
img_transform = transforms.Compose(
    [
        transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
        transforms.RandomRotation(   # 이미지 회전시키기
            degrees=(-15, 15),
            interpolation=transforms.InterpolationMode.BILINEAR,
            fill=0
        ),
        transforms.ToTensor()
    ]
)

rot_tf_image, rot_tf_label = create_loader(img_transform)

x_trans4 = torch.cat([x_trans3, rot_tf_image], dim=0)
y_trans4 = torch.cat([y_trans3, rot_tf_label], dim=0)

print(x_trans4.size(), y_trans4.size())

In [None]:
cnt0 = (y == 0).sum()
cnt1 = (y == 1).sum()
cnt2 = (y == 2).sum()
cnt3 = (y == 3).sum()
cnt4 = (y == 4).sum()
print(cnt0.item())
print(cnt1.item())
print(cnt2.item())
print(cnt3.item())
print(cnt4.item())

In [None]:
ratio = [0.9, 0.1]
train_cnt = int(y.size(0) * ratio[0])
test_cnt = int(y.size(0) * ratio[1])
cnts = [train_cnt, test_cnt]
print(train_cnt)
print(test_cnt)

In [None]:
indices = torch.randperm(x.size(0))
print(indices)

In [None]:
x = torch.index_select(x, dim=0, index=indices)
y = torch.index_select(y, dim=0, index=indices)

In [None]:
x_train = x[:cnts[0]]
x_test = x[cnts[0]:]
y_train = y[:cnts[0]]
y_test = y[cnts[0]:]

print(x_train.size(), y_train.size())
print(x_test.size(), y_test.size())

In [None]:
train_dataset = TensorDataset(x_train, y_train) # 데이터 셋 생성
test_dataset = TensorDataset(x_test, y_test)
print(train_dataset, test_dataset)

In [None]:
train_loader = DataLoader(train_dataset, batch_size=100, shuffle=True)
test_loader = DataLoader(train_dataset, batch_size=100, shuffle=True)

In [None]:
class CNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(3, 8, 3, stride=1, padding=1),   # 패딩이 1이면 컨볼루션 때는 해상도가 안 줄고 풀링할때만 줄어듬
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.layer2 = nn.Sequential(
            nn.Conv2d(8, 16, 3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.layer3 = nn.Sequential(
            nn.Conv2d(16, 32, 3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.layer4 = nn.Sequential(
            nn.Conv2d(32, 64, 3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.layer5 = nn.Sequential(
            nn.Conv2d(64, 128, 3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.fc1 = nn.Linear(128*4*4, 128*4)
        self.fc2 = nn.Linear(128*4, 128)
        self.fc3 = nn.Linear(128, 32)
        self.fc4 = nn.Linear(32, 5)

    def forward(self, x):
        x = self.layer5(self.layer4(self.layer3(self.layer2(self.layer1(x)))))
        x = self.fc4(self.fc3(self.fc2(self.fc1(x.view(x.size(0), -1)))))
        return x

In [None]:
model = CNN().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())


In [None]:
def train(model, train_loader, optimizer):
    model.train()   # 모델 훈련 (미분 함)

    for batch_index, (img, label) in enumerate(train_loader):
        img = img.to(device)
        label = label.to(device)

        y_hat = model(img)
        loss = criterion(y_hat, label).to(device)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch_index % 10 == 0:
            print(f'Train Epoch: {epoch} | Train Loss: {loss.item()}')

In [None]:
test_loss = 0
test_accuracy = 0

def evaluate(model, test_loader):
    model.eval()    # 모델 검증 (미분 안함)

    with torch.no_grad():
        for batch_index, (img, label) in enumerate(test_loader):
            img = img.to(device)
            label = label.to(device)

            y_hat = model(img)
            test_loss += criterion(y_hat, label).to(device).item()
            pred = y_hat.argmax(dim=1)
            test_accuracy += (pred == label).sum()
        test_loss /= len(test_loader.dataset)
        test_accuracy = (test_accuracy.float() / len(test_loader.dataset)) * 100.0


    return test_loss, test_accuracy

In [None]:
epochs = 200
early_stop = 10
lowest_loss = np.inf
lowest_epoch = np.inf
best_model = None
path = '../data/model.pt'

In [None]:
for epoch in range(epochs + 1):
    train(model, train_loader, optimizer)
    test_loss, test_accuracy = evaluate(model, test_loader)
    print(f'Epoch: {epoch} | Test Loss: {test_loss} | Test Accuracy: {test_accuracy} | 최소 비용: {lowest_loss}')

    if test_accuracy == 100:
        print(f'Epoch {epoch}에서 최소 비용: {lowest_loss} (Early Stop = 10)')
        break

    if test_loss < lowest_loss:
        lowest_loss = test_loss
        lowest_epoch = epoch
        torch.save(model.state_dict(), path)
    else:
        if lowest_epoch + early_stop < epoch:
            print(f'Epoch {epoch}에서 최소 비용: {lowest_loss} (Early Stop = 10)')
            break

print('End of training.')

In [None]:
best_model = CNN().to(device)
best_model.load_state_dict(torch.load(path))
best_model.eval()

In [None]:
prediction = model(x_test[:10].to(device)).argmax(dim=-1)
print(prediction, y_test[:10])

In [None]:
plot(x_test[:25], prediction[:25])

In [None]:
test_data = ImageFolder(
    root='../data/test_set',
    transform=transforms.Compose(
        [
            transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
            transforms.ToTensor()
        ]
    ),
)

test_load = DataLoader(
    test_data,
    batch_size=10,
    shuffle=False
)

In [None]:
test_img, test_labels = next(iter(test_load))
print(test_img.size(), test_labels.size())

In [None]:
pred2 = best_model(test_img.to(device)).argmax(dim=-1)
print(pred2)

In [None]:
def plot2(x, y):
    plt.figure(figsize=(8, 4))
    for i in range(10):
        plt.subplot(2, 5, i+1)
        plt.imshow(x[i].permit(1, 2, 0))
        plt.title(y[i].item())
        plt.axis('off')
    plt.show()

plot2(test_img, test_labels)