In [1]:
import numpy as np
import pandas as pd
import os,sys
import torch
from torchvision import models,transforms
import torch.nn as nn
import torch.nn.functional as F
from PIL import Image

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


In [2]:
class L2pooling(nn.Module):
    """Down-sampling layer that pools with a Hann-weighted L2 norm.

    The input is squared, filtered per channel with a fixed, normalised 2-D
    Hann window, and the square root of the result is returned.

    Args:
        filter_size: length passed to ``np.hanning``; the two end samples are
            dropped, so the actual kernel is ``filter_size - 2`` wide.
        stride: stride of the pooling convolution.
        channels: number of channels to pool; one copy of the window is stored
            per channel. Must be given (the default ``None`` cannot be used to
            build the filter).
        pad_off: accepted but not used by this implementation.
    """

    def __init__(self, filter_size=5, stride=2, channels=None, pad_off=0):
        super().__init__()
        self.channels = channels
        self.stride = stride
        self.padding = (filter_size - 2) // 2

        # 1-D Hann window without its end samples, expanded to a separable 2-D kernel.
        window = np.hanning(filter_size)[1:-1]
        kernel = torch.Tensor(np.outer(window, window))
        kernel = kernel / kernel.sum()  # weights sum to one

        # One (1, k, k) kernel per channel, laid out for a grouped convolution.
        per_channel = kernel.unsqueeze(0).unsqueeze(0).repeat((self.channels, 1, 1, 1))
        self.register_buffer('filter', per_channel)

    def forward(self, input):
        """Return sqrt(window * input**2), computed independently for each channel.

        The size of ``input`` along dim 1 is used as the group count, so it has
        to match the ``channels`` value the layer was built with.
        """
        squared = input.pow(2)
        pooled = F.conv2d(
            squared,
            self.filter,
            stride=self.stride,
            padding=self.padding,
            groups=squared.shape[1],
        )
        # Small offset keeps the square root (and its gradient) finite at zero.
        return torch.sqrt(pooled + 1e-12)


