In [14]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from pynvml import *
import os
import time
from torch.utils import data
import cv2
import random
from tqdm import tqdm

os.environ['CUDA_VISIBLE_DEVICES']='0'

from tensorboardX import SummaryWriter

# Dataset

## preprocess

### CEDAR

In [15]:
with open('CEDAR/gray_train.txt', 'w') as f:
	for i in range(1, 51):
		for j in range(1, 25):
			for k in range(j+1, 25):
				f.write('signatures/full_org/original_{0}_{1}.png signatures/full_org/original_{0}_{2}.png 1\n'.format(i, j, k))
		org_forg = [(j,k) for j in range(1, 25) for k in range(1, 25)]
		for (j, k) in random.choices(org_forg, k=276):
			f.write('signatures/full_org/original_{0}_{1}.png signatures/full_forg/forgeries_{0}_{2}.png 0\n'.format(i, j, k))

with open('CEDAR/gray_test.txt', 'w') as f:
	for i in range(51, 56):
		for j in range(1, 25):
			for k in range(j+1, 25):
				f.write('signatures/full_org/original_{0}_{1}.png signatures/full_org/original_{0}_{2}.png 1\n'.format(i, j, k))
		org_forg = [(j,k) for j in range(1, 25) for k in range(1, 25)]
		for (j, k) in random.choices(org_forg, k=276):
			f.write('signatures/full_org/original_{0}_{1}.png signatures/full_forg/forgeries_{0}_{2}.png 0\n'.format(i, j, k))

# with open('CEDAR/gray_train.txt', 'w') as f:
# 	for i in range(1, 51):
# 		org_org = [(j,k) for j in range(1, 25) for k in range(j+1, 25)]
# 		for (j, k) in random.choices(org_org, k=25):
# 			f.write('signatures/full_org/original_{0}_{1}.png signatures/full_org/original_{0}_{2}.png 1\n'.format(i, j, k))
# 		org_forg = [(j,k) for j in range(1, 25) for k in range(1, 25)]
# 		for (j, k) in random.choices(org_forg, k=25):
# 			f.write('signatures/full_org/original_{0}_{1}.png signatures/full_forg/forgeries_{0}_{2}.png 0\n'.format(i, j, k))

# with open('CEDAR/gray_test.txt', 'w') as f:
# 	for i in range(51, 56):
# 		for j in range(1, 25):
# 			for k in range(j+1, 25):
# 				f.write('signatures/full_org/original_{0}_{1}.png signatures/full_org/original_{0}_{2}.png 1\n'.format(i, j, k))
# 		org_forg = [(j,k) for j in range(1, 25) for k in range(1, 25)]
# 		for (j, k) in random.choices(org_forg, k=276):
# 			f.write('signatures/full_org/original_{0}_{1}.png signatures/full_forg/forgeries_{0}_{2}.png 0\n'.format(i, j, k))


### SVC2004

In [3]:
with open('SVC2004/gray_train.txt', 'w') as f:
	for i in range(1, 33):
		for j in range(1, 21):
			for k in range(j+1, 21):
				f.write('user{0}/genuine/U{0}S{1}.jpg user{0}/genuine/U{0}S{2}.jpg 1\n'.format(i, j, k))
		org_forg = [(j,k) for j in range(1, 21) for k in range(21, 41)]
		for (j, k) in random.choices(org_forg, k=190):
			f.write('user{0}/genuine/U{0}S{1}.jpg user{0}/forged/U{0}S{2}.jpg 0\n'.format(i, j, k))

with open('SVC2004/gray_test.txt', 'w') as f:
	for i in range(33, 41):
		for j in range(1, 21):
			for k in range(j+1, 21):
				f.write('user{0}/genuine/U{0}S{1}.jpg user{0}/genuine/U{0}S{2}.jpg 1\n'.format(i, j, k))
		org_forg = [(j,k) for j in range(1, 21) for k in range(21, 41)]
		for (j, k) in random.choices(org_forg, k=190):
			f.write('user{0}/genuine/U{0}S{1}.jpg user{0}/forged/U{0}S{2}.jpg 0\n'.format(i, j, k))

