In [0]:
import torch
import torch.nn.functional as F 
import torch.nn as nn
from torch.autograd import Variable
import torchvision.models as models
from torchvision import transforms, utils
from torch.utils.data import Dataset, DataLoader
import numpy as np
import torch.optim as optim
from scipy.io import loadmat
from scipy.io import savemat
from pytorch_gdn import GDN

In [0]:
# network
# Device handed to the GDN layers of the network below. Fall back to the CPU
# when no GPU is present, in line with the torch.cuda.is_available() checks the
# training and test cells already perform.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
n_channel = 1                      # channel count of the (batch_size, channel, height, width) inputs
class FaceIrisQANet(nn.Module):
  """Regress one scalar score per sample from an iris feature map and a face feature row.

  The iris input is transposed, the face row is appended beneath it, and the
  stacked map goes through conv -> GDN -> batch-norm, then a linear projection
  of the last axis -> GDN -> batch-norm, and finally a linear layer over the
  flattened result.

  The attribute names ``layer1`` .. ``layer7`` are the keys of the saved
  state_dict, so they must stay as they are for existing checkpoints to load.
  GDN comes from the external ``pytorch_gdn`` package; it is assumed to keep
  the tensor shape unchanged -- TODO confirm against that package.
  """

  def __init__(self):
    super().__init__()
    # Stride (1, 2) halves only the last axis; kernel 3 with padding 1 keeps the height.
    self.layer1 = nn.Conv2d(
        in_channels=1, out_channels=1, kernel_size=3, stride=(1, 2), padding=1, bias=True)
    self.layer2 = GDN(n_channel, device)
    self.layer3 = nn.BatchNorm2d(num_features=1)
    # nn.Linear acts on the last axis only: 256 -> 16 for every row.
    self.layer4 = nn.Linear(in_features=256, out_features=16, bias=True)
    self.layer5 = GDN(n_channel, device)
    self.layer6 = nn.BatchNorm2d(num_features=1)
    # 1040 = 65 rows * 16 features once flattened.
    self.layer7 = nn.Linear(in_features=1040, out_features=1, bias=True)

  def forward(self, x_iris, x_face):
    """Return a (batch_size, 1) tensor of scores.

    Expected shapes: x_iris (batch_size, 1, 512, 64), x_face (batch_size, 1, 1, 512).
    """
    iris_rows = x_iris.permute(0, 1, 3, 2)            # (batch_size, 1, 64, 512)
    stacked = torch.cat((iris_rows, x_face), 2)       # (batch_size, 1, 65, 512)

    hidden = self.layer1(stacked)                     # (batch_size, 1, 65, 256)
    hidden = self.layer2(hidden)
    hidden = self.layer3(hidden)

    hidden = self.layer4(hidden)                      # (batch_size, 1, 65, 16)
    hidden = self.layer5(hidden)
    hidden = self.layer6(hidden)

    flat = hidden.view(hidden.size(0), -1)            # (batch_size, 1040)
    return self.layer7(flat)                          # (batch_size, 1)

In [0]:
# train set
def default_loader(path):
    """Load the MATLAB file at ``path`` and return the value stored under its 'output' key.

    Raises KeyError when the file has no 'output' variable.
    """
    mat_contents = loadmat(path)
    return mat_contents['output']

