# Face Comparing by Siamese Network

## Imports

In [None]:
# Python Imaging Library
from PIL import Image    
import PIL.ImageOps

# Packages
import torch
from torch.utils.data import DataLoader,Dataset       #https://zhuanlan.zhihu.com/p/30934236
from torch import optim     # 包含optimization algorithms
import torch.nn.functional as F     # 包含activation functions
from torch.autograd import Variable      # 以Variable形式嵌套激励函数 
import torch.nn as nn

# Torchvision 包含目前流行的数据集，模型结构和常用的图片转换工具等
import torchvision
import torchvision.datasets as dset    # 数据集
import torchvision.transforms as transforms  # 可对PIL.Image, Tensor进行变换
import torchvision.utils  ##

# Others
import numpy as np
import random
import matplotlib.pyplot as plt
import seaborn as sns

## 出图的功能函数

In [None]:
def imshow(img, text=None, should_save=False):     #数据集出图
    npimg = img.numpy() 
    plt.axis("off")
    if text:
        plt.text(75, 8, text, style='italic', fontweight='bold',
                bbox={'facecolor':'white','alpha':0.8, 'pad':10})
    plt.imshow(np.transpose(npimg, (1,2,0)))
    plt.show()
        
def show_plot(iteration, loss):                    #观察迭代损失
    plt.plot(iteration, loss)
    plt.xlabel('Iteration Number')
    plt.ylabel('Loss')
    plt.show

## 路径与参数的配置

In [None]:
class Config():
    training_dir = "./Datasets/att/training/"
    validation_dir = "./Datasets/att/validamulti"
    testing_dir = "./Datasets/att/testing/"
    train_batch_size = 32           #批样本数
    train_number_epoch = 100     #整批训练次数，即遍历了多少次所有的训练样本

## 定义训练集数据

0 for Anchor-Positive pairs, 1 for Anchor-Negative pairs.  训练集数据须注意数据的均衡

In [None]:
class SiameseNetworkDataset(Dataset):
    
    def __init__(self,imageFolderDataset,transform=None,should_invert=True):
        self.imageFolderDataset = imageFolderDataset
        self.transform = transform
        self.should_invert = should_invert
        
    def __getitem__(self,index):
        img0_tuple = random.choice(self.imageFolderDataset.imgs)   #在self...imgs中返回随机项
        # 数据均衡，等概率取A-P对和A-N对
        should_get_same_class = random.randint(0,1)    # 随机返回0，1
        if should_get_same_class:
            while True:
                # 需要A-P对，保持循环，直到找到相同的类的图像
                img1_tuple = random.choice(self.imageFolderDataset.imgs)   #元组=（图片路径，类编号）
                if img0_tuple[1] == img1_tuple[1]:                         #图片编号相同则是同一个类（即文件夹）
                    break
        else:
            while True:
                # 需要A-N对，同样循环，保证两个图像的类不同
                img1_tuple = random.choice(self.imageFolderDataset.imgs)
                if img0_tuple[1] != img1_tuple[1]:                         #图片编号相同则是同一个类（即文件夹）
                    break                
            
        img0 = Image.open(img0_tuple[0])    #读取实际图像，tuple[0]即路径
        img1 = Image.open(img1_tuple[0])
        img0 = img0.convert("L")            #from PIL，把img转换为256级灰度图像， L：8-bit pixels,black and white
        img1 = img1.convert("L")
        
        if self.should_invert:
            img0 = PIL.ImageOps.invert(img0)    #可将输入图像转换为反色图像
            img1 = PIL.ImageOps.invert(img1)
            
        if self.transform is not None:
            img0 = self.transform(img0)
            img1 = self.transform(img1)
            
        return img0, img1, torch.from_numpy(np.array([int(img1_tuple[1]!=img0_tuple[1])], dtype=np.float32))
            #返回 img0,img1,以及二者的类别（0或1），完成数据组对
    
    
    def __len__(self):
        return len(self.imageFolderDataset.imgs)          #数据集大小

