In [None]:
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset

## Data

### ReadFile

In [None]:
import os
import numpy as np
import cv2

def readfile(path, label):
    """Load every file in ``path`` as an image resized to 128x128.

    Directory entries are processed in sorted-name order, so row ``i`` of the
    returned arrays corresponds to the i-th name of ``sorted(os.listdir(path))``.

    Args:
        path: Directory whose entries are all read as images with ``cv2.imread``.
        label: Boolean flag telling whether the labels ``y`` should be returned.
            When true, the label of each image is the integer found before the
            first underscore of its file name.

    Returns:
        ``x`` alone when ``label`` is false, otherwise the tuple ``(x, y)``.
        ``x`` is a uint8 array of shape (N, 128, 128, 3) holding the pixels as
        returned by ``cv2.imread``; ``y`` is a uint8 array of shape (N,).

    Raises:
        OSError: If ``cv2.imread`` cannot read one of the directory entries.
    """
    image_dir = sorted(os.listdir(path))
    x = np.zeros((len(image_dir), 128, 128, 3), dtype=np.uint8)
    y = np.zeros((len(image_dir)), dtype=np.uint8)

    for i, file in enumerate(image_dir):
        file_path = os.path.join(path, file)
        img = cv2.imread(file_path)
        # cv2.imread signals failure by returning None instead of raising;
        # fail here with the offending path rather than inside cv2.resize.
        if img is None:
            raise OSError("cv2.imread could not read image: {}".format(file_path))
        x[i, :, :] = cv2.resize(img, (128, 128))
        if label:
            y[i] = int(file.split("_")[0])

    if label:
        return x, y
    return x

In [None]:
# Root folder of the food-11 data; the three splits live in its sub-folders.
work_dir = "../../data/food-11/"
print("read file")

# Labelled splits: images plus the labels parsed from the file names.
train_dir = os.path.join(work_dir, "training")
train_x, train_y = readfile(path=train_dir, label=True)
print("Size of train : {} ".format(len(train_x)))

val_dir = os.path.join(work_dir, "validation")
val_x, val_y = readfile(path=val_dir, label=True)
print("Size of validation : {} ".format(len(val_x)))

# Unlabelled split: only the images are returned.
test_dir = os.path.join(work_dir, "testing")
test_x = readfile(path=test_dir, label=False)
print("Size of test : {}".format(len(test_x)))


### DataSet

In [None]:
class ImgDataSet(Dataset):
    """Dataset over an indexable collection of samples with optional labels.

    Args:
        x: Indexable collection of samples; ``len(x)`` is the dataset size.
        y: Optional labels aligned with ``x``. When given they are converted
            to a ``torch.LongTensor``.
        transform: Optional callable applied to a sample each time it is
            fetched.
    """

    def __init__(self, x, y = None, transform = None):
        self.x = x
        self.transform = transform
        # Labels are stored as a LongTensor; an unlabelled dataset keeps None.
        self.y = None if y is None else torch.LongTensor(y)

    def __len__(self):
        """Return the number of samples."""
        return len(self.x)

    def __getitem__(self, index):
        """Return ``sample`` (unlabelled) or ``(sample, label)`` (labelled)."""
        sample = self.x[index]
        if self.transform is not None:
            sample = self.transform(sample)

        # Unlabelled datasets yield the bare sample, not a tuple.
        if self.y is None:
            return sample
        return sample, self.y[index]

In [None]:
# Hyper-parameters of the input pipeline.
batch_size = 128
angle = 30  # rotation range handed to RandomRotation

# Used for validation and test data: conversion only, no augmentation.
test_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.ToTensor(),
])

# Used for training data: random flip and rotation before the conversion.
# transforms.RandomResizedCrop() is currently disabled.
train_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(angle),
    transforms.ToTensor(),
])

train_set = ImgDataSet(x=train_x, y=train_y, transform=train_transform)
val_set = ImgDataSet(x=val_x, y=val_y, transform=test_transform)

