In [None]:
import time
import torch
import numpy as np
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset
import torch.nn as nn
from tqdm.notebook import tqdm
import torch.optim as optim
import glob
from PIL import Image

# Global run configuration.
verbose = True
mode = "actual"  # "development" makes the training split point at the validation data

# Seed the global RNGs so that repeated runs start from the same random state.
seed = 42
torch.manual_seed(seed)
np.random.seed(seed)

# Use the GPU when one is available; worker processes are only enabled with CUDA.
cuda = torch.cuda.is_available()
num_workers = 4 if cuda else 0
device = torch.device("cuda" if cuda else "cpu")

In [None]:
if verbose:
    # Echo the run configuration and library versions, one setting per line.
    print("\n".join(
        template % setting
        for template, setting in (
            ("mode: %s", mode),
            ("torch version: %s", torch.__version__),
            ("np version: %s", np.__version__),
            ("cuda: %s", cuda),
            ("num_workers: %s", num_workers),
            ("device: %s", device),
            ("verbose: %s", verbose),
        )
    ))

In [None]:
# Directory that every dataset path below is built from.
root = "../data/"

pred_filename = "test_pred.csv"
dataset_cat = "medium"

# Classification splits. All of them follow `dataset_cat`, so changing the
# category switches train, validation and test data together.
eval_cls = root + "validation_classification/" + dataset_cat
test_cls = root + "test_classification/" + dataset_cat
if mode == "development":
    train_cls = eval_cls  # for development
else:
    train_cls = root + "train_data/" + dataset_cat  # for actual training

# Verification splits.
eval_vrf = root + "validation_verification"
test_vrf = root + "test_verification"

# File listing the test images; its order defines the order of the predictions.
test_cls_order_path = root + "test_order_classification.txt"

In [None]:
# Dataset loading starts here; announce it when running verbosely.
if verbose:
    print("loading dataset...")

class testDataset(Dataset):
    """Unlabelled test images, served in the order given by an order file.

    Args:
        test_path: Directory that the relative paths from the order file are
            resolved against.
        transform: Callable applied to each loaded PIL image.
        test_cls_order_path: Text file read with ``np.loadtxt``; presumably one
            relative image path per line.
    """

    def __init__(self, test_path, transform, test_cls_order_path):
        super().__init__()

        self.test_path = test_path
        self.transform = transform

        # load image order file
        # ndmin=1 keeps the list one-dimensional even when the file holds a
        # single entry; without it loadtxt returns a 0-d array and len() fails.
        self.image_order_list = np.loadtxt(test_cls_order_path, dtype=str, ndmin=1)

    def __len__(self):
        """Return the number of images listed in the order file."""
        return len(self.image_order_list)

    def __getitem__(self, index):
        """Load the ``index``-th listed image and return it transformed."""
        image_path = f"{self.test_path}/{self.image_order_list[index]}"
        # Close the file as soon as the pixels are read, and force RGB so every
        # sample has three channels regardless of how the file is stored.
        with Image.open(image_path) as image_file:
            image = image_file.convert("RGB")

        return self.transform(image)

# TODO: may need to normalize images
transformations = transforms.Compose([
    transforms.ToTensor()
])

train_dataset = datasets.ImageFolder(root=train_cls, transform=transformations)
eval_dataset = datasets.ImageFolder(root=eval_cls, transform=transformations)
# The test set goes through the same pipeline as train/eval, so any step added
# to `transformations` (e.g. normalization) is applied to test images too.
test_dataset = testDataset(test_cls, transformations, test_cls_order_path)

In [None]:
# hyper-parameters
input_shape = torch.Size((3, 32, 32))  # shape of one sample, without the batch axis
num_faceids = len(train_dataset.classes)  # number of classes found in the training set

lr = 0.001  # default lr is 1e-3
epochs = 20
batch_size = 256
embedding_dim = 128  # How to use this?

In [None]:
# Options shared by all three loaders.
loader_kwargs = dict(
    batch_size=batch_size,      # Batch size
    pin_memory=cuda,            # Pinned host memory only helps when copying to CUDA
    num_workers=num_workers,    # Number of worker processes for loading data
)