## 数据集调用

In [None]:
folder_dataset = dset.ImageFolder(root=Config.training_dir)

siamese_dataset = SiameseNetworkDataset(imageFolderDataset=folder_dataset,
                                       transform=transforms.Compose([transforms.Resize((100,100)),
                                                                     transforms.ToTensor()
                                                                     ]) #compose将“transforms”组合
                                        ,should_invert=False)

# ImageFolder: pytorch.org/docs/master/torchvision/datasets.html#imagefolder
# transforms: http://pytorch.org/docs/master/torchvision/transforms.html

## 部分数据可视化

各列首末为一对，0列相似，1列不相似

In [None]:
vis_dataloader = DataLoader(dataset=siamese_dataset,
                           shuffle=True,    #每个epoch打乱顺序
                           num_workers=2,   #加载数据的子进程数
                           batch_size=8)    #每批加载的样本数
data_iter = iter(vis_dataloader)    #生成迭代器

example_batch = next(data_iter)     #返回迭代器的下一个项目
concatenated = torch.cat((example_batch[0],example_batch[1]),0)  #张量拼接，0：按行，1：按列
imshow(torchvision.utils.make_grid(concatenated))
print(example_batch[2].numpy())

## DataLoader: http://pytorch.org/docs/master/data.html

## 构建神经网络

采用标准的cnn结构。在每个卷积层后进行批标准化。网络接受一个100px* 100px的输入。在卷积层后有三个全连接层。

In [None]:
class SiameseNetwork(nn.Module):                 # torch.nn.Module:所有神经网络模块的基类
    def __init__(self):
        super(SiameseNetwork, self).__init__()
        self.cnn1 = nn.Sequential(               # torch.nn.Sequential:模块将按照在构造器中的顺序传递
            nn.ReflectionPad2d(1),               # 使用输入边界的反射来填充输入张量
            nn.Conv2d(                           # 对信号（由若干输入平面组成）进行2维卷积
                in_channels=1,                   # input height. 灰度图1层，RGB为3层，即图像的通道数
                out_channels=4,                  # n_filters 过滤器数目，过滤器提取的特征数
                kernel_size=3,                   # 卷积核 filter size 3x3
                stride=1,                        # filter movement/step 跳度
                padding=0,                       # 若想要con2d出来的图片长宽没有变化，当stride=1,padding=(kernel_size-1)/2
            ),
            nn.ReLU(inplace=True),               # ReLU(x)=max(0,x)
            nn.BatchNorm2d(4),                   # 对4维输入应用批标准化（一个小批量的2维输入和额外的通道尺寸）
            
            nn.ReflectionPad2d(1),
            nn.Conv2d(
                in_channels=4,
                out_channels=8,
                kernel_size=3),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(8),
        )
        
        self.fc1 = nn.Sequential(                   # fc
            nn.Linear(in_features=8*100*100,        # 线性层,y=Ax+b
                      out_features=500),
            nn.ReLU(inplace=True),
            
            nn.Linear(500,500),
            nn.ReLU(inplace=True),
            
            nn.Linear(500,5)
    )
    
    def forward_once(self,x):
        output = self.cnn1(x)
        output = output.view(output.size()[0],-1)   #展平多维的卷积图。http://pytorch.org/docs/master/tensors.html?highlight=view#torch.Tensor.view
        output = self.fc1(output)
        return output
    
    def forward(self, input1, input2):
        output1 = self.forward_once(input1)
        output2 = self.forward_once(input2)
        return output1, output2
    

In [None]:
net = SiameseNetwork()
print(net)

In [None]:
net.load_state_dict(torch.load('sia_att_params.pkl'))

## Contrastive Loss