# Only the training batches are shuffled.
train_loader = DataLoader(dataset=train_set, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(dataset=val_set, batch_size=batch_size, shuffle=False)

## Model

In [None]:
class CNNClassifier(nn.Module):
    """Five conv stages followed by a three-layer classifier with 11 outputs.

    Each stage is Conv2d(3x3, stride 1, padding 1) -> BatchNorm2d -> ReLU ->
    MaxPool2d(2, 2, 0). The first linear layer takes 512*4*4 features, which
    corresponds to a 3x128x128 input after five halvings of the spatial size.
    """

    def __init__(self):
        super().__init__()

        # Channel count entering the network and leaving each stage.
        # Feature maps for a 128x128 input:
        # [64, 64, 64] -> [128, 32, 32] -> [256, 16, 16] -> [512, 8, 8] -> [512, 4, 4]
        widths = [3, 64, 128, 256, 512, 512]

        stages = []
        for in_ch, out_ch in zip(widths[:-1], widths[1:]):
            stages.extend([
                nn.Conv2d(in_ch, out_ch, 3, 1, 1),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(),
                nn.MaxPool2d(2, 2, 0),
            ])
        self.cnn = nn.Sequential(*stages)

        self.fc = nn.Sequential(
            nn.Linear(512*4*4, 1024),
            nn.ReLU(),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Linear(512, 11),
        )

    def forward(self, x):
        """Return the (batch, 11) class scores for a batch of images."""
        features = self.cnn(x)
        # Flatten everything except the batch dimension for the linear layers.
        flat = features.view(features.size(0), -1)
        return self.fc(flat)

## Training

In [None]:
import time

# Train on the first GPU when CUDA is available, otherwise on the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = CNNClassifier().to(device)
loss = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())

num_epoch = 5

for epoch in range(num_epoch):
    epoch_start_time = time.time()
    # Running totals over the epoch: number of correct predictions and the
    # per-sample loss sum, for the training and validation sets.
    train_acc = 0.0
    train_loss = 0.0
    val_acc = 0.0
    val_loss = 0.0

    model.train()
    for inputs, labels in train_loader:
        # Batches must live on the same device as the model.
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        train_pred = model(inputs)  # class scores, shape (batch, 11)
        batch_loss = loss(train_pred, labels)
        batch_loss.backward()
        optimizer.step()

        # Compare on-device and pull out plain Python numbers, so this works
        # for both CPU and CUDA tensors.
        train_acc += (train_pred.argmax(dim=1) == labels).sum().item()
        # batch_loss is the mean over the batch; weight it by the batch size
        # so that dividing by the dataset size yields a true per-sample mean.
        train_loss += batch_loss.item() * labels.size(0)

    model.eval()
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            val_pred = model(inputs)
            batch_loss = loss(val_pred, labels)

            val_acc += (val_pred.argmax(dim=1) == labels).sum().item()
            val_loss += batch_loss.item() * labels.size(0)

    print('[%03d/%03d] %2.2f sec(s) Train Acc: %3.6f Loss: %3.6f | Val Acc: %3.6f loss: %3.6f' % \
        (epoch + 1, num_epoch, time.time()-epoch_start_time, \
         train_acc/len(train_set), train_loss/len(train_set), val_acc/len(val_set), val_loss/len(val_set)))


In [None]:
# Once training is done, the validation data is to be added to the training
# data for another round of training: stack both splits along the sample axis.
train_val_x = np.concatenate([train_x, val_x])
train_val_y = np.concatenate([train_y, val_y])

# The merged set is used for training, so it gets the training augmentation
# and a shuffled loader.
train_val_set = ImgDataSet(x=train_val_x, y=train_val_y, transform=train_transform)
train_val_loader = DataLoader(dataset=train_val_set, batch_size=batch_size, shuffle=True)

### Testing

In [None]:
# The test split has no labels, so ImgDataSet yields bare image tensors and
# every batch from the loader is a single tensor rather than an (x, y) pair.
test_set = ImgDataSet(test_x, transform = test_transform)
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False)

# Predicted class index for every test image, in dataset order.
prediction = []

# Feed the batches to whichever device the model parameters live on.
model_device = next(model.parameters()).device

model.eval()
with torch.no_grad():
    for data in test_loader:
        test_pred = model(data.to(model_device))
        # Take the arg-max over the model's class scores (not over the input);
        # move to the CPU first so the NumPy conversion also works on CUDA.
        test_label = np.argmax(test_pred.cpu().numpy(), axis = 1)
        prediction.extend(test_label)