# Only the training data is reshuffled every epoch; evaluation and test keep
# their dataset order (test predictions are matched to the order file by position).
train_loader = DataLoader(train_dataset, shuffle=True, **loader_kwargs)
eval_loader = DataLoader(eval_dataset, shuffle=False, **loader_kwargs)
test_loader = DataLoader(test_dataset, shuffle=False, **loader_kwargs)

In [None]:
def train_epoch(model, train_loader, criterion, optimizer, device):
    """Run one optimisation pass over ``train_loader``.

    Args:
        model: Network to train; it is switched to training mode here.
        train_loader: Iterable yielding ``(data, target)`` batches.
        criterion: Loss called as ``criterion(outputs, target)``.
        optimizer: Optimizer that updates ``model``'s parameters.
        device: Device each batch is moved to before the forward pass.

    Returns:
        Mean per-batch training loss, or ``0.0`` if the loader yields no batches.
    """
    # evaluate_model() puts the model in eval mode, so training mode has to be
    # re-enabled at the start of every epoch, not just once before the first.
    model.train()

    running_loss = 0.0
    total_predictions = 0
    correct_predictions = 0
    num_batches = 0

    start_time = time.time()

    for data, target in tqdm(train_loader):
        data, target = data.to(device), target.to(device)

        optimizer.zero_grad()   # .backward() accumulates gradients

        outputs = model(data)
        loss = criterion(outputs, target)

        loss.backward()
        optimizer.step()

        # Accuracy bookkeeping works on a detached copy so it never touches autograd.
        predicted = outputs.detach().argmax(dim=1)
        total_predictions += target.size(0)
        correct_predictions += (predicted == target).sum().item()

        running_loss += loss.item()
        num_batches += 1

    end_time = time.time()

    # Guard the averages so an empty loader reports zeros instead of raising.
    running_loss = running_loss / num_batches if num_batches else 0.0
    acc = (correct_predictions / total_predictions) * 100.0 if total_predictions else 0.0
    print('Training Loss: ', running_loss, 'Time: ',end_time - start_time, 's')
    print('Training Accuracy: ', acc, '%')
    return running_loss

def evaluate_model(model, eval_loader, criterion, device):
    """Compute mean loss and accuracy of ``model`` over ``eval_loader``.

    The model is evaluated in eval mode without gradient tracking, and its
    previous train/eval mode is restored afterwards so that a following
    training epoch is not silently run in eval mode.

    Args:
        model: Network to evaluate.
        eval_loader: Iterable yielding ``(data, target)`` batches.
        criterion: Loss called as ``criterion(outputs, target)``.
        device: Device each batch is moved to before the forward pass.

    Returns:
        Tuple ``(mean per-batch loss, accuracy in percent)``; both are ``0.0``
        if the loader yields no batches.
    """
    was_training = model.training
    model.eval()

    running_loss = 0.0
    total_predictions = 0
    correct_predictions = 0
    num_batches = 0

    try:
        with torch.no_grad():
            for data, target in tqdm(eval_loader):
                data = data.to(device)
                target = target.to(device)

                outputs = model(data)

                predicted = outputs.argmax(dim=1)
                total_predictions += target.size(0)
                correct_predictions += (predicted == target).sum().item()

                running_loss += criterion(outputs, target).item()
                num_batches += 1
    finally:
        # Hand the model back in the mode the caller had it in.
        model.train(was_training)

    # Guard the averages so an empty loader reports zeros instead of raising.
    running_loss = running_loss / num_batches if num_batches else 0.0
    acc = (correct_predictions / total_predictions) * 100.0 if total_predictions else 0.0
    print('evaluate Loss: ', running_loss)
    print('evaluate Accuracy: ', acc, '%')
    return running_loss, acc

def test_model(model, test_loader, device, save=False, filename="../data/test_pred.csv"):
    predicts = torch.LongTensor().to(device)
    
    with torch.no_grad():
        model.eval()

        model.to(device)

        # no target in test dataset/data loader
        for batch_idx, data in enumerate(tqdm(test_loader)):
            data = data.to(device)

            outputs = model(data)

            _, predict = torch.max(outputs.data, 1)
            
            predicts = torch.cat([predicts, predict])
    
    assert predicts.shape[0] == len(test_loader.dataset)
    assert predicts.shape[0] == len(test_loader.dataset.image_order_list)
    
    predict_labels = []
    
    # convert label index back to real indentity label
    for i in predicts.detach().cpu().numpy():
        predict_labels.append(
            [key  for (key, value) in train_dataset.class_to_idx.items() if value == i][0])
    
    if save:
        result = np.concatenate([test_loader.dataset.image_order_list.reshape(-1, 1),
                                 np.asarray(predict_labels).reshape(-1, 1)], axis=1)
        np.savetxt(filename, result, fmt="%s", delimiter=",", header="Id,Category", comments="")
    
    return predicts