In [None]:
class ContrastiveLoss(torch.nn.Module):
    """
    Contrastive loss function: http://yann.lecun.com/exdb/publis/pdf/hadsell-chopra-lecun-06.pdf
    """ 
    
    def __init__(self,margin=0.2):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin
        
    def forward(self, output1, output2, label):
        eu_distance = F.pairwise_distance(output1, output2)         #euclidean distance
        loss_contrastive = torch.mean( (1-label) * torch.pow(eu_distance, 2) + 
                                      (label) * torch.pow(torch.clamp(self.margin - eu_distance, min=0.0),2) )
        
        return loss_contrastive

## Training

In [None]:
train_dataloader = DataLoader(siamese_dataset,
                             shuffle=True,
                             num_workers=2,
                             batch_size=Config.train_batch_size)


criterion = ContrastiveLoss()
optimizer = optim.Adam(net.parameters(),lr =0.00001)         #Adam优化所有参数，学习率=0.0005


In [None]:
counter = []
loss_history = []
iteration_number = 0      #初始化迭代次数（更新了多少次参数的值）

In [None]:
%%time
for epoch in range(0, Config.train_number_epoch):
    for i, data in enumerate(train_dataloader, 0):   #将一个可遍历的数据对象组合为一个索引序列，同时列出数据和数据下标
        img0, img1, label = data
        img0, img1, label = Variable(img0), Variable(img1), Variable(label)
        output1, output2 = net(img0, img1)   # network output
        optimizer.zero_grad()      # clear gradients for this training step
        loss_contrastive = criterion(output1, output2, label)  #计算loss
        loss_contrastive.backward()    #backprop, 计算梯度
        optimizer.step()                #apply gradients        

        if i%10 == 0:
            print("Epoch: {} | Current Loss {}\n".format(epoch, loss_contrastive.data[0]))
            iteration_number += 10
            counter.append(iteration_number)   
            loss_history.append(loss_contrastive.data[0])
    
    torch.save(net.state_dict(),'sia_lfwa_params_vt.pkl')
    
show_plot(counter, loss_history)

In [None]:
torch.save(net.state_dict(),'sia_att_params_vt.pkl')

## distribution

观察训练集数据分布

In [None]:
folder_train_distri = dset.ImageFolder(root=Config.training_dir)
siamese_train_distri = SiameseNetworkDataset(imageFolderDataset=folder_train_distri,
                                       transform=transforms.Compose([transforms.Resize((100,100)),
                                                                    transforms.ToTensor()
                                                                    ]),
                                       should_invert=False)

train_distri_dataloader = DataLoader(siamese_train_distri,
                             num_workers=8,
                             batch_size=1,
                             shuffle=True)



same_one_train = np.array([])
diff_one_train = np.array([])
label_1 = np.array([1.0])
label_0 = np.array([0.0])

for i, data in enumerate(train_distri_dataloader, 0):   #将一个可遍历的数据对象组合为一个索引序列，同时列出数据和数据下标
    img0, img1, label = data
    img0, img1, label = Variable(img0), Variable(img1), Variable(label)
    output1, output2 = net(img0, img1)   # network output
    eu_distance = F.pairwise_distance(output1, output2)
    eu_dist = eu_distance.cpu().data.numpy()[0][0]
    label_num = label.cpu().data.numpy()[0][0]    
    if (label_num == label_0).all():    #a "numpy way"
        same_one_train = np.append(same_one_train, eu_dist)
    else:
        diff_one_train = np.append(diff_one_train, eu_dist)    

            
sns.distplot(same_one_train, 
             #bins=20, 
             rug=False, 
             hist=True,
            label='Samples from same classes')
sns.distplot(diff_one_train, 
             #bins=20, 
             rug=False, 
             hist=True,
            label='Samples from different classes')
plt.xlabel('Dissimilarity(Euclidean Distance)')
plt.ylabel('Relative quantity')
plt.legend()
plt.show()

观察验证集数据分布