In [3]:
class DUALNETIQ(torch.nn.Module):
    """Full-reference image distance computed on frozen VGG19 and SqueezeNet 1.1 features.

    Both images are passed through two pretrained feature extractors whose
    pooling layers are swapped for ``L2pooling``. For the raw input and each of
    the seven feature stages, a per-channel mean similarity and a per-channel
    variance/covariance similarity are computed and combined with the
    ``kappa`` / ``xi`` channel weights loaded from disk.

    Args:
        load_weights: accepted for interface compatibility; not used by this
            implementation (pretrained backbones are always requested).
        kappa_path: file holding the weights for the mean-similarity term.
        xi_path: file holding the weights for the covariance-similarity term.

    Both weight files are reshaped to ``(1, C, 1, 1)`` and later split along
    dim 1 according to ``self.chns``, so ``C`` must equal ``sum(self.chns)``.
    """

    def __init__(self, load_weights=True, kappa_path='kappa.pt', xi_path='xi.pt'):
        super(DUALNETIQ, self).__init__()

        # Keep the channel weights as buffers instead of pinning them to 'cuda':
        # they then follow ``model.to(device)`` like ``mean``/``std`` do, and the
        # model can be built on CPU-only machines. ``persistent=False`` keeps
        # them out of ``state_dict`` so its keys are the same as before.
        self.register_buffer("kappa", self._load_channel_weights(kappa_path), persistent=False)
        self.register_buffer("xi", self._load_channel_weights(xi_path), persistent=False)

        # Load the pretrained VGG19 model
        vgg_pretrained_features = models.vgg19(pretrained=True).features

        # Load the pretrained SqueezeNet model
        model = models.squeezenet1_1(pretrained=True).features

        self.stage1 = torch.nn.Sequential()
        self.stage2 = torch.nn.Sequential()
        self.stage3 = torch.nn.Sequential()
        self.stage4 = torch.nn.Sequential()
        self.stage5 = torch.nn.Sequential()
        self.stage6 = torch.nn.Sequential()
        self.stage7 = torch.nn.Sequential()

        # VGG19 Stages
        # Indices 4, 9 and 18 of the VGG feature list are skipped and an
        # L2pooling layer is registered under the same name instead
        # (presumably these are the max-pool layers -- verify against torchvision).
        # stage1 covers layers 0-17; stage2 covers the pooling at 18 plus 19-26.
        for x in range(0,4):
            self.stage1.add_module(str(x), vgg_pretrained_features[x])
        self.stage1.add_module(str(4), L2pooling(channels=64))
        for x in range(5, 9):
            self.stage1.add_module(str(x), vgg_pretrained_features[x])
        self.stage1.add_module(str(9), L2pooling(channels=128))
        for x in range(10, 18):
            self.stage1.add_module(str(x), vgg_pretrained_features[x])
        self.stage2.add_module(str(18), L2pooling(channels=256))
        for x in range(19, 27):
            self.stage2.add_module(str(x), vgg_pretrained_features[x])

        # SqueezeNet Stages
        # Same scheme: indices 2, 5 and 8 are replaced by L2pooling layers.
        for x in range(0,2):
            self.stage3.add_module(str(x), model[x])
        self.stage4.add_module(str(2), L2pooling(channels=64))
        for x in range(3, 5):
            self.stage4.add_module(str(x), model[x])
        self.stage5.add_module(str(5), L2pooling(channels=128))
        for x in range(6, 8):
            self.stage5.add_module(str(x), model[x])
        self.stage6.add_module(str(8), L2pooling(channels=256))
        for x in range(9, 11):
            self.stage6.add_module(str(x), model[x])
        for x in range(11, 13):
            self.stage7.add_module(str(x), model[x])

        # The backbones are used as fixed feature extractors.
        for param in self.parameters():
            param.requires_grad = False

        # Per-channel normalisation constants applied to the input in forward_once.
        self.register_buffer("mean", torch.tensor([0.485, 0.456, 0.406]).view(1,-1,1,1))
        self.register_buffer("std", torch.tensor([0.229, 0.224, 0.225]).view(1,-1,1,1))

        # Expected channel count of each entry returned by forward_once:
        # raw input, stage1, stage2 (VGG), stage3 .. stage7 (SqueezeNet).
        self.chns = [3,256,512,64,128,256,384,512]

    @staticmethod
    def _load_channel_weights(path):
        """Load a weight tensor from ``path`` onto the CPU and reshape it to (1, C, 1, 1)."""
        # map_location='cpu' lets files saved from a GPU be read without CUDA.
        weights = torch.load(path, map_location='cpu')
        return weights.unsqueeze(0).unsqueeze(2).unsqueeze(3)

    def forward_once(self, x):
        """Return ``[x, h1, ..., h7]``: the raw input followed by the seven stage outputs.

        Both branches start from the same normalised input: stage1 -> stage2 is
        the VGG branch, stage3 -> ... -> stage7 is the SqueezeNet branch. Note
        that the first list entry is the un-normalised ``x``.
        """
        h_start = (x-self.mean)/self.std
        # VGG19 branch
        h1 = self.stage1(h_start)
        h2 = self.stage2(h1)
        # SqueezeNet branch restarts from the normalised input
        h3 = self.stage3(h_start)
        h4 = self.stage4(h3)
        h5 = self.stage5(h4)
        h6 = self.stage6(h5)
        h7 = self.stage7(h6)
        return [x, h1, h2, h3, h4, h5, h6, h7]

    def forward(self, x, y, require_grad=False, batch_average=False):
        """Compute the distance between image batches ``x`` and ``y``.

        Args:
            x, y: image batches of identical shape; dims 2 and 3 are treated as
                the spatial dims and dim 1 as channels.
            require_grad: when False (default) the feature extraction runs
                under ``torch.no_grad()``.
            batch_average: when True return the mean score over the batch.

        Returns:
            ``1 - (weighted mean similarity + weighted covariance similarity)``
            with all singleton dims squeezed away, or its mean when
            ``batch_average`` is set. Identical inputs give a score of 0 up to
            floating-point error.
        """
        if require_grad:
            feats0 = self.forward_once(x)
            feats1 = self.forward_once(y)
        else:
            with torch.no_grad():
                feats0 = self.forward_once(x)
                feats1 = self.forward_once(y)
        dist1 = 0
        dist2 = 0
        # Stabilisers that keep the ratios defined when the denominators vanish.
        c1 = 1e-6
        c2 = 1e-6
        # Normalise so that all kappa and xi weights together sum to one,
        # then cut them into one chunk per feature stage.
        w_sum = self.kappa.sum() + self.xi.sum()
        kappa = torch.split(self.kappa/w_sum, self.chns, dim=1)
        xi = torch.split(self.xi/w_sum, self.chns, dim=1)
        for k in range(len(self.chns)):
            # Similarity of the per-channel spatial means.
            x_mean = feats0[k].mean([2,3], keepdim=True)
            y_mean = feats1[k].mean([2,3], keepdim=True)
            S1 = (2*x_mean*y_mean+c1)/(x_mean**2+y_mean**2+c1)
            dist1 = dist1+(kappa[k]*S1).sum(1,keepdim=True)

            # Similarity built from per-channel variances and the cross-covariance.
            x_var = ((feats0[k]-x_mean)**2).mean([2,3], keepdim=True)
            y_var = ((feats1[k]-y_mean)**2).mean([2,3], keepdim=True)
            xy_cov = (feats0[k]*feats1[k]).mean([2,3],keepdim=True) - x_mean*y_mean
            S2 = (2*xy_cov+c2)/(x_var+y_var+c2)
            dist2 = dist2+(xi[k]*S2).sum(1,keepdim=True)

        score = 1 - (dist1+dist2).squeeze()
        if batch_average:
            return score.mean()
        else:
            return score

In [4]:
def prepare_image(image, resize=True):
    """Turn an image into a batched tensor, optionally resizing it to 256x256.

    Args:
        image: an image accepted by torchvision's ``Resize`` and ``ToTensor``
            transforms (the notebook passes RGB PIL images).
        resize: when True (default) the image is rescaled to 256x256 first;
            when False it keeps its original size. Previously this flag was
            overwritten inside the function and therefore had no effect.

    Returns:
        The ``ToTensor`` output with a leading batch dimension of size 1.
    """
    if resize:
        image = transforms.Resize((256, 256))(image)
    tensor = transforms.ToTensor()(image)
    return tensor.unsqueeze(0)

In [5]:
# Load the reference image and its blurred version as batched tensors.
image_paths = ('bridge.png', 'bridge.BLUR.5.png')
ref, dist = (prepare_image(Image.open(path).convert("RGB")) for path in image_paths)
assert ref.shape == dist.shape

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = DUALNETIQ().to(device)
ref, dist = ref.to(device), dist.to(device)

# Score the pair and show the scalar result.
score = model(ref, dist)
print(score.item())
# 0.3308



0.3308456540107727