# with open('SVC2004/gray_train.txt', 'w') as f:
# 	for i in range(1, 33):
# 		org_org = [(j, k) for j in range(1, 21) for k in range(j+1, 21)]
# 		for (j, k) in random.choices(org_org, k=20):
# 			f.write('user{0}/genuine/U{0}S{1}.jpg user{0}/genuine/U{0}S{2}.jpg 1\n'.format(i, j, k))
# 		org_forg = [(j,k) for j in range(1, 21) for k in range(21, 41)]
# 		for (j, k) in random.choices(org_forg, k=20):
# 			f.write('user{0}/genuine/U{0}S{1}.jpg user{0}/forged/U{0}S{2}.jpg 0\n'.format(i, j, k))

# with open('SVC2004/gray_test.txt', 'w') as f:
# 	for i in range(33, 41):
# 		for j in range(1, 21):
# 			for k in range(j+1, 21):
# 				f.write('user{0}/genuine/U{0}S{1}.jpg user{0}/genuine/U{0}S{2}.jpg 1\n'.format(i, j, k))
# 		org_forg = [(j,k) for j in range(1, 21) for k in range(21, 41)]
# 		for (j, k) in random.choices(org_forg, k=190):
# 			f.write('user{0}/genuine/U{0}S{1}.jpg user{0}/forged/U{0}S{2}.jpg 0\n'.format(i, j, k))

In [16]:
import glob
import random
import itertools
from sklearn.model_selection import train_test_split
from typing import List, Tuple
import csv
import os

def write_csv(file_path, samples):
    with open(file_path, 'wt') as f:
        writer = csv.writer(f)
        writer.writerows(samples)

def make_partition(
    signers: List[int],
    pair_genuine_genuine: List[Tuple[int, int]],
    pair_genuine_forged: List[Tuple[int, int]],
):
    samples = []
    for signer_id in signers:
        sub_pair_genuine_forged = random.sample(pair_genuine_forged, len(pair_genuine_genuine))
        genuine_genuine = list(itertools.zip_longest(pair_genuine_genuine, [], fillvalue=1)) # y = 1
        genuine_genuine = list(map(lambda sample: (signer_id, *sample[0], sample[1]), genuine_genuine))
        samples.extend(genuine_genuine)
        genuine_forged = list(itertools.zip_longest(sub_pair_genuine_forged, [], fillvalue=0)) # y = 0
        genuine_forged = list(map(lambda sample: (signer_id, *sample[0], sample[1]), genuine_forged))
        samples.extend(genuine_forged)
    return samples


def prepare_CEDAR(M: int, K: int, random_state=0, data_dir='CEDAR/signatures'):
    def get_path(row):
        writer_id, x1, x2, y = row
        if y == 1:
            x1 = os.path.join(data_dir, 'full_org', f'original_{writer_id}_{x1}.png')
            x2 = os.path.join(data_dir, 'full_org', f'original_{writer_id}_{x2}.png')
        else:
            x1 = os.path.join(data_dir, 'full_org', f'original_{writer_id}_{x1}.png')
            x2 = os.path.join(data_dir, 'full_forg', f'forgeries_{writer_id}_{x2}.png')
        return x1, x2, y # drop writer_id

    random.seed(random_state)
    signers = list(range(1, K+1))
    num_genuine_sign = 24
    num_forged_sign = 24

    train_signers, test_signers = train_test_split(signers, test_size=K-M)
    pair_genuine_genuine = list(itertools.combinations(range(1, num_genuine_sign+1), 2))
    pair_genuine_forged = list(itertools.product(range(1, num_genuine_sign+1), range(1, num_forged_sign+1)))

    train_samples = make_partition(train_signers, pair_genuine_genuine, pair_genuine_forged)
    train_samples = list(map(get_path, train_samples))
    write_csv(os.path.join(data_dir, 'train.csv'), train_samples)
    test_samples = make_partition(test_signers, pair_genuine_genuine, pair_genuine_forged)
    test_samples = list(map(get_path, test_samples))
    write_csv(os.path.join(data_dir, 'test.csv'), test_samples)



