<a href="https://colab.research.google.com/github/AbGhost-cyber/AbGhost-cyber/blob/main/siamTest.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch


def accuracy(distances, y, step=0.01):
    """Return the best balanced accuracy reachable by thresholding distances.

    A decision threshold is swept from the smallest to the largest distance
    in increments of ``step``. For each threshold a pair is predicted "same"
    when its distance is <= the threshold, and the score is the mean of the
    true-positive rate and the true-negative rate.

    Args:
        distances: 1-D tensor of pair distances.
        y: tensor of labels aligned with ``distances``. Entries equal to 1
            are treated as "same" pairs, everything else as "different".
            NOTE(review): ``ContrastiveLoss`` in this notebook pulls pairs
            with ``y == 0`` together, i.e. the opposite convention -- confirm
            which one the label files actually use.
        step: spacing between consecutive candidate thresholds.

    Returns:
        0-dim float tensor holding the highest balanced accuracy found. It is
        zero when either class is missing from ``y``, because the balanced
        accuracy is undefined in that case.

    Raises:
        ValueError: if ``distances`` is empty.
    """
    if distances.numel() == 0:
        raise ValueError('accuracy() needs at least one distance')

    # Class masks and sizes do not depend on the threshold: compute them once.
    same_id = (y == 1)
    diff_id = ~same_id
    num_same = same_id.sum().float()
    num_diff = diff_id.sum().float()

    max_acc = torch.zeros((), dtype=torch.float, device=distances.device)
    if num_same == 0 or num_diff == 0:
        # A rate would be 0/0 here; report zero instead of propagating NaN.
        return max_acc

    # Tensor reductions instead of the Python built-ins, which would walk the
    # tensor one element at a time.
    min_threshold_d = distances.min().item()
    max_threshold_d = distances.max().item()

    for threshold_d in torch.arange(min_threshold_d, max_threshold_d + step, step):
        true_positive_rate = ((distances <= threshold_d) & same_id).sum().float() / num_same
        true_negative_rate = ((distances > threshold_d) & diff_id).sum().float() / num_diff

        acc = 0.5 * (true_negative_rate + true_positive_rate)
        if acc > max_acc:
            max_acc = acc
    return max_acc

In [3]:
!unzip full_forg.zip

Archive:  full_forg.zip
  End-of-central-directory signature not found.  Either this file is not
  a zipfile, or it constitutes one disk of a multi-part archive.  In the
  latter case the central directory and zipfile comment will be found on
  the last disk(s) of this archive.
unzip:  cannot find zipfile directory in one of full_forg.zip or
        full_forg.zip.zip, and cannot find full_forg.zip.ZIP, period.


In [None]:
import os
import numpy as np
import pandas as pd
from PIL import Image
from torch.utils.data import Dataset, DataLoader

# Fixed seed for NumPy's global random number generator, so that any
# NumPy-based randomness executed after this cell is repeatable.
seed = 2020
np.random.seed(seed)


def get_data_loader(is_train, batch_size, image_transform, dataset='cedar', num_workers=4):
    """Build a ``DataLoader`` over the signature pairs of one dataset split.

    Args:
        is_train: ``True`` selects the training split (and enables
            shuffling); ``False`` selects the test split, kept in file order.
        batch_size: number of pairs per batch.
        image_transform: callable applied to each loaded image, or ``None``.
        dataset: one of ``'cedar'``, ``'bengali'`` or ``'hindi'``.
        num_workers: number of loader worker processes (default 4, the value
            that used to be hard-coded).

    Returns:
        A ``DataLoader`` yielding ``(x1, x2, y)`` batches from ``SignDataset``.

    Raises:
        ValueError: if ``dataset`` is not one of the known names.
    """
    dataset_dirs = {
        'cedar': './data/CEDAR',
        'bengali': './data/BHSig260/Bengali',
        'hindi': './data/BHSig260/Hindi',
    }
    if dataset not in dataset_dirs:
        raise ValueError(f'Unknown dataset {dataset}')

    data = SignDataset(is_train, dataset_dirs[dataset], image_transform)
    # Only the training split is shuffled.
    return DataLoader(data, batch_size=batch_size, shuffle=is_train,
                      num_workers=num_workers, pin_memory=True)