def train_model(model, epochs, train_loader, eval_loader, criterion, optimizer, device):
    """Train ``model`` for ``epochs`` epochs, evaluating after each one.

    Args:
        model: Network to train; it is moved to ``device`` first.
        epochs: Number of passes over ``train_loader``.
        train_loader: Batches used for optimisation.
        eval_loader: Batches used for the per-epoch evaluation.
        criterion: Loss function shared by training and evaluation.
        optimizer: Optimizer that updates ``model``'s parameters.
        device: Device used for training and evaluation.
    """
    model.to(device)

    # range(epochs), not range(epochs + 1): run exactly the requested number.
    for epoch in range(epochs):
        print("epoch: %d" % (epoch))

        # Evaluation switches the model to eval mode, so training mode must be
        # re-enabled at the start of every epoch.
        model.train()
        train_epoch(model, train_loader, criterion, optimizer, device=device)
        evaluate_model(model, eval_loader, criterion, device=device)

        print('=' * 20)

    return

In [None]:
# define model
# TODO: add batchnorm
# TODO: how to define embedding from conv2d ?
class CNN(nn.Module):
    """Three-layer convolutional classifier with a single linear output layer.

    Args:
        input_shape: Shape of one sample without the batch axis; its first
            entry is used as the number of input channels.
        output_size: Number of output classes (size of the linear layer).
        embedding_dim: Currently unused; accepted for interface compatibility.
        device: Device the finished model is placed on.
    """

    def __init__(self, input_shape, output_size, embedding_dim, device):
        super(CNN, self).__init__()
        self.net = nn.Sequential(
            nn.Conv2d(in_channels=input_shape[0], out_channels=56, kernel_size=5, stride=1),
            nn.Tanh(),
            nn.Conv2d(in_channels=56, out_channels=28, kernel_size=6, stride=2),
            nn.ReLU(),
            nn.Conv2d(in_channels=28, out_channels=14, kernel_size=2, stride=2),
            nn.Sigmoid(),
            nn.Flatten()
        )

        # trick: infer linear input size
        linear_input_size = self._get_linear_input_size(input_shape, device)
        linear_module_key = str(len(self.net))

        self.net.add_module(linear_module_key, nn.Linear(linear_input_size, output_size))

        # The probe above moved only the feature layers; move the whole module
        # so the linear head ends up on the same device.
        self.to(device)

    def _get_linear_input_size(self, input_shape, device):
        """Return the flattened feature size for one sample of ``input_shape``.

        Runs a single all-zero sample through the layers built so far (which
        must end in ``nn.Flatten``) and reads the width of the result.
        """
        self.net.to(device)
        fake_input = torch.zeros((1, *input_shape), device=device)
        # Shape probing only: no autograd graph is needed for this pass.
        with torch.no_grad():
            fake_output = self.net(fake_input)
        assert len(fake_output.shape) == 2 # must be after flatten
        return fake_output.shape[1]

    def forward(self, x):
        """Return the output of the linear layer for a batch ``x``."""
        return self.net(x)

In [None]:
# Build the classifier, then its loss and optimizer.
model = CNN(input_shape, num_faceids, embedding_dim, device)
model.to(device)  # in-place move; the optimizer below sees the moved parameters

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)

In [None]:
# training
if verbose:
    print("training...")

train_model(
    model=model,
    epochs=epochs,
    train_loader=train_loader,
    eval_loader=eval_loader,
    criterion=criterion,
    optimizer=optimizer,
    device=device,
)

In [None]:
# predicting
if verbose:
    print("predicting...")

# Run inference on the test set and write the predictions to `pred_filename`.
predicts = test_model(
    model,
    test_loader,
    device,
    save=True,
    filename=pred_filename,
)
print("finished")