# ID R&D antispoofing challenge baseline

Реализуем простой бейзлайн на основе предобученной сети ResNet18. Сеть будет делать предсказания для каждого кадра отдельно. Для получения предсказания для каждого видео будем усреднять покадровые вероятности.

Импортируем библиотеки.

In [None]:
%pylab inline

import copy
import os
import pickle
import torch
import torchvision

from sklearn.model_selection import train_test_split

## Подготовка данных

In [None]:
path_data = '../train_sample'

Можно обучать модели в Google Colab, а данные для них загружать из личного Google Drive.

In [None]:
# from google.colab import drive
# drive.mount('/content/drive/')

# path_data = '/content/drive/My Drive/idrnd/train'

Загрузим пути до всех изображений (сами изображения не будем держать в памяти, а будем загружать их при подготовке очередного батча).

In [None]:
path_images = []

for label in ['2dmask', 'real', 'printed', 'replay']:
    videos = os.listdir(os.path.join(path_data, label))
    for video in videos:
        frames = os.listdir(os.path.join(path_data, label, video))
        for frame in frames:
            path_images.append({
                'path': os.path.join(path_data, label, video, frame),
                'label': int(label != 'real'),
                'video': video})

Подготовим торчовый датасет для наших данных.

In [None]:
class AntispoofDataset(torch.utils.data.dataset.Dataset):
    def __init__(self, paths, transform=None,
                 loader=torchvision.datasets.folder.default_loader):
        self.paths = paths
        self.transform = transform
        self.loader = loader
    
    def __getitem__(self, index):
        image_info = self.paths[index]

        img = self.loader(image_info['path'])
        if self.transform is not None:
            img = self.transform(img)

        return (img, image_info['label'])

    def __len__(self):
        return len(self.paths)

Разделим выборку на обучающую и контрольную. Так как объектов у нашей модели выступает кадр из видео, то необходимо делить выборку так, чтобы кадры одного видео попадали либо целиком в обучающую выборку, либо целиком в тестовую выборку (иначе будем переобучаться).

In [None]:
test_fraction = 0.1

videos = list(set(x['video'] for x in path_images))
videos_tr, videos_ts = train_test_split(videos, test_size=0.1, random_state=123)

train_path_images = [x for x in path_images if x['video'] in videos_tr]
test_path_images = [x for x in path_images if x['video'] in videos_ts]

Подготовим генераторы для данных. Для обучения добавим немного аугментаций.

In [None]:
data_transforms = {
    'train': torchvision.transforms.Compose([
        torchvision.transforms.Resize(224),
        torchvision.transforms.RandomHorizontalFlip(),
        torchvision.transforms.ToTensor(),
        torchvision.transforms.Normalize(
            [0.485, 0.456, 0.406], [0.229, 0.224, 0.225])]),
    'val': torchvision.transforms.Compose([
        torchvision.transforms.Resize(224),
        torchvision.transforms.ToTensor(),
        torchvision.transforms.Normalize(
            [0.485, 0.456, 0.406], [0.229, 0.224, 0.225])]),}

image_datasets = {
    'train': AntispoofDataset(
        train_path_images, transform=data_transforms['train']),
    'val': AntispoofDataset(
        test_path_images, transform=data_transforms['val'])}

dataloaders = {
  x: torch.utils.data.DataLoader(
        image_datasets[x], batch_size=256, shuffle=True, num_workers=4)
    for x in ['train', 'val']}

dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']}

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

## Обучение модели

Возьмём за основу предобученный ResNet18. Заменим последний полносвязный слой, так как нам необходимо предсказывать только 1 класс.

In [None]:
model = torchvision.models.resnet18(pretrained=True)
model.fc = torch.nn.Linear(model.fc.in_features, 1)
model = model.to(device)

Обучим сеть. Будем запоминать лучшую по точности модель на отложенной выборки.

In [None]:
num_epochs = 10

optimizer = torch.optim.Adam(model.parameters())
criterion = torch.nn.BCEWithLogitsLoss()

since = time.time()
best_model_wts = copy.deepcopy(model.state_dict())
best_acc = 0.0

for epoch in range(num_epochs):
    print('Epoch {}/{}'.format(epoch, num_epochs - 1))
    print('-' * 10)

    # each epoch has a training and validation phase
    for phase in ['train', 'val']:
        if phase == 'train':
            model.train()
        else:
            model.eval()

        running_loss = 0.0
        running_corrects = 0

        # iterate over data
        for inputs, labels in dataloaders[phase]:
            inputs = inputs.to(device)
            labels = labels.to(device)

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward
            with torch.set_grad_enabled(phase == 'train'):
                outputs = model(inputs).view(-1)
                preds = (outputs > 0).long()
                loss = criterion(outputs, labels.float())

                # backward + optimize only if in training phase
                if phase == 'train':
                    loss.backward()
                    optimizer.step()

            # statistics
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)

        epoch_loss = running_loss / dataset_sizes[phase]
        epoch_acc = running_corrects.double() / dataset_sizes[phase]

        print('{}\tLoss: {:.4f}\tAcc: {:.4f}'.format(
            phase, epoch_loss, epoch_acc))

        # deep copy the model
        if phase == 'val' and epoch_acc > best_acc:
            best_acc = epoch_acc
            best_model_wts = copy.deepcopy(model.state_dict())

    print()

time_elapsed = time.time() - since
print('Training complete in {:.0f}m {:.0f}s'.format(
    time_elapsed // 60, time_elapsed % 60))
print('Best val Acc: {:4f}'.format(best_acc))

# load best model weights
model.load_state_dict(best_model_wts)

Сохраним модель. Она нам пригодится в скрипте для инференса.

In [None]:
torch.save(model.state_dict(), 'weights.pt')