class SignDataset(Dataset):
    """Dataset of signature image pairs listed in a header-less CSV file.

    Each CSV row is unpacked as ``(path_1, path_2, label)``. The training
    split is read from ``<data_dir>/train.csv`` and the test split from
    ``<data_dir>/test.csv``.
    """

    def __init__(self, is_train: bool, data_dir: str, image_transform=None):
        """Load the requested split.

        Args:
            is_train: ``True`` reads ``train.csv``, ``False`` reads ``test.csv``.
            data_dir: directory containing the two split files.
            image_transform: optional callable applied to every loaded image.

        Raises:
            FileNotFoundError: if the CSV for the requested split is missing.
        """
        train_csv = os.path.join(data_dir, 'train.csv')
        test_csv = os.path.join(data_dir, 'test.csv')
        if not os.path.exists(train_csv) or not os.path.exists(test_csv):
            print('Not found train/test splits, run create_annotation first')
        else:
            print('Use existed train/test splits')

        split_csv = train_csv if is_train else test_csv
        if not os.path.exists(split_csv):
            # Fail with an actionable message instead of letting read_csv
            # raise a generic "No such file" error right after the print.
            raise FileNotFoundError(
                f'Split file not found: {split_csv}; run create_annotation first')

        self.df = pd.read_csv(split_csv, header=None)
        self.image_transform = image_transform

    def __len__(self):
        """Number of pairs (CSV rows) in the split."""
        return len(self.df)

    @staticmethod
    def _load_grayscale(path):
        """Open ``path`` and return it as a single-channel ('L') image."""
        # The context manager closes the file handle as soon as the pixel
        # data has been converted, rather than leaving it to the collector.
        with Image.open(path) as image:
            return image.convert('L')

    def __getitem__(self, index):
        """Return ``(image_1, image_2, label)`` for the row at ``index``."""
        path_1, path_2, y = self.df.iloc[index]

        x1 = self._load_grayscale(path_1)
        x2 = self._load_grayscale(path_2)

        if self.image_transform:
            x1 = self.image_transform(x1)
            x2 = self.image_transform(x2)

        return x1, x2, y

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

from torchvision import models


class ContrastiveLoss(nn.Module):
    """Contrastive loss on the Euclidean distance between two embeddings.

    Per pair, with ``d`` the L2 distance between the embeddings::

        alpha * (1 - y) * d**2  +  beta * y * max(0, margin - d)**2

    so pairs labelled ``y == 0`` are pulled together and pairs labelled
    ``y == 1`` are pushed at least ``margin`` apart.

    NOTE(review): ``accuracy`` in this notebook treats ``y == 1`` as the
    "same" class (small distance expected), which is the opposite
    convention -- confirm which one the label files follow.
    """

    def __init__(self, alpha, beta, margin):
        """Store the weights of the two terms and the separation margin."""
        super().__init__()
        self.alpha = alpha
        self.beta = beta
        self.margin = margin

    def forward(self, x1, x2, y):
        '''
        Shapes:
        -------
        x1: [B,C]
        x2: [B,C]
        y: [B] or [B,1]

        Returns:
        --------
        loss: scalar (mean over the batch), float32
        '''
        distance = torch.pairwise_distance(x1, x2, p=2)

        # Bring the labels to the dtype/device of the distances (this also
        # makes bool labels usable in ``1 - y``).
        y = torch.as_tensor(y, dtype=distance.dtype, device=distance.device)
        if y.dim() > distance.dim():
            # A [B,1] label column would otherwise broadcast against the [B]
            # distances into a [B,B] matrix and silently give a wrong loss.
            y = y.reshape(distance.shape)

        hinge = torch.clamp(self.margin - distance, min=0)
        loss = self.alpha * (1 - y) * distance ** 2 + self.beta * y * hinge ** 2
        return torch.mean(loss, dtype=torch.float)