prepare_CEDAR(M=50, K=55)

### Construct dataset

In [6]:
class dataset(data.Dataset):
    def __init__(self, root='SVC2004/', train=True):
        super(dataset, self).__init__()
        if train:
            path = root + 'gray_train.txt'
        else:
            path = root + 'gray_test.txt'
        
        with open(path, 'r') as f:
            lines = f.readlines()
        
        self.labels = []
        self.datas = []
        for line in tqdm(lines):
            refer, test, label = line.split()
            # print(root + refer)

            refer_img = cv2.imread(root + refer, 0)
            test_img = cv2.imread(root + test, 0)

            refer_img = cv2.resize(refer_img, (220, 155))
            test_img = cv2.resize(refer_img, (220, 155))
            

            # plt.imshow(refer_img, cmap='gray')
            # break
            
        
            refer_img = refer_img.reshape(-1, refer_img.shape[0], refer_img.shape[1])
            test_img = test_img.reshape(-1, test_img.shape[0], test_img.shape[1])

            # print(refer_img.shape)
            # print(test_img.shape)

            refer_test = np.concatenate((refer_img, test_img), axis=0)
            self.datas.append(refer_test)
            self.labels.append(int(label))
    def __len__(self):
        return len(self.labels)

    def __getitem__(self, index):
        return torch.FloatTensor(self.datas[index]), float(self.labels[index])

#my_dataset = dataset(root = 'CEDAR/')

## Loss Function

In [None]:
TRAINING_NUM = 40
TOTAL_PICTURE = 55



def getSignature_location(uid, sid, gen=1):
    if 1 <= uid <= 55 and 1 <= sid <= 24:
        if gen == 1:
            src = './CEDAR/signatures/full_org' + '/original_{}_{}.png'
            src = src.format(uid, sid)
            return src
        else:
            src = './CEDAR/signatures/full_forg' + '/forgeries_{}_{}.png'
            src = src.format(uid, sid)
            return src
    else:
        return None


def getGenuineSignature(uid, sid = -1) :
    if sid == -1: sid = random.randint(1, 24)
    return getSignature_location(uid, sid, gen=1)

def getForgedSignature(uid, sid = -1) :
    if sid == -1: sid = random.randint(1, 24)
    # print(sid)
    return getSignature_location(uid, sid, gen=0)