In [None]:
folder_valida_distri = dset.ImageFolder(root=Config.validation_dir)
siamese_valida_distri = SiameseNetworkDataset(imageFolderDataset=folder_valida_distri,
                                       transform=transforms.Compose([transforms.Resize((100,100)),
                                                                    transforms.ToTensor()
                                                                    ]),
                                       should_invert=False)

valida_distri_dataloader = DataLoader(siamese_valida_distri,
                             num_workers=8,
                             batch_size=1,
                             shuffle=True)



same_one_valida = np.array([])
diff_one_valida = np.array([])
label_1 = np.array([1.0])
label_0 = np.array([0.0])

for i, data in enumerate(valida_distri_dataloader, 0):   #将一个可遍历的数据对象组合为一个索引序列，同时列出数据和数据下标
    img0, img1, label = data
    img0, img1, label = Variable(img0), Variable(img1), Variable(label)
    output1, output2 = net(img0, img1)   # network output
    eu_distance = F.pairwise_distance(output1, output2)
    eu_dist = eu_distance.cpu().data.numpy()[0][0]
    label_num = label.cpu().data.numpy()[0][0]    
    if (label_num == label_0).all():    #a "numpy way"
        same_one_valida = np.append(same_one_valida, eu_dist)
    else:
        diff_one_valida = np.append(diff_one_valida, eu_dist)    

            

sns.distplot(same_one_valida, 
             #bins=20, 
             rug=False, 
             hist=True,
            label='Samples from same classes')
sns.distplot(diff_one_valida, 
             #bins=20, 
             rug=False, 
             hist=True,
            label='Samples from different classes')
plt.xlabel('Dissimilarity(Euclidean Distance)')
plt.ylabel('Relative Quantity')
plt.legend()
plt.show()

## ROC

In [None]:
tpr_array = np.array([])
fpr_array = np.array([])
same_one_valida = np.array([])
diff_one_valida = np.array([])
label_1 = np.array([1.0])
label_0 = np.array([0.0])


for thres in range(0,5,1):
    print(thres)
    tpm = 0
    fpm = 0
    tp_fn = 0
    fp_tn = 0
    for i, data in enumerate(valida_distri_dataloader, 0):
        img0, img1, label = data
        img0, img1, label = Variable(img0), Variable(img1), Variable(label)
        output1, output2 = net(img0, img1)   # network output
        eu_distance = F.pairwise_distance(output1, output2)
        eu_dist = eu_distance.cpu().data.numpy()[0][0]
        label_num = label.cpu().data.numpy()[0][0]
        if (label_num == label_0).all():
            tp_fn = tp_fn + 1
        else:
            fp_tn = fp_tn + 1
        if eu_dist < (thres/10):
            if (label_num == label_0).all():    #a "numpy way"
                tpm = tpm + 1
#                 print('tpm={},tp_fn={},thres={}'.format(tpm,tp_fn,(thres/10)))
            else:
                fpm = fpm + 1
#                 print('fpm={},fp_tn={},thres={}'.format(fpm,fp_tn,(thres/10)))
        
    try:
        tpr=tpm/tp_fn
    except:
        print("still 0")
    else:
        tpr_array = np.append(tpr_array,tpr)
    try:
        fpr=fpm/fp_tn
    except:
        print("still 0")
    else:
        fpr_array = np.append(fpr_array,fpr)
    

In [None]:
print(tp_fn,fp_tn,tpm,fpm)
len(fpr_array)
x = fpr_array
y = tpr_array 
sia_adam_roc = (x,y)
sia_adam_roc

In [None]:
np.save("sia_adam_roc.npy",sia_adam_roc)

In [None]:
sia_adam_roc = np.load("sia_adam_roc.npy")

In [None]:
x = sia_adam_roc[0]
y = sia_adam_roc[1]

# This is the ROC curve
plt.plot(x,y,'+')
plt.xlim(0,1)
plt.ylim(0,1)
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

# This is the AUC
auc = np.trapz(y,x)
print(auc)

## testing example

4组数据作为数据集测试