class SigNet(nn.Module):
    '''
    Siamese feature extractor: both inputs go through the same ``features``
    stack and the two resulting 128-d embeddings are returned.

    Reference Keras: https://github.com/sounakdey/SigNet/blob/master/SigNet_v1.py
    '''

    def __init__(self):
        super().__init__()

        def local_response_norm():
            # Same normalisation settings after the first two convolutions.
            return nn.LocalResponseNorm(size=5, k=2, alpha=1e-4, beta=0.75)

        # The spatial sizes in the comments assume a [155, 220] single-channel
        # input; the first linear layer is lazy, so other input sizes work
        # too (its input width is inferred on the first forward pass).
        # NOTE(review): only the first convolution is followed by a ReLU --
        # the later convolutions and the two linear layers are stacked with
        # no nonlinearity in between; confirm this is intended.
        layers = [
            nn.Conv2d(1, 96, 11),                   # -> [145, 210]
            nn.ReLU(),
            local_response_norm(),
            nn.MaxPool2d(2, stride=2),              # -> [72, 105]
            nn.Conv2d(96, 256, 5, padding=2),       # -> [72, 105]
            local_response_norm(),
            nn.MaxPool2d(2, stride=2),              # -> [36, 52]
            nn.Conv2d(256, 384, 3, padding=1),
            nn.Conv2d(384, 256, 3, padding=1),
            nn.MaxPool2d(2, stride=2),              # -> [18, 26]
            nn.Flatten(),                           # 18 * 26 * 256 features
            nn.LazyLinear(1024),
            nn.Linear(1024, 128),
        ]
        self.features = nn.Sequential(*layers)

        # TODO: init bias = 0

    def forward(self, x1, x2):
        """Embed both inputs with the shared weights; returns ``(e1, e2)``."""
        embedding_1 = self.features(x1)
        embedding_2 = self.features(x2)
        return embedding_1, embedding_2


In [None]:
import os
from PIL import ImageOps
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import transforms
from argparse import ArgumentParser

# Seed PyTorch's CPU generator and every CUDA generator with the same value.
seed = 2020
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)

# Run on the GPU when one is visible to PyTorch, otherwise on the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(f'Device: {device}')


def train(model, optimizer, criterion, dataloader, log_interval=50):
    """Run one training epoch over ``dataloader``.

    Args:
        model: module called as ``model(x1, x2)`` and returning two embeddings.
        optimizer: optimizer stepping ``model``'s parameters.
        criterion: callable ``criterion(e1, e2, y)`` returning a scalar loss.
        dataloader: sized iterable of ``(x1, x2, y)`` tensor batches.
        log_interval: print the windowed mean loss every this many batches
            (and always after the last batch).

    Returns:
        Sample-weighted mean loss over the whole epoch as a float
        (``nan`` when the loader yields no samples).
    """
    model.train()

    # Move batches to wherever the model lives instead of relying on a
    # notebook-global ``device`` variable.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device('cpu')

    num_batches = len(dataloader)
    window_loss, window_samples = 0.0, 0  # reset after every log line
    epoch_loss, epoch_samples = 0.0, 0    # kept for the return value

    for batch_idx, (x1, x2, y) in enumerate(dataloader):
        x1, x2, y = x1.to(target_device), x2.to(target_device), y.to(target_device)

        optimizer.zero_grad()
        emb1, emb2 = model(x1, x2)
        loss = criterion(emb1, emb2, y)
        loss.backward()
        optimizer.step()

        # The criterion returns a batch mean; weight it by the batch size so
        # a smaller final batch does not skew the averages.
        batch_samples = len(emb1)
        batch_loss = loss.item() * batch_samples
        window_samples += batch_samples
        window_loss += batch_loss
        epoch_samples += batch_samples
        epoch_loss += batch_loss

        if (batch_idx + 1) % log_interval == 0 or batch_idx == num_batches - 1:
            print('{}/{}: Loss: {:.4f}'.format(batch_idx + 1, num_batches, window_loss / window_samples))
            window_loss, window_samples = 0.0, 0

    return epoch_loss / epoch_samples if epoch_samples else float('nan')


