In [None]:
import multiprocessing
import os
import time

import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms

In [None]:
# Root directory of the pool dataset. Set the POOL_DATA_PATH environment
# variable to point the notebook at a different location; the fallback is the
# path the notebook was originally written against.
DATA_PATH = os.environ.get("POOL_DATA_PATH", "/home/wangc21/datasets/pool")

In [None]:
class PoolDataset(torch.utils.data.Dataset):
    """Dataset rooted at ``data_path`` (sample loading is still a stub).

    The label-loading and sample-fetching logic has not been written yet.
    The placeholders below keep the cell parseable and make the missing
    pieces fail loudly instead of leaving a syntax error in the notebook.

    Args:
        data_path: Directory holding the samples for one split.
        transform: Optional callable stored on the instance; it is not
            applied anywhere until ``__getitem__`` is implemented.
    """

    def __init__(self, data_path, transform = None):
        self.data_path = data_path
        # TODO: populate with one entry per sample found under ``data_path``.
        # Until then the dataset reports a length of 0.
        self.labels = []
        self.transform = transform


    def __len__(self):
        """Return the number of samples, i.e. the number of loaded labels."""
        return len(self.labels)


    def __getitem__(self, idx):
        """Return the sample at ``idx``.

        Raises:
            NotImplementedError: always, until sample loading is written.
        """
        # TODO: load the sample for ``idx``, apply ``self.transform`` when it
        # is set, and return it together with ``self.labels[idx]``.
        raise NotImplementedError("PoolDataset.__getitem__ is not implemented yet")

In [None]:
# Input pipelines: the training split is randomly augmented, the test split
# is only converted to a tensor and normalised.

# Training: convert to a PIL image so the random flip / rotation / colour
# jitter can be applied, then back to a tensor and normalise each channel
# with mean 0.5 and std 0.5.
train_transforms = transforms.Compose(
    [
        transforms.ToPILImage(),
        transforms.RandomVerticalFlip(),
        transforms.RandomRotation(degrees = 360),
        transforms.ColorJitter(brightness = 0.2, contrast = 0.1, saturation = 0.1),
        transforms.ToTensor(),
        transforms.Normalize(mean = (0.5, 0.5, 0.5), std = (0.5, 0.5, 0.5)),
    ]
)

# Evaluation: no augmentation, same normalisation as training.
test_transforms = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize(mean = (0.5, 0.5, 0.5), std = (0.5, 0.5, 0.5)),
    ]
)

# Each split lives in its own sub-directory of DATA_PATH.
train_dir = os.path.join(DATA_PATH, "train")
data_train = PoolDataset(train_dir, transform = train_transforms)
print(len(data_train))

test_dir = os.path.join(DATA_PATH, "test")
data_test = PoolDataset(test_dir, transform = test_transforms)
print(len(data_test))

In [None]:
class Darknet(nn.Module):
    """Small Darknet-style CNN: seven conv + max-pool stages and a linear head.

    Every stage is a 3x3 convolution (stride 1, padding 1) followed by ReLU
    and a 2x2 max-pool with stride 2, so each stage halves the spatial size
    while the channel count doubles from 16 up to 1024. The final linear
    layer maps the flattened ``5 * 3 * 1024`` features to 16 outputs, which
    ``loss`` treats as class logits.

    NOTE(review): the hard-coded ``5 * 3`` implies inputs whose spatial size
    reduces to 5x3 after seven halvings (e.g. 640x384) -- confirm against
    the dataset.

    Attributes:
        best_loss: Lowest evaluation loss seen so far; starts at infinity and
            is updated by the training script, not by the model itself.
    """

    def __init__(self):
        super().__init__()
        self.best_loss = np.inf

        # Attribute names are kept as conv1..conv7 / fc so that saved
        # state_dict checkpoints keep loading.
        self.conv1 = nn.Conv2d(3, 16, 3, 1, 1)
        self.conv2 = nn.Conv2d(16, 32, 3, 1, 1)
        self.conv3 = nn.Conv2d(32, 64, 3, 1, 1)
        self.conv4 = nn.Conv2d(64, 128, 3, 1, 1)
        self.conv5 = nn.Conv2d(128, 256, 3, 1, 1)
        self.conv6 = nn.Conv2d(256, 512, 3, 1, 1)
        self.conv7 = nn.Conv2d(512, 1024, 3, 1, 1)
        self.fc = nn.Linear(5 * 3 * 1024, 16)


    def forward(self, x):
        """Run the seven conv/ReLU/pool stages and the linear head.

        Args:
            x: Batch of 3-channel images.

        Returns:
            Tensor of 16 raw scores (logits) per row of the flattened features.
        """
        stages = (self.conv1, self.conv2, self.conv3, self.conv4,
                  self.conv5, self.conv6, self.conv7)
        for conv in stages:
            x = F.max_pool2d(F.relu(conv(x)), 2, 2)
        x = x.view(-1, 5 * 3 * 1024)
        return self.fc(x)


    def loss(self, prediction, label, reduction = 'mean'):
        """Cross-entropy between predicted logits and integer class labels.

        Args:
            prediction: Logits as produced by ``forward``.
            label: Target class indices.
            reduction: ``'mean'`` (default, the previous behaviour), ``'sum'``
                or ``'none'``; forwarded to the cross-entropy loss. The
                evaluation loop passes ``'sum'``, which the old two-argument
                signature rejected with a TypeError.

        Returns:
            The reduced loss tensor.
        """
        return F.cross_entropy(prediction, label, reduction = reduction)