In [None]:
%%time
threshold = 0.135
folder_dataset_test = dset.ImageFolder(root=Config.testing_dir)
siamese_dataset_test = SiameseNetworkDataset(imageFolderDataset=folder_dataset_test,
                                       transform=transforms.Compose([transforms.Resize((100,100)),
                                                                    transforms.ToTensor()
                                                                    ]),
                                       should_invert=False)

test_dataloader = DataLoader(siamese_dataset_test,
                             num_workers=1,
                             batch_size=1,
                             shuffle=True)
dataiter = iter(test_dataloader)
x0,_,_ = next(dataiter)

for i in range(10):
    x0, x1, label2 = next(dataiter)
    concatenated = torch.cat((x0,x1),0)
    output1, output2 = net(Variable(x0), Variable(x1))     #Variable(x0).cuda()
    eu_distance = F.pairwise_distance(output1, output2)
    eu_dist = eu_distance.cpu().data.numpy()[0][0]
    if eu_dist <= threshold:
        imshow(torchvision.utils.make_grid(concatenated),
           'Dissimilarity:{:.2f}\nSame One!'.format(eu_dist))
    else:
        imshow(torchvision.utils.make_grid(concatenated),
           'Dissimilarity:{:.2f}\nDifferent Ones!'.format(eu_dist)) 

## 输入特定两幅人脸图片进行比对

In [None]:
def pil_loader(path):
    with open(path, 'rb') as f:
        with Image.open(f) as img:
            return img.convert('L')

def accimage_loader(path):
    import accimage
    try:
        return accimage.Image(path)
    except IOError:
        # Potentially a decoding problem, fall back to PIL.Image
        return pil_loader(path)

def default_loader(path):
    from torchvision import get_image_backend
    if get_image_backend() == 'accimage':
        return accimage_loader(path)
    else:
        return pil_loader(path)

In [None]:
class DataPair(Dataset):
    def __init__(self, path_a, path_b, data_transforms=None, loader = default_loader):
        self.path_a = path_a
        self.path_b = path_b  
        self.data_transforms = data_transforms
        self.loader = loader

    def __len__(self):
        return len(self.path_a)

    def __getitem__(self, item):
        img_a = self.loader(path_a)
        img_b = self.loader(path_b)
        
        if self.data_transforms is not None:
            try:
                img_a = self.data_transforms(img_a)
            except:
                print("Cannot transform image: {}".format(img_a))
            try:
                img_b = self.data_transforms(img_b)
            except:
                print("Cannot transform image: {}".format(img_s))

                
        return img_a, img_b

In [None]:
def SiameseSoftware(path_a, path_b):
    threshold = 0.135
    try_datasets = DataPair(path_a = path_a, 
                             path_b = path_b,
                             data_transforms=transforms.Compose([transforms.Resize((100,100)),
                                                                         transforms.ToTensor()
                                                                         ]),
                                 )
    try_dataloaders = DataLoader(try_datasets,
                        batch_size=1,
                        shuffle=False)
    dataiter = iter(try_dataloaders)
    x0,_ = next(dataiter)

    x0, x1 = next(dataiter)
    concatenated = torch.cat((x0,x1),0)
    output1, output2 = net(Variable(x0), Variable(x1))     #Variable(x0).cuda()
    eu_distance = F.pairwise_distance(output1, output2)
    eu_dist = eu_distance.cpu().data.numpy()[0][0]
    if eu_dist <= threshold:
        imshow(torchvision.utils.make_grid(concatenated),
           'Dissimilarity:{:.2f}\nSame One!'.format(eu_dist))
    else:
        imshow(torchvision.utils.make_grid(concatenated),
           'Dissimilarity:{:.2f}\nDifferent Ones!'.format(eu_dist))  

In [None]:
%%time
path_a = './Datasets/att/validation/s32/2.png'      #输入图片路径
path_b = './Datasets/att/validation/s32/7.png'
SiameseSoftware(path_a, path_b)