class dataset(data.Dataset):
    def __init__(self, root='dataset/CEDAR/', train=True):
        super(dataset, self).__init__()
        self.labels = []
        self.datas = []
        
        if train:

            for i in range(1, 1+TRAINING_NUM):
                for j in range(24):
                    while(1):
                        src1 = getGenuineSignature(i)
                        src2 = getGenuineSignature(i)
                        if src1 != src2:
                            break
                    # print(src1)
                    refer_img = cv2.resize(cv2.imread(src1, 0), (250, 100))
                    test_img = cv2.resize(cv2.imread(src2, 0), (250, 100))
                    

                    refer_img = refer_img.reshape(-1, refer_img.shape[0], refer_img.shape[1])
                    test_img = test_img.reshape(-1, test_img.shape[0], test_img.shape[1])

                    
                    refer_test = np.concatenate((refer_img, test_img), axis=0)
                    self.datas.append(refer_test)

                    self.labels.append(1)

                    while(1):
                        src1 = getGenuineSignature(i)
                        src2 = getForgedSignature(i)
                        if src1 != src2:
                            break
                    refer_img = cv2.resize(cv2.imread(src1, 0), (250, 100))
                    test_img = cv2.resize(cv2.imread(src2, 0), (250, 100))

                    refer_img = refer_img.reshape(-1, refer_img.shape[0], refer_img.shape[1])
                    test_img = test_img.reshape(-1, test_img.shape[0], test_img.shape[1])

                    refer_test = np.concatenate((refer_img, test_img), axis=0)
                    self.datas.append(refer_test)

                    self.labels.append(0)
            print(len(self.labels))
            print(self.datas[0].shape)
            print("Training model loaded successfully.")
        else:
            for i in range(1+TRAINING_NUM,TOTAL_PICTURE + 1):
                for j in range(1, 25):
                    for k in range(1, 25):

                        src1 = getSignature_location(i, j)
                        src2 = getSignature_location(i, k)
                        src3 = getSignature_location(i, k, 0)

                        refer_img = cv2.resize(cv2.imread(src1, 0), (250, 100))
                        test_img = cv2.resize(cv2.imread(src2, 0), (250, 100))

                        refer_img = refer_img.reshape(-1, refer_img.shape[0], refer_img.shape[1])
                        test_img = test_img.reshape(-1, test_img.shape[0], test_img.shape[1])

                        refer_test = np.concatenate((refer_img, test_img), axis=0)
                        self.datas.append(refer_test)

                        self.labels.append(1)

                        refer_img = cv2.resize(cv2.imread(src1, 0), (250, 100))
                        test_img = cv2.resize(cv2.imread(src3, 0), (250, 100))

                        refer_img = refer_img.reshape(-1, refer_img.shape[0], refer_img.shape[1])
                        test_img = test_img.reshape(-1, test_img.shape[0], test_img.shape[1])

                        refer_test = np.concatenate((refer_img, test_img), axis=0)
                        self.datas.append(refer_test)

                        self.labels.append(0)

            print(len(self.labels))
            print(self.datas[0].shape)
            print("Testing model loaded successfully.")

    def __len__(self):
        return len(self.labels)


    def __getitem__(self, index):
        return torch.FloatTensor(self.datas[index]), float(self.labels[index])

 


# Train

Set up hyperparameters:

In [7]:
BATCH_SIZE = 32
EPOCHS = 10
LEARNING_RATE = 0.001

np.random.seed(0)
torch.manual_seed(1)

<torch._C.Generator at 0x7fbbe0113970>

Prepare dataset:

In [10]:
train_set = dataset(root='CEDAR/', train=True)
test_set = dataset(root='CEDAR/', train=False)
train_loader = torch.utils.data.DataLoader(train_set, batch_size = BATCH_SIZE, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_set, batch_size = 2*BATCH_SIZE, shuffle=False)

100%|██████████| 27600/27600 [14:14<00:00, 32.29it/s]  
100%|██████████| 2760/2760 [03:28<00:00, 13.24it/s] 


In [11]:
from models.Signet import SigNet
from models.Signet import ContrastiveLoss
        
cuda = torch.cuda.is_available()
print(cuda)

model = SigNet()
if cuda:
    model = model.cuda()
criterion = ContrastiveLoss(1,1,1)
# optimizer = optim.Adam(model.parameters(), lr = LEARNING_RATE)
# optimizer = optim.SGD(model.parameters(), lr=LEARNING_RATE)
optimizer = optim.RMSprop(model.parameters(), lr=1e-5, eps=1e-8, weight_decay=5e-4, momentum=0.9)
scheduler = optim.lr_scheduler.StepLR(optimizer, 5, 0.1)
writer = SummaryWriter(log_dir='scalar')

if cuda:
    criterion = criterion.cuda()
iter_n = 0
t = time.strftime("%m-%d-%H-%M", time.localtime())

True


In [12]:
def compute_accuracy(predicted, labels):
    for i in range(3):
        predicted[i][predicted[i] > 0.5] = 1
        predicted[i][predicted[i] <= 0.5] = 0
    predicted = predicted[0] + predicted[1] + predicted[2]
    
    predicted[predicted < 2] = 0
    predicted[predicted >= 2] = 1
    predicted = predicted.view(-1)
    accuracy = torch.sum(predicted == labels).item() / labels.size()[0]
    return accuracy