In [None]:
@torch.no_grad()
def eval(model, criterion, dataloader, log_interval=50):
    """Evaluate ``model`` on ``dataloader`` without tracking gradients.

    NOTE: this name shadows Python's built-in ``eval``; it is kept because
    other cells call it by this name.

    Args:
        model: module called as ``model(x1, x2)`` and returning two embeddings.
        criterion: callable ``criterion(e1, e2, y)`` returning a scalar loss.
        dataloader: sized iterable of ``(x1, x2, y)`` tensor batches.
        log_interval: print the cumulative mean loss every this many batches
            (and always after the last batch).

    Returns:
        ``(mean_loss, max_accuracy)``: the sample-weighted mean loss over all
        batches, and the value returned by ``accuracy`` for the collected
        pair distances and labels.

    Raises:
        ValueError: if the loader yields no samples.
    """
    model.eval()

    # Move batches to wherever the model lives instead of relying on a
    # notebook-global ``device`` variable.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device('cpu')

    running_loss = 0.0
    number_samples = 0
    num_batches = len(dataloader)
    batch_distances, batch_labels = [], []

    for batch_idx, (x1, x2, y) in enumerate(dataloader):
        x1, x2, y = x1.to(target_device), x2.to(target_device), y.to(target_device)

        emb1, emb2 = model(x1, x2)
        loss = criterion(emb1, emb2, y)

        # Collect on the CPU as tensors; labels are flattened so that they
        # stay aligned one-to-one with the 1-D distances.
        batch_distances.append(torch.pairwise_distance(emb1, emb2, 2).cpu())
        batch_labels.append(y.cpu().reshape(-1))

        number_samples += len(emb1)
        running_loss += loss.item() * len(emb1)

        if (batch_idx + 1) % log_interval == 0 or batch_idx == num_batches - 1:
            # Unlike train(), the running average here is never reset.
            print('{}/{}: Loss: {:.4f}'.format(batch_idx + 1, num_batches, running_loss / number_samples))

    if number_samples == 0:
        raise ValueError('eval() received a dataloader that produced no samples')

    distances = torch.cat(batch_distances)
    labels = torch.cat(batch_labels)
    max_accuracy = accuracy(distances, labels)
    print(f'Max accuracy: {max_accuracy}')
    return running_loss / number_samples, max_accuracy

In [None]:
# Run configuration.
batch_size = 32    # pairs per batch, passed to get_data_loader
lr = 1e-5          # learning rate
dataset = 'cedar'  # dataset key for get_data_loader: 'cedar', 'bengali' or 'hindi'

In [None]:
model = SigNet().to(device)
criterion = ContrastiveLoss(alpha=1, beta=1, margin=1).to(device)
# Take the learning rate from the `lr` hyper-parameter defined in the
# configuration cell rather than repeating the literal here, so that changing
# `lr` actually changes the optimizer.
optimizer = optim.RMSprop(model.parameters(), lr=lr, eps=1e-8, weight_decay=5e-4, momentum=0.9)
# Multiply the learning rate by 0.1 every 5 scheduler steps.
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)
num_epochs = 20

# Pre-processing applied to every signature image: resize to 100x100,
# invert the grey levels, then convert to a tensor.
image_transform = transforms.Compose([
    transforms.Resize((100, 100)),
    ImageOps.invert,
    transforms.ToTensor(),
    # TODO: add normalize
])

In [None]:
# Settings shared by both splits; only `is_train` differs between the loaders.
loader_kwargs = dict(batch_size=batch_size, image_transform=image_transform, dataset=dataset)
trainloader = get_data_loader(is_train=True, **loader_kwargs)
testloader = get_data_loader(is_train=False, **loader_kwargs)

# Make sure the output directory for checkpoints exists.
os.makedirs('checkpoints', exist_ok=True)

In [None]:
# Per-epoch results of eval() on the test loader.
losses = []
accuracies = []

print(model)
for epoch in range(num_epochs):
    # Show a 1-based epoch number so the last epoch reads "N/N".
    print('Epoch {}/{}'.format(epoch + 1, num_epochs))
    print('Training', '-' * 20)
    train(model, optimizer, criterion, trainloader)
    print('Evaluating', '-' * 20)
    loss, acc = eval(model, criterion, testloader)
    losses.append(loss)
    # Store a plain float rather than a 0-dim tensor so the history can be
    # plotted or formatted directly.
    accuracies.append(float(acc))
    scheduler.step()

    # Checkpointing is currently disabled; uncomment to save after each epoch.
#     to_save = {
#         'model': model.state_dict(),
#         'scheduler': scheduler.state_dict(),
#         'optim': optimizer.state_dict(),
#     }

#     print('Saving checkpoint..')
#     torch.save(to_save, 'checkpoints/epoch_{}_loss_{:.3f}_acc_{:.3f}.pt'.format(epoch, loss, acc))

print('Done')

In [None]:
from matplotlib import pyplot as plt

# `losses` and `accuracies` are filled from eval(...) on the test loader, so
# the figures are labelled as evaluation metrics rather than training ones.
fig, ax = plt.subplots()
ax.plot(losses)
ax.set(xlabel='Epoch', ylabel='Loss', title='Evaluation Loss')
plt.show()

# Plotting the accuracy (coerced to float in case the entries are 0-dim tensors)
fig, ax = plt.subplots()
ax.plot([float(value) for value in accuracies])
ax.set(xlabel='Epoch', ylabel='Accuracy', title='Evaluation Accuracy')
plt.show()