class MyDataset(Dataset):
  """Training set of paired feature files with a scalar label.

  Every non-blank line of the list file ``txt`` holds a feature file name
  followed by a numeric label, separated by whitespace. The same file name is
  looked up under 'IITD/IITD_train_feat/' and 'LFW/LFW_train_feat/'.

  Args:
    txt: path of the list file.
    transform: optional callable applied to each of the two loaded features.
    target_transform: optional callable applied to the label.
    loader: callable mapping a feature file path to the loaded feature.

  Raises:
    IndexError: a non-blank line has no label column.
    ValueError: the label column is not a valid float.
  """
  def __init__(self, txt, transform=None, target_transform=None, loader=default_loader):
    super(MyDataset,self).__init__()
    feats = []
    # The context manager closes the list file even when a line fails to parse.
    with open(txt, 'r') as fh:
      for line in fh:
        words = line.split()   # split() already discards the trailing newline
        if not words:
          continue             # tolerate blank lines, e.g. at the end of the file
        feats.append(('IITD/IITD_train_feat/'+words[0], 'LFW/LFW_train_feat/'+words[0], float(words[1])))
    self.feats = feats
    self.transform = transform
    self.target_transform = target_transform
    self.loader = loader
  def __getitem__(self, index):
    """Return (IITD feature, LFW feature, label) for the sample at ``index``."""
    f1, f2, label = self.feats[index]
    feat1 = self.loader(f1)
    feat2 = self.loader(f2)
    if self.transform is not None:
      feat1 = self.transform(feat1)
      feat2 = self.transform(feat2)
    # Honour target_transform rather than silently ignoring it.
    if self.target_transform is not None:
      label = self.target_transform(label)
    return feat1,feat2,label
  def __len__(self):
    """Number of samples listed in the list file."""
    return len(self.feats)

# Build the training set from train.txt; ToTensor() is applied to both loaded features.
train_data = MyDataset(
    txt='train.txt',
    transform=transforms.ToTensor(),
)
# Batches of 40, served in list-file order (shuffling is off).
train_loader = DataLoader(train_data, batch_size=40, shuffle=False)
print('num_of_trainData: {}'.format(len(train_data)))
print('num_of_train_loader: {}'.format(len(train_loader)))

In [0]:
# train
# Training run: L1 regression of the label from the two feature inputs, SGD with
# momentum, learning rate halved whenever the epoch loss plateaus.

# Choose the device once so the model and every batch are placed consistently;
# falls back to the CPU when CUDA is unavailable.
train_device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
checkpoint_path = 'FaceIrisQANet_params.pkl'

model = FaceIrisQANet().to(train_device)
# Resume from an earlier run when a checkpoint exists. On a fresh checkout there
# is none yet, so start from the random initialisation instead of crashing.
# map_location lets a checkpoint written on a GPU load on a CPU-only machine.
try:
  model.load_state_dict(torch.load(checkpoint_path, map_location=train_device))
except FileNotFoundError:
  print('no checkpoint at {}; training from scratch'.format(checkpoint_path))

criterion = nn.L1Loss()
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)
# `verbose` is left out: it is deprecated in recent PyTorch and False was the default anyway.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=0.5, patience=20, threshold=0.0001,
    threshold_mode='rel', cooldown=0, min_lr=0, eps=1e-08)

train_loss_min = float('inf')   # np.Inf no longer exists in NumPy >= 2.0
model.train()                   # make the BatchNorm mode explicit
for epoch in range(500):
  train_loss = 0.0
  for IrisFeat, FaceFeat, label in train_loader:
    IrisFeat = IrisFeat.to(train_device)
    FaceFeat = FaceFeat.to(train_device)
    # Labels arrive as a 1-D batch; the model emits (batch, 1) float outputs.
    target = torch.unsqueeze(label.to(train_device), 1).float()
    optimizer.zero_grad()
    out = model(IrisFeat, FaceFeat)
    loss = criterion(out, target)
    loss.backward()
    optimizer.step()
    # Weight the batch mean by the batch size so a short last batch is averaged correctly.
    train_loss += loss.item()*IrisFeat.size(0)
  train_loss = train_loss/len(train_loader.dataset)
  scheduler.step(train_loss)
  print('LR: {} Epoch: {} \tTraining Loss: {:.6f}'.format(optimizer.param_groups[0]['lr'], epoch, train_loss))
  # save model if Train loss has decreased
  if train_loss < train_loss_min:
    print('train loss decreased ({:.6f} --> {:.6f}).  Saving model ...'.format(train_loss_min, train_loss))
    torch.save(model.state_dict(), checkpoint_path)
    train_loss_min = train_loss
  # Stop early once the mean absolute error is below 0.001.
  if train_loss < 0.001:
    print('train loss has met the need')
    break

