In [1]:
import pandas as pd
import numpy as np
import os
import torch
from torch import nn
from torch.utils.data import DataLoader,Dataset
from tqdm.auto import tqdm
from sklearn.preprocessing import OneHotEncoder
from torchvision.datasets import DatasetFolder
import torchvision.transforms as transforms
from PIL import Image
import matplotlib.pyplot as plt


## Data Import 

In [2]:
train_tfm = transforms.Compose([
    transforms.RandomResizedCrop((128, 128)),
    transforms.RandomChoice(
        [transforms.AutoAugment(),
        transforms.AutoAugment(transforms.AutoAugmentPolicy.CIFAR10),
        transforms.AutoAugment(transforms.AutoAugmentPolicy.SVHN)]
    ),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ColorJitter(brightness=0.5),
    transforms.RandomRotation(5),
    transforms.ToTensor(),
])

test_tfm = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
])

In [3]:
train_set = DatasetFolder("data/food-11/training/labeled", loader=lambda x: Image.open(x), extensions="jpg", transform=train_tfm)
valid_set = DatasetFolder("data/food-11/validation", loader=lambda x: Image.open(x), extensions="jpg", transform=test_tfm)
unlabeled_set = DatasetFolder("data/food-11/training/unlabeled", loader=lambda x: Image.open(x), extensions="jpg", transform=train_tfm)
test_set = DatasetFolder("data/food-11/testing", loader=lambda x: Image.open(x), extensions="jpg", transform=test_tfm)

In [4]:
batch_size = 32
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(valid_set, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False)

In [5]:
train_set[0][0].shape

torch.Size([3, 128, 128])

## Net

In [6]:
class Classifier(nn.Module):
    def __init__(self,test_mode=False):
        self.test_mode = test_mode
        super(Classifier, self).__init__()
        # The arguments for commonly used modules:
        # torch.nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding)
        # torch.nn.MaxPool2d(kernel_size, stride, padding)

        # input image size: [3, 128, 128]
        self.cnn_layers = nn.Sequential(
            nn.Conv2d(3, 64, 3, 1, 1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),

            nn.Conv2d(64, 128, 3, 1, 1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),

            nn.Conv2d(128, 256, 3, 1, 1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(4, 4, 0),
        )
        self.fc_layers = nn.Sequential(
            nn.Linear(256 * 8 * 8, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, 11)
        )

    def forward(self, x):
        # input (x): [batch_size, 3, 128, 128]
        # output: [batch_size, 11]

        # Extract features by convolutional layers.
        x = self.cnn_layers(x)
        
        if self.test_mode:
            
            print(x.shape)
            
        else:

            # The extracted feature map must be flatten before going to fully-connected layers.
            x = x.flatten(1)
            
            # The features are transformed by fully-connected layers to obtain the final logits.
            x = self.fc_layers(x)
            return x

device = "cuda" if torch.cuda.is_available() else "cpu"

model = Classifier().to(device)
#model.load_state_dict(torch.load('HW3-parameter/HW3-2.pth'))

## Train

In [7]:
optimizer = torch.optim.RAdam(model.parameters(), lr=0.0001, weight_decay=1e-5)
loss_f = nn.CrossEntropyLoss()
def train(model,train_loader,optimizer,loss_f,device):
    print(f'TRANING......')

    loss_li = []
    acc_li = []
    train_ = tqdm(train_loader)
    
    model.train()
    for idx,data in enumerate(train_):
        x,y = data
        x,y = x.to(device),y.to(device)
        
        optimizer.zero_grad()
        y_ = model(x)
        loss = loss_f(y_,y)
        loss.backward()
        grad_norm = nn.utils.clip_grad_norm_(model.parameters(), max_norm=10)
        optimizer.step()
        
        acc = (y_.argmax(dim=-1) == y.to(device)).float().mean()

        loss_li.append(loss.item())
        acc_li.append(acc)
        
        train_.set_description(f'[{idx+1}/{len(train_)+1}]')
        
    print(f'MEAN LOSS:{sum(loss_li)/len(loss_li)}\nMEAN ACC:{sum(acc_li)/len(acc_li)}')
    return sum(acc_li)/len(acc_li)
    
def vaild(model,vaild_loader,optimizer,loss_f,device):
    print(f'VAILDING......')

    loss_li = []
    acc_li = []
    vaild_ = tqdm(vaild_loader,ncols=70,leave=False)
    
    model.eval()
    for idx,data in enumerate(vaild_):
        x,y = data
        with torch.no_grad():
        
            x,y = x.to(device),y.to(device)
            y_ = model(x)
            loss = loss_f(y_,y)

            acc = (y_.argmax(dim=-1) == y.to(device)).float().mean()

            loss_li.append(loss.item())
            acc_li.append(acc)

            vaild_.set_description(f'[{idx+1}/{len(vaild_)+1}]')
    print(f'MEAN LOSS:{sum(loss_li)/len(loss_li)}\nMEAN ACC:{sum(acc_li)/len(acc_li)}')
    return sum(acc_li)/len(acc_li)

In [8]:
t = True
if t :
    test_model = Classifier(test_mode=True)
    for x,y in train_set:
        test_model(x.unsqueeze(0))
        break

torch.Size([1, 256, 8, 8])


In [9]:
epoch = 200
x = [i for i in range(1,epoch+1)]
train_acc_li = []
valid_acc_li = []
for i in range(epoch):
    print(f'[EPOCH:{i+1} --------NOW]')
    train_acc = train(model,train_loader,optimizer,loss_f,device)
    valid_acc = vaild(model,valid_loader,optimizer,loss_f,device)
    train_acc_li.append(train_acc.to('cpu'))
    valid_acc_li.append(valid_acc.to('cpu'))
plt.plot(x,train_acc_li,'r--')
plt.plot(x,valid_acc_li,'g-')
plt.show()

[EPOCH:1 --------NOW]
TRANING......


  0%|          | 0/97 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [None]:
torch.save(model.state_dict(),'HW3-parameter/HW3-final.pth')
print('saved')