def accuracy(distances, y, step=0.01):
    min_threshold_d = min(distances)
    max_threshold_d = max(distances)
    max_acc = 0
    same_id = (y == 1)

    for threshold_d in torch.arange(min_threshold_d, max_threshold_d+step, step):
        true_positive = (distances <= threshold_d) & (same_id)
        true_positive_rate = true_positive.sum().float() / same_id.sum().float()
        true_negative = (distances > threshold_d) & (~same_id)
        true_negative_rate = true_negative.sum().float() / (~same_id).sum().float()

        acc = 0.5 * (true_negative_rate + true_positive_rate)
        max_acc = max(max_acc, acc)
    return max_acc

In [13]:
for epoch in range(1, EPOCHS + 1):
    for i, (inputs, labels) in enumerate(train_loader):
        torch.cuda.empty_cache()

        model.train()
        # print(inputs.shape)
        # break
        labels = labels.float()
        if cuda:
            inputs, labels = inputs.cuda(), labels.cuda()
        
        # print(inputs.shape)
        x1,x2 = model(inputs[:,:1,:,:],inputs[:,1:,:,:])
        #print(predicted)

        loss = criterion(x1 , x2, labels)  
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # accuracy = compute_accuracy(predicted, labels)

        writer.add_scalar(t+'/train_loss', loss.item(), iter_n+epoch*len(train_loader))
        # writer.add_scalar(t+'/train_accuracy', accuracy, iter_n)
        
        if (i+1) % 50 == 0:
            model.eval()
            distances=[]
            with torch.no_grad():
                accuracys = []
                for i_, (inputs_, labels_) in enumerate(test_loader):
                    labels_ = labels_.float()
                    if cuda:
                        inputs_, labels_ = inputs_.cuda(), labels_.cuda()
                    
                    x1_,x2_ = model(inputs_[:,:1,:,:],inputs_[:,1:,:,:])
                    # print(predicted_)
                    distances.extend(zip(torch.pairwise_distance(x1_, x2_, 2).cpu().tolist(), labels_.cpu().tolist()))
                    # accuracys.append(compute_accuracy(predicted_, labels_))
                # accuracy_ = sum(accuracys) / len(accuracys)
                # writer.add_scalar(t+'/test_accuracy', accuracy_, iter_n)
                distances, y = zip(*distances)
                distances, y = torch.tensor(distances), torch.tensor(y)
                max_accuracy = accuracy(distances, y)
                print(f'Max accuracy: {max_accuracy} loss:{loss.item()}')
            # print('test acc:{:.6f}'.format(accuracy_))

        iter_n += 1

        if i == 500:
            torch.save(model.state_dict(), 'model.pth')

        # if i % 10 == 0:
        #     print('Epoch[{}/{}], iter {}, loss:{:.6f}, accuracy:{}'.format(epoch, EPOCHS, i, loss.item(), accuracy))

writer.close()

  return torch.max_pool2d(input, kernel_size, stride, padding, dilation, ceil_mode)


Max accuracy: 0.5 loss:0.20820525288581848
Max accuracy: 0.5 loss:0.2655828893184662
Max accuracy: 0.5 loss:0.3004656136035919
Max accuracy: 0.5 loss:0.23220640420913696
Max accuracy: 0.5 loss:0.24856343865394592
Max accuracy: 0.5 loss:0.2712399661540985
Max accuracy: 0.5 loss:0.26978689432144165
Max accuracy: 0.5 loss:0.27564480900764465
Max accuracy: 0.5 loss:0.22872322797775269
Max accuracy: 0.5 loss:0.3019229769706726
Max accuracy: 0.5 loss:0.25382494926452637
Max accuracy: 0.5 loss:0.23250533640384674
Max accuracy: 0.5 loss:0.2619192898273468
Max accuracy: 0.5 loss:0.25241899490356445


KeyboardInterrupt: 