In [0]:
# test set
def default_loader(path):
    """Load the MATLAB file at ``path`` and return the value stored under its 'output' key.

    Raises KeyError when the file has no 'output' variable.
    """
    mat_contents = loadmat(path)
    return mat_contents['output']

class MyDataset(Dataset):
  """Test set of paired feature files, without labels.

  Every non-blank line of the list file ``txt`` starts with a feature file
  name; anything after the first whitespace-separated token is ignored. The
  same file name is looked up under 'IITD/IITD_test_feat/' and
  'LFW/LFW_test_feat/'.

  NOTE(review): this definition replaces the training-set ``MyDataset`` defined
  earlier in the notebook, so re-running the training data cell afterwards
  would pick up this label-free variant.

  Args:
    txt: path of the list file.
    transform: optional callable applied to each of the two loaded features.
    target_transform: stored for signature parity; unused because there is no label.
    loader: callable mapping a feature file path to the loaded feature.
  """
  def __init__(self, txt, transform=None, target_transform=None, loader=default_loader):
    super(MyDataset,self).__init__()
    feats = []
    # The context manager closes the list file even when a line fails to parse.
    with open(txt, 'r') as fh:
      for line in fh:
        words = line.split()   # split() already discards the trailing newline
        if not words:
          continue             # tolerate blank lines, e.g. at the end of the file
        feats.append(('IITD/IITD_test_feat/'+words[0], 'LFW/LFW_test_feat/'+words[0]))
    self.feats = feats
    self.transform = transform
    self.target_transform = target_transform
    self.loader = loader
  def __getitem__(self, index):
    """Return (IITD feature, LFW feature) for the sample at ``index``."""
    f1, f2 = self.feats[index]
    feat1 = self.loader(f1)
    feat2 = self.loader(f2)
    if self.transform is not None:
      feat1 = self.transform(feat1)
      feat2 = self.transform(feat2)
    return feat1, feat2
  def __len__(self):
    """Number of samples listed in the list file."""
    return len(self.feats)

# Build the test set from test.txt; ToTensor() is applied to both loaded features.
test_data = MyDataset(
    txt='test.txt',
    transform=transforms.ToTensor(),
)
# Batches of 40 in list-file order, so the saved scores line up with test.txt.
test_loader = DataLoader(test_data, batch_size=40, shuffle=False)
print('num_of_testData: {}'.format(len(test_data)))
print('num_of_test_loader: {}'.format(len(test_loader)))

In [0]:
# test
# Inference: score every test sample with the saved weights and write the
# scores to a .mat file as a single row.

# Fall back to the CPU when CUDA is unavailable; map_location lets a checkpoint
# written on a GPU load either way.
eval_device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = FaceIrisQANet().to(eval_device)
model.load_state_dict(torch.load('FaceIrisQANet_params.pkl', map_location=eval_device))
# Switch BatchNorm to its running statistics; in training mode each score
# would depend on which other samples share its batch.
model.eval()

score_batches = []
num_scored = 0
with torch.no_grad():   # no gradients needed for inference
  # The test dataset yields (feature, feature) pairs -- there is no label to unpack.
  for IrisFeat, FaceFeat in test_loader:
    out = model(IrisFeat.to(eval_device), FaceFeat.to(eval_device))
    # float64 keeps the dtype of the saved array the same as before this rewrite.
    score_batches.append(out.cpu().numpy().astype(np.float64))
    num_scored += out.size(0)
    print((num_scored, out.size(1)))

# Concatenate once at the end instead of growing an array batch by batch.
if score_batches:
  iris_score = np.concatenate(score_batches, axis=0)
else:
  iris_score = np.empty((0, 1), dtype=np.float64)
iris_score = iris_score.T   # one row, one column per test sample
savemat('IITD_LFW_test_quality_score.mat',{'iris_score':iris_score})