In [None]:
def train(model, device, train_loader, optimizer, epoch):
    """Train ``model`` for one pass over ``train_loader``.

    Args:
        model: Module exposing ``loss(prediction, label)``.
        device: Device each batch is moved to before the forward pass.
        train_loader: Iterable yielding ``(data, label)`` batches; must also
            expose ``dataset`` and ``__len__`` for the progress message.
        optimizer: Optimizer stepping the model's parameters.
        epoch: Epoch number, used only in the progress message.

    Returns:
        Mean of the per-batch losses over the epoch.
    """
    model.train()
    losses = []

    # one optimizer step per mini-batch
    for batch_idx, (data, label) in enumerate(train_loader):
        data, label = data.to(device), label.to(device)

        # reset gradients
        optimizer.zero_grad()

        # make prediction
        output = model(data)

        # compute error gradients
        loss = model.loss(output, label)
        # read the scalar once; every .item() call copies it off the device
        loss_value = loss.item()
        losses.append(loss_value)
        loss.backward()

        # update weights
        optimizer.step()

        # progress report every 100 batches (including the first one)
        if batch_idx % 100 == 0:
            print('{} Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                time.ctime(),
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss_value))

    return np.mean(losses)


def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for batch_idx, (data, label) in enumerate(test_loader):
            data, label = data.to(device), label.to(device)
            output = model(data)
            test_loss += model.loss(output, label, reduction='sum').item()
    test_loss /= len(test_loader.dataset)

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))
    
    return test_loss

In [None]:
# hyperparameters
EPOCHS = 100
# NOTE(review): the batch sizes were left blank in the original cell (a syntax
# error). 32 is a placeholder so the cell runs -- tune to the available memory.
TRAIN_BATCH_SIZE = 32
TEST_BATCH_SIZE = 32
LEARNING_RATE = 0.001
WEIGHT_DECAY = 0.0005

# hardware device
# Set NO_CUDA to True to force CPU execution even when a GPU is available.
# (The original cell read an undefined ``args.no_cuda``, raising NameError.)
NO_CUDA = False
use_cuda = not NO_CUDA and torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
# Worker processes and pinned memory are only enabled for GPU runs.
kwargs = {'num_workers': multiprocessing.cpu_count(),
          'pin_memory': True} if use_cuda else {}

# data loaders
train_loader = torch.utils.data.DataLoader(data_train, batch_size = TRAIN_BATCH_SIZE,
                                           shuffle = True, **kwargs)
test_loader = torch.utils.data.DataLoader(data_test, batch_size = TEST_BATCH_SIZE,
                                          shuffle = False, **kwargs)

# training objects
model = Darknet().to(device)
optimizer = optim.Adam(model.parameters(), lr = LEARNING_RATE, weight_decay = WEIGHT_DECAY)

# training loop: record the per-epoch train / test loss for later inspection
train_loss = []
test_loss = []
for epoch in range(1, EPOCHS + 1):
    train_l = train(model, device, train_loader, optimizer, epoch)
    test_l = test(model, device, test_loader)
    train_loss.append(train_l)
    test_loss.append(test_l)

    # save best model: one checkpoint file per epoch that improves the test loss
    if test_l < model.best_loss:
        model.best_loss = test_l
        torch.save(model.state_dict(), "epoch_%d.pt" % epoch)