#### Imports, preprocessing functions and login to wandb

In [2]:
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
from torch.optim import Adam
from torch.optim.lr_scheduler import StepLR

import os
import numpy as np
import pandas as pd
from skimage import feature as skif
import cv2
from sklearn.metrics import classification_report
from sklearn.metrics import f1_score
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split

from google.colab import files
from google.colab import drive
from zipfile import ZipFile

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [3]:
!pip install wandb -Uq

[K     |████████████████████████████████| 1.8 MB 4.4 MB/s 
[K     |████████████████████████████████| 158 kB 62.3 MB/s 
[K     |████████████████████████████████| 181 kB 53.7 MB/s 
[K     |████████████████████████████████| 63 kB 1.5 MB/s 
[K     |████████████████████████████████| 157 kB 61.9 MB/s 
[K     |████████████████████████████████| 157 kB 71.3 MB/s 
[K     |████████████████████████████████| 157 kB 72.6 MB/s 
[K     |████████████████████████████████| 157 kB 71.5 MB/s 
[K     |████████████████████████████████| 157 kB 68.3 MB/s 
[K     |████████████████████████████████| 157 kB 70.0 MB/s 
[K     |████████████████████████████████| 157 kB 71.1 MB/s 
[K     |████████████████████████████████| 156 kB 66.6 MB/s 
[?25h  Building wheel for pathtools (setup.py) ... [?25l[?25hdone


In [4]:
# Imported here rather than in the first cell because wandb is installed by the cell above.
import wandb

# Log this session in to Weights & Biases.
wandb.login()

ERROR:wandb.jupyter:Failed to detect the name of this notebook, you can set it manually with the WANDB_NOTEBOOK_NAME environment variable to enable code saving.


<IPython.core.display.Javascript object>

[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /root/.netrc


True

In [5]:
def _collect_labeled_images(split_dir):
    """
    Collect image paths and labels from ``split_dir/<folder>/live`` (label 0)
    and ``split_dir/<folder>/spoof`` (label 1).

    Directory entries are visited in sorted order because os.listdir() returns
    them in arbitrary order, which would otherwise make the seeded
    train/valid split differ between machines.

    Returns two parallel lists: (paths, labels).
    """
    paths = []
    labels = []
    for folder in sorted(os.listdir(split_dir)):
        for class_name, class_label in (('live', 0), ('spoof', 1)):
            class_dir = os.path.join(split_dir, folder, class_name)
            if not os.path.exists(class_dir):
                continue
            for image in sorted(os.listdir(class_dir)):
                paths.append(os.path.join(class_dir, image))
                labels.append(class_label)
    return paths, labels


def create_partition_and_labels(data, root='/content'):
    """
    Build the train/valid/test partition and the path -> label mapping.

    data - name of the dataset directory; images are expected under
           ``<root>/<data>/{train,test}/<folder>/{live,spoof}/``
    root - directory that contains the dataset directory ('/content' by default)

    Returns 2 dictionaries like follows:

    >>> partition
    {'test': ['path-5'], 'train': ['path-1', 'path-2', 'path-3'], 'valid': ['path-4']}

    >>> path_labels
    {'path-1': 0, 'path-2': 1, 'path-3': 1, 'path-4': 1, 'path-5': 0}

    Labels: 0 - live, 1 - spoof. 80% of the train directory is kept for
    training and the remaining 20% becomes the validation part.
    """
    test, test_label = _collect_labeled_images(os.path.join(root, data, 'test'))
    train, train_label = _collect_labeled_images(os.path.join(root, data, 'train'))

    # train / valid split (fixed seed, so the split is reproducible)
    train, valid, train_label, valid_label = train_test_split(
        train, train_label, train_size=0.8, shuffle=True, random_state=44)
    partition = {'test': test, 'train': train, 'valid': valid}

    # path <-> image label
    labels_list = test_label + train_label + valid_label
    path_list = test + train + valid
    path_labels = dict(zip(path_list, labels_list))

    return partition, path_labels

In [6]:
class MyDataset(torch.utils.data.Dataset):
    """
    Dataset that turns an image path into a colour-LBP feature vector.

    Every image is read with OpenCV, converted to YCrCb, and a normalised
    local-binary-pattern histogram is computed for each of the three
    channels. The histograms are concatenated in channel order and
    zero-padded to FEATURE_SIZE.

    path_list - list of image paths
    id_labels - mapping from image path to its class label
    """
    # LBP settings used for every channel.
    LBP_POINTS = 8
    LBP_RADIUS = 1
    LBP_METHOD = 'nri_uniform'
    # Length of the vector returned for each sample (3 channels * 59 bins).
    FEATURE_SIZE = 177

    def __init__(self, path_list, id_labels):
        self.path_list = path_list
        self.id_labels = id_labels

    def __len__(self):
        return len(self.path_list)

    @staticmethod
    def _codes_histogram(codes, n_bins):
        """Normalised histogram of LBP codes over the fixed bins 0 .. n_bins-1."""
        hist, _ = np.histogram(codes, density=True, bins=n_bins, range=(0, n_bins))
        return hist

    @classmethod
    def _lbp_histogram(cls, image, P=8, R=1, method='nri_uniform'):
        """
        LBP histogram of a single-channel image.

        image: shape is N*M
        """
        lbp = skif.local_binary_pattern(image, P, R, method)
        # The number of bins must not depend on the image: if it is taken from
        # lbp.max(), an image that lacks the highest code yields a shorter
        # histogram and shifts the bins of the channels concatenated after it.
        if method == 'nri_uniform':
            n_bins = P * (P - 1) + 3
        elif method == 'uniform':
            n_bins = P + 2
        else:
            # No fixed code count is assumed for the other methods.
            n_bins = int(lbp.max() + 1)
        return cls._codes_histogram(lbp, n_bins)

    @classmethod
    def _get_features(cls, file_name):
        """Concatenated per-channel LBP histograms of the image at file_name."""
        image = cv2.imread(file_name)
        if image is None:
            # cv2.imread returns None instead of raising on a missing/corrupt file.
            raise OSError(f'cannot read image: {file_name}')
        image = cv2.cvtColor(image, cv2.COLOR_BGR2YCrCb)
        # Channel order after COLOR_BGR2YCrCb is Y, Cr, Cb.
        histograms = [
            cls._lbp_histogram(image[:, :, channel], cls.LBP_POINTS, cls.LBP_RADIUS, cls.LBP_METHOD)
            for channel in range(3)
        ]
        return np.concatenate(histograms)

    @staticmethod
    def _pad(image_vector, to_size=177):
        """Right-pad image_vector with zeros up to to_size (longer vectors are returned as is)."""
        if len(image_vector) < to_size:
            image_vector = np.append(image_vector, np.zeros(to_size - len(image_vector)))
        return image_vector

    def __getitem__(self, index):
        """Return (feature tensor, label tensor of dtype long) for the image at index."""
        image_path = self.path_list[index]
        image_vector = self._get_features(image_path)

        X = self._pad(image_vector, self.FEATURE_SIZE)
        y = self.id_labels[image_path]

        return torch.Tensor(X), torch.tensor(y, dtype=torch.long)

### Dataset

In [9]:
drive.mount('/content/gdrive')

# Archives stored in Google Drive as <name>.zip; each one is unpacked into /content.
DATASET_NAME = 'live'
DATASET_NAME_2 = 'replay_attack'
DATASET_NAME_3 = 'print_attack'

for archive_name in (DATASET_NAME, DATASET_NAME_2, DATASET_NAME_3):
    with ZipFile(f'/content/gdrive/MyDrive/{archive_name}.zip', 'r') as dataset_zip:
        dataset_zip.extractall('/content')

try: ### IF TO TRAIN ###

    # get list with path to images and their labels
    partition, path_labels = create_partition_and_labels(data = DATASET_NAME)

except (FileNotFoundError, NotADirectoryError): ### IF ONLY TO TEST ###
    # Only a missing train/test directory layout switches to test-only mode.
    # Any other error is a real problem and is no longer hidden by a bare except.

    test_list = []

    # live
    # NOTE: these paths are relative, so they rely on the working directory
    # being the one the archives were extracted into.
    for image in sorted(os.listdir(DATASET_NAME)):
        test_list.append(f'{DATASET_NAME}/{image}')
    len_live = len(test_list)

    # replay
    for image in sorted(os.listdir(DATASET_NAME_2)):
        test_list.append(f'{DATASET_NAME_2}/{image}')

    # print
    for image in sorted(os.listdir(DATASET_NAME_3)):
        test_list.append(f'{DATASET_NAME_3}/{image}')

    # live images get label 0, every attack image gets label 1
    labels = np.append(np.zeros(len_live), np.ones(len(test_list)-len_live))
    path_labels = dict(zip(test_list, labels))
    testing_set = MyDataset(test_list, path_labels)
    training_set = None
    validation_set = None

else:

    # to torch format
    training_set = MyDataset(partition['train'], path_labels)
    validation_set = MyDataset(partition['valid'], path_labels)
    testing_set = MyDataset(partition['test'], path_labels)

Mounted at /content/gdrive


In [30]:
TOPREDICT = 'to_predict'
# with ZipFile(f'/content/gdrive/MyDrive/{TOPREDICT}.zip', 'r') as dataset_zip:
#     dataset_zip.extractall('/content')

# to predict
predict_list = [f'{TOPREDICT}/{image}' for image in sorted(os.listdir(TOPREDICT))]

# Reference labels for the images above, in sorted file-name order.
labels = np.array([0, 1, 1])
if len(predict_list) > len(labels):
    # zip() would silently drop the extra images from path_labels, and the
    # dataset would then fail with a KeyError only when a batch is loaded.
    raise ValueError(
        f'{len(predict_list)} images found in {TOPREDICT!r} but only {len(labels)} labels are given')
path_labels = dict(zip(predict_list, labels))
predict_set = MyDataset(predict_list, path_labels)

### Functions to train, test and log

In [31]:
def validate_epoch(valid_generator, model, loss_function):
    """
    Average loss of the model over valid_generator WITHOUT updating the model.

    valid_generator - iterable of (x_batch, y_batch)
    model - neural network
    loss_function - loss function

    The model is switched to eval mode and gradients are disabled; no
    optimizer step is taken. Batches are moved to the device the model's
    parameters live on. Raises ValueError if the generator yields no samples.
    """
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device('cpu')

    model.eval()
    loss_sum = 0.0
    total = 0
    with torch.no_grad():
        for batch_of_x, batch_of_y in valid_generator:
            output = model(batch_of_x.to(target_device))
            loss = loss_function(output, batch_of_y.to(target_device))
            # weight by batch size so that a smaller last batch is averaged correctly
            loss_sum += loss.item() * len(batch_of_x)
            total += len(batch_of_x)

    if total == 0:
        raise ValueError('validation loader produced no samples')
    return loss_sum / total


def trainer(model, train_loader, valid_loader, loss_function, optimizer, scheduler, config):
    """
    trainer iterates over the epochs: trains with train_epoch, measures the
    validation loss, steps the scheduler, logs, and saves improved weights.

    model - neural network
    train_loader - loader with the training data
    valid_loader - loader with the validation data
    loss_function - loss function
    optimizer - optimizer
    scheduler - learning-rate scheduler, stepped once per epoch
    config - object with the attribute count_of_epoch (number of epochs)
    """
    min_valid_loss = np.inf

    # in this folder the model weights are saved
    os.makedirs('/content/model_weights', exist_ok=True)

    # Tell wandb to watch what the model gets up to: gradients, weights, and more!
    wandb.watch(model, loss_function, log="all", log_freq=10)

    for e in range(config.count_of_epoch):
        # train
        epoch_loss = train_epoch(train_generator=train_loader,
                                 model=model,
                                 loss_function=loss_function,
                                 optimizer=optimizer)

        # valid
        # Validation must not go through train_epoch: that path re-enables train
        # mode, back-propagates and steps the optimizer, i.e. it would fit the
        # model on the validation data.
        valid_loss = validate_epoch(valid_loader, model, loss_function)

        scheduler.step()

        # log things (the minimum already includes the current epoch)
        trainer_log(epoch_loss, valid_loss, e, scheduler.get_last_lr()[0],
                    min(min_valid_loss, valid_loss))

        # saving models
        if min_valid_loss > valid_loss:
            print(f'Validation Loss Decreased({min_valid_loss:.6f}--->{valid_loss:.6f}) \t Saving The Model')
            min_valid_loss = valid_loss
            torch.save(model.state_dict(), f'/content/model_weights/saved_model_{e}.pth')
            wandb.log_artifact(f'/content/model_weights/saved_model_{e}.pth',
                               name=f'saved_model_{e}', type='model')
        print()

def train_epoch(train_generator, model, loss_function, optimizer):
    """
    inside train_epoch we iterate over the batches of the batch generator
    and train on each of them with train_on_batch

    train_generator - batch generator yielding (x_batch, y_batch)
    model - neural network
    loss_function - loss function
    optimizer - optimizer

    Returns the loss averaged over all samples of the epoch.
    Raises ValueError if the generator yields no samples.
    """
    epoch_loss = 0
    total = 0
    for batch_of_x, batch_of_y in train_generator:
        batch_loss = train_on_batch(model, batch_of_x, batch_of_y, optimizer, loss_function)

        # weight by batch size so that a smaller last batch is averaged correctly
        epoch_loss += batch_loss*len(batch_of_x)
        total += len(batch_of_x)

    if total == 0:
        raise ValueError('train_generator produced no samples')
    return epoch_loss/total

def train_on_batch(model, x_batch, y_batch, optimizer, loss_function):
    """
    train_on_batch performs one optimisation step on a single batch

    model - neural network
    x_batch - features
    y_batch - targets (class labels)
    optimizer - optimizer
    loss_function - loss function

    Returns the batch loss as a Python float.
    """
    model.train()
    optimizer.zero_grad()

    inputs = x_batch.to(device)
    targets = y_batch.to(device)

    batch_loss = loss_function(model(inputs), targets)
    batch_loss.backward()
    optimizer.step()

    return batch_loss.cpu().item()

def tester(model, test_loader):
    pred = []
    real = [] 
    model.eval()
    for it, (x_batch, y_batch) in enumerate(test_loader):
        x_batch = x_batch.to(device)
        with torch.no_grad():
            output = model(x_batch)

        pred.extend(torch.argmax(output, dim=-1).cpu().numpy().tolist())
        real.extend(y_batch.cpu().numpy().tolist())

    wandb.log({"test_accuracy": accuracy_score(real, pred)})

    print(classification_report(real, pred, zero_division = 0))

    print(confusion_matrix(real, pred))

def predicter(model, predict_loader):
    """
    Predict classes for every batch of predict_loader.

    The predictions are saved to 'predicted_labels.npy' and the first ten
    predictions and reference labels are printed.

    model - neural network
    predict_loader - iterable of (x_batch, y_batch)
    """
    model.eval()
    predicted, actual = [], []

    with torch.no_grad():
        for x_batch, y_batch in predict_loader:
            logits = model(x_batch.to(device))
            # predicted class = index of the largest output
            predicted += torch.argmax(logits, dim=-1).cpu().numpy().tolist()
            actual += y_batch.cpu().numpy().tolist()

    np.save('predicted_labels.npy', np.array(predicted))
    print(predicted[:10], '- predicted')
    print(actual[:10], '- real label')

def trainer_log(train_loss, valid_loss, epoch, lr, min_val_loss):
    """
    Send the metrics of one epoch to wandb and print the losses.

    train_loss - training loss of the epoch
    valid_loss - validation loss of the epoch
    epoch - epoch number
    lr - learning rate
    min_val_loss - smallest validation loss passed in by the caller
    """
    metrics = {
        'train_loss': train_loss,
        'valid_loss': valid_loss,
        'epoch': epoch,
        'learning_rate': lr,
        'min_validation_loss': min_val_loss,
    }
    wandb.log(metrics)

    print(f'train loss on {str(epoch).zfill(3)} epoch: {train_loss:.6f} with lr: {lr:.6f}')
    print(f'valid loss on {str(epoch).zfill(3)} epoch: {valid_loss:.6f}')

def make_loader(dataset, batch_size, shuffle=True):
    """
    Wrap dataset into a DataLoader.

    dataset - torch dataset
    batch_size - batch size
    shuffle - reshuffle the data every epoch (default True, as before);
              pass False for evaluation/prediction, where the order of the
              outputs has to match the order of the dataset
    """
    loader = torch.utils.data.DataLoader(dataset=dataset,
                                         batch_size=batch_size,
                                         shuffle=shuffle,
                                         pin_memory=True, num_workers=2)
    return loader

def download_folder_in_zip(dir_to_zip, output_filename, delete_dir_after_download=False):
    """
    Zip a directory and download the archive through the Colab files helper.

    dir_to_zip - directory to archive
    output_filename - name of the zip archive to create
    delete_dir_after_download - remove dir_to_zip once the archive is created

    Raises subprocess.CalledProcessError if the zip command fails.
    """
    import shutil
    import subprocess

    # Arguments are passed as a list (no shell), so paths with spaces or shell
    # metacharacters are handled safely, and a failed zip is reported instead
    # of being silently ignored.
    subprocess.run(['zip', '-r', output_filename, dir_to_zip], check=True)
    if delete_dir_after_download:
        shutil.rmtree(dir_to_zip)
    files.download(output_filename)

### Model pipeline

In [32]:
def pipeline(hyperparameters, saved_model=None, to_train=True, to_test=True, to_predict=False):
    """
    Run one wandb experiment: build the model, then optionally train, test and predict.

    hyperparameters - dict stored as the wandb config; its 'project' entry is the wandb project
    saved_model - passed on to build_model (None builds a freshly initialised model)
    to_train - run trainer
    to_test - run tester
    to_predict - run predicter

    Returns the model.
    """
    with wandb.init(project=hyperparameters['project'], config=hyperparameters) as run:
        config = wandb.config

        # build the model
        model = build_model(run, config, saved_model)

        # make the data and optimization
        (train_loader, valid_loader, test_loader, predict_loader,
         criterion, optimizer, scheduler) = make(model, config)

        print('config:', '\n', config, '\n', model, '\n', 'running on device:', device, '\n')

        # the requested stages, always in the order train -> test -> predict
        stages = (
            (to_train, lambda: trainer(model, train_loader, valid_loader,
                                       criterion, optimizer, scheduler, config)),
            (to_test, lambda: tester(model, test_loader)),
            (to_predict, lambda: predicter(model, predict_loader)),
        )
        for enabled, run_stage in stages:
            if enabled:
                run_stage()
    return model

def build_model(run, config, saved_model=None):
    """
    Build the MLP classifier and move it to device.

    run - wandb run (only needed by the commented-out artifact download below)
    config - object with the attribute dropout (dropout probability)
    saved_model - path to a saved state_dict to load; None initialises the
                  Linear weights with Kaiming-normal instead

    Returns the model.
    """
    # input size, five hidden sizes, number of classes
    layer_sizes = (177, 512, 256, 128, 64, 32, 2)
    p = config.dropout

    # Every Linear is followed by Dropout -> BatchNorm1d -> ReLU.
    # NOTE(review): this also applies Dropout/BatchNorm/ReLU to the 2 output
    # logits that go into CrossEntropyLoss. That looks unintended, but the
    # layout (and therefore the state_dict keys) is kept unchanged so that
    # previously saved weights can still be loaded.
    layers = []
    for in_features, out_features in zip(layer_sizes[:-1], layer_sizes[1:]):
        layers += [nn.Linear(in_features, out_features), nn.Dropout(p),
                   nn.BatchNorm1d(out_features), nn.ReLU()]
    model = nn.Sequential(*layers)

    if saved_model is None:
        # just init with some weights
        def init_weights(m):
            if isinstance(m, nn.Linear):
                torch.nn.init.kaiming_normal_(m.weight, mode='fan_in', nonlinearity='relu')
        model.apply(init_weights)

    else:
        # download weights of saved model
        # artifact = run.use_artifact(f'lanit-summer2022-antispoofing/{config.project}/{saved_model[0]}:v{saved_model[1]}', type='model')
        # artifact_dir = artifact.download()
        # model_path = os.path.join(artifact_dir, f'{saved_model[0]}.pth')

        model_path = saved_model
        # NOTE: torch.load unpickles the file - only load weights from a trusted source.
        model.load_state_dict(torch.load(model_path, map_location=torch.device(device)))

    model = model.to(device)

    return model

def make(model, config):
    """
    Create the data loaders, the loss, the optimizer and the LR scheduler.

    model - neural network whose parameters are optimised
    config - object with the attributes batch_size, lr, step_size, step_gamma

    Uses the module-level datasets training_set, validation_set, testing_set
    and predict_set. When training_set is None (test-only mode) the train and
    valid loaders are None.

    Returns (train_loader, valid_loader, test_loader, predict_loader,
             criterion, optimizer, scheduler).
    """
    def ordered_loader(dataset):
        # Evaluation and prediction keep the dataset order: predicter saves its
        # outputs by position, so a shuffled loader would make the saved
        # predictions impossible to match to the input files.
        return torch.utils.data.DataLoader(dataset=dataset,
                                           batch_size=config.batch_size,
                                           shuffle=False,
                                           pin_memory=True, num_workers=2)

    if training_set is not None: # if to train and test
        train_loader = make_loader(training_set, batch_size=config.batch_size)
        valid_loader = make_loader(validation_set, batch_size=config.batch_size)
    else:  # if only to test
        train_loader = None
        valid_loader = None

    test_loader = ordered_loader(testing_set)
    predict_loader = ordered_loader(predict_set)

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=config.lr)
    scheduler = StepLR(optimizer, config.step_size, config.step_gamma)

    return train_loader, valid_loader, test_loader, predict_loader, criterion, optimizer, scheduler

### Running

In [22]:
# Drive was already mounted by the dataset cell, so this call only reports that.
drive.mount('/content/gdrive')
# Path to the saved weights; it is passed to pipeline() in the next cell.
saved_model = '/content/gdrive/MyDrive/saved_model_128_256.pth'

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [33]:
# Hyperparameters of this run; the whole dict is stored as the wandb config.
# count_of_epoch, batch_size, lr, dropout, step_size and step_gamma are read by
# the training code. 'critirion' (key spelling kept as is), 'optimizer',
# 'scheduler' and 'name_of_model' are not read there: make() always builds
# CrossEntropyLoss / SGD / StepLR.
config = {
    'count_of_epoch': 1,
    'batch_size': 1,
    'lr': 10,  # the optimizer is created with it but never stepped here (to_train=False)
    'dropout': 0.0,
    'critirion': 'CrossEntropyLoss',
    'optimizer': 'SGD',
    'scheduler': 'StepLR',
    'step_size': 10,
    'step_gamma': 0.5,
    'project': 'testing',
    'name_of_model': 'mlp',
}

# Prediction only: load the saved weights, skip training and testing.
model = pipeline(config, saved_model=saved_model, to_train=False, to_test=False, to_predict=True)

config: 
 {'count_of_epoch': 1, 'batch_size': 1, 'lr': 10, 'dropout': 0.0, 'critirion': 'CrossEntropyLoss', 'optimizer': 'SGD', 'scheduler': 'StepLR', 'step_size': 10, 'step_gamma': 0.5, 'project': 'testing', 'name_of_model': 'mlp'} 
 Sequential(
  (0): Linear(in_features=177, out_features=512, bias=True)
  (1): Dropout(p=0.0, inplace=False)
  (2): BatchNorm1d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (3): ReLU()
  (4): Linear(in_features=512, out_features=256, bias=True)
  (5): Dropout(p=0.0, inplace=False)
  (6): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (7): ReLU()
  (8): Linear(in_features=256, out_features=128, bias=True)
  (9): Dropout(p=0.0, inplace=False)
  (10): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (11): ReLU()
  (12): Linear(in_features=128, out_features=64, bias=True)
  (13): Dropout(p=0.0, inplace=False)
  (14): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=

VBox(children=(Label(value='0.000 MB of 0.000 MB uploaded (0.000 MB deduped)\r'), FloatProgress(value=1.0, max…