In [1]:
%matplotlib inline
import torchvision
import torchvision.datasets as dset
import torchvision.transforms as transforms
from torch.utils.data import DataLoader,Dataset
import matplotlib.pyplot as plt
import torchvision.utils
import numpy as np
import random
from PIL import Image
import torch
from torch.autograd import Variable
import PIL.ImageOps    
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
from pathlib import Path
import torch.nn.init as init

In [2]:
def imshow(img,text=None,should_save=False):
    npimg = img.numpy()
    plt.axis("off")
    if text:
        plt.text(75, 8, text, style='italic',fontweight='bold',
            bbox={'facecolor':'white', 'alpha':0.8, 'pad':10})
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()    

def show_plot(iteration,loss):
    plt.plot(iteration,loss)
    plt.show()

In [3]:
class SiameseNetwork(nn.Module):
    def __init__(self):
        super(SiameseNetwork, self).__init__()
        self.ground_truth = None
        self.labels = None 
        self.img_transform_pre = transforms.Compose([
            transforms.Resize((128, 128)),
            transforms.ToTensor()
        ])
        self.img_transform_post = transforms.Compose([
            transforms.ToPILImage(),
            transforms.Resize((128, 128)),
            transforms.ToTensor()
        ])
        self.cnn1 = nn.Sequential(
            nn.Conv2d(3, 6, kernel_size=5),
            nn.PReLU(),
            nn.AvgPool2d(2, stride=2),
            nn.Conv2d(6, 2, kernel_size=5),
            nn.PReLU(),
            nn.AvgPool2d(2, stride=2))

#         self.fc1 = nn.Sequential(
#             #nn.Linear(50 * 4 * 4, 500),
#             nn.Linear(2 * 61 * 61, 512),
#             nn.ReLU(inplace=True),
#             nn.Linear(512, 256),
#             nn.Linear(256, 128),
#             nn.Linear(128, 32),
#             nn.Linear(32, 2))
        self.f1 = nn.Linear(2 * 29 * 29, 1024)
        self.t = nn.PReLU()
        self.f2 = nn.Linear(1024, 256)
        self.f3 = nn.Linear(256, 64)
        self.f4 = nn.Linear(64, 16)
        self.f5 = nn.Linear(16, 4)
        self.f6 = nn.Linear(4, 2)
        
        init.xavier_uniform(self.f1.weight, gain=np.sqrt(2))
        init.xavier_uniform(self.f2.weight, gain=np.sqrt(2))
        init.xavier_uniform(self.f3.weight, gain=np.sqrt(2))
        init.xavier_uniform(self.f4.weight, gain=np.sqrt(2))
        init.xavier_uniform(self.f5.weight, gain=np.sqrt(2))
        init.xavier_uniform(self.f6.weight, gain=np.sqrt(2))

    def forward_once(self, x):
        output = self.cnn1(x)
        #print(output.shape)
        #exit()
        output = output.view(output.size()[0], -1)
        output = self.f1(output)
        self.t(output)
        output = self.f2(output)
        output = self.f3(output)
        output = self.f4(output)
        output = self.f5(output)
        output = self.f6(output)
        return output

    def forward(self, input1, input2):
        output1 = self.forward_once(input1)
        output2 = self.forward_once(input2)
        return output1, output2
    
    def regularize(self, lam=0.00055):
        loss = Variable(torch.zeros(1))
        for param in self.parameters():
            loss += torch.norm(param)
        loss *= lam
        return loss
    
    def process_image(self, img_array):
        return self.img_transform_post(img_array)
    
    def classify(self, current, omit=None):
        #current = Image.fromarray(current)
        current = self.img_transform_post(current)
        #current = current.unsqueeze(0)
        inputs1 = []
        inputs2 = []
        for i, t in enumerate(self.ground_truth):
            if omit is None or i != omit:
                inputs1.append(t.unsqueeze(0))
                inputs2.append(current.unsqueeze(0))
        inputs1 = Variable(torch.cat(inputs1))
        inputs2 = Variable(torch.cat(inputs2))
        output1, output2 = self(inputs1, inputs2)
        distances = F.pairwise_distance(output1, output2).data.numpy()
        i = 0
        for j, d in enumerate(distances):
            if distances[j] < distances[i]:
                i = j
        return distances, i, self.labels[i]
    
    
    
    
        


In [4]:
class ContrastiveLoss(torch.nn.Module):
    """
    Contrastive loss function.
    Based on: http://yann.lecun.com/exdb/publis/pdf/hadsell-chopra-lecun-06.pdf
    """

    def __init__(self, margin=2.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, output1, output2, label):
        euclidean_distance = F.pairwise_distance(output1, output2)
        #print(euclidean_distance)
        #print(label)
        d = torch.pow(euclidean_distance, 2)
        e =  (1-label)
        part1 = 0.5 * d * e.float()
        part2 = 0.5 * label.float() * torch.pow(torch.clamp(self.margin - euclidean_distance, min=0.0), 2)
        
        loss_contrastive = torch.mean(part1 + part2)

        return loss_contrastive

In [26]:
fishes = ['lionfish', 'bluecrab']
training_num = [1,2,3,4,5,6,7,8]
testing_num = [9,10,11,12]
path = Path('../data')

In [6]:
transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor()
])
raw_data = []
labels = []
for fish in fishes:
    for num in training_num:
        input_path = path / f'{fish}_{num}.jpg'
        img = Image.open(input_path)
        arr = transform(img)
#         arr = np.array(img)
# #         arr = torch.FloatTensor(arr)
# #         arr = arr.unsqueeze(0)
        raw_data.append(arr)
        labels.append(fish)

In [7]:
net = SiameseNetwork()
criterion = ContrastiveLoss(margin=7.0)
optimizer = optim.Adagrad(net.parameters(),lr = 0.0005 )
print(net.regularize())

Variable containing:
1.00000e-02 *
  6.0818
[torch.FloatTensor of size 1]



In [8]:
net.ground_truth = raw_data
net.labels = labels

In [9]:

net.classify(torch.FloatTensor(Variable(raw_data[0]).data.numpy()))

(array([[  1.41421356e-06],
        [  1.54228121e-01],
        [  2.20717594e-01],
        [  2.52739400e-01],
        [  1.88162044e-01],
        [  4.02691633e-01],
        [  2.40279555e-01],
        [  2.40279555e-01],
        [  2.41650701e-01],
        [  3.56941849e-01],
        [  2.41347313e-01],
        [  1.02442428e-01],
        [  3.15337449e-01],
        [  2.70264030e-01],
        [  3.01466614e-01],
        [  3.56941938e-01]], dtype=float32), 0, 'lionfish')

In [10]:
x1 = []
x2 = []
targets = []
count = 0
for idx1, (im1, l1) in enumerate(zip(raw_data, labels)):
    i = im1.unsqueeze(0)
    for idx2, (im2, l2) in enumerate(zip(raw_data, labels)):
        if idx1 != idx2:
            j = im2.unsqueeze(0)
            target = 0 if l1 == l2 else 1
            targets.append(target)
            x1.append(i)
            x2.append(j)
            count += 1

targets = torch.LongTensor(targets)
x1 = torch.cat(x1)
x2 = torch.cat(x2)
print(count)
        

240


In [40]:
num_epochs = 50
for epoch in range(num_epochs):
    optimizer.zero_grad()
    input1 = Variable(x1)
    input2 = Variable(x2)
    correct = Variable(targets)
    output1, output2 = net(input1, input2)
    loss = criterion(output1, output2, correct)
    loss += net.regularize()
    print(f'Loss at {epoch + 1} is {loss.data}')
    loss.backward()
    optimizer.step()

Loss at 1 is 
 7.1286
[torch.FloatTensor of size 1]

Loss at 2 is 
 7.1286
[torch.FloatTensor of size 1]

Loss at 3 is 
 7.1285
[torch.FloatTensor of size 1]

Loss at 4 is 
 7.1284
[torch.FloatTensor of size 1]

Loss at 5 is 
 7.1284
[torch.FloatTensor of size 1]

Loss at 6 is 
 7.1283
[torch.FloatTensor of size 1]

Loss at 7 is 
 7.1283
[torch.FloatTensor of size 1]

Loss at 8 is 
 7.1282
[torch.FloatTensor of size 1]

Loss at 9 is 
 7.1282
[torch.FloatTensor of size 1]

Loss at 10 is 
 7.1281
[torch.FloatTensor of size 1]

Loss at 11 is 
 7.1281
[torch.FloatTensor of size 1]

Loss at 12 is 
 7.1280
[torch.FloatTensor of size 1]

Loss at 13 is 
 7.1280
[torch.FloatTensor of size 1]

Loss at 14 is 
 7.1279
[torch.FloatTensor of size 1]

Loss at 15 is 
 7.1279
[torch.FloatTensor of size 1]

Loss at 16 is 
 7.1279
[torch.FloatTensor of size 1]

Loss at 17 is 
 7.1278
[torch.FloatTensor of size 1]

Loss at 18 is 
 7.1278
[torch.FloatTensor of size 1]

Loss at 19 is 
 7.1278
[torch.FloatTe

In [41]:
transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor()
])
raw_data1 = []
labels1 = []
for fish in fishes:
    for num in testing_num:
        input_path = path / f'{fish}_{num}.jpg'
        img = Image.open(input_path)
        arr = transform(img)
#         arr = np.array(img)
# #         arr = torch.FloatTensor(arr)
# #         arr = arr.unsqueeze(0)
        raw_data1.append(arr)
        labels1.append(fish)

In [42]:
im1 = Variable(raw_data[4]).unsqueeze(0)

In [43]:
im2 = Variable(raw_data1[1]).unsqueeze(0)

In [44]:
x, y = net(im1, im2)

In [45]:
F.pairwise_distance(x, y)

Variable containing:
 3.1870
[torch.FloatTensor of size 1x1]

In [46]:
labels1

['lionfish',
 'lionfish',
 'lionfish',
 'lionfish',
 'bluecrab',
 'bluecrab',
 'bluecrab',
 'bluecrab']

In [47]:
labels

['lionfish',
 'lionfish',
 'lionfish',
 'lionfish',
 'lionfish',
 'lionfish',
 'lionfish',
 'lionfish',
 'bluecrab',
 'bluecrab',
 'bluecrab',
 'bluecrab',
 'bluecrab',
 'bluecrab',
 'bluecrab',
 'bluecrab']

In [48]:
def classify(ground_truth, labels, net, current, omit=None):
    inputs1 = []
    inputs2 = []
    for i, t in enumerate(ground_truth):
        if omit is None or i != omit:
            inputs1.append(t.unsqueeze(0))
            inputs2.append(current.unsqueeze(0))
    inputs1 = Variable(torch.cat(inputs1))
    inputs2 = Variable(torch.cat(inputs2))
    output1, output2 = net(inputs1, inputs2)
    distances = F.pairwise_distance(output1, output2).data.numpy()
    i = 0
    for j, d in enumerate(distances):
        if distances[j] < distances[i]:
            i = j
    return distances, i, labels[i]
        
        
    

In [49]:
for idx, l in enumerate(labels1):
    a, b, c = classify(raw_data, labels, net, raw_data1[idx])
    print(f'{c} - {l}')

bluecrab - lionfish
lionfish - lionfish
bluecrab - lionfish
lionfish - lionfish
bluecrab - bluecrab
bluecrab - bluecrab
bluecrab - bluecrab
bluecrab - bluecrab


In [50]:
for idx, l in enumerate(labels):
    a, b, c = classify(raw_data, labels, net, raw_data[idx], idx)
    print(f'{c} - {l}')

lionfish - lionfish
lionfish - lionfish
lionfish - lionfish
lionfish - lionfish
lionfish - lionfish
bluecrab - lionfish
lionfish - lionfish
lionfish - lionfish
bluecrab - bluecrab
bluecrab - bluecrab
lionfish - bluecrab
lionfish - bluecrab
bluecrab - bluecrab
bluecrab - bluecrab
lionfish - bluecrab
bluecrab - bluecrab


In [61]:
net.classify(raw_data1[1])

(array([[ 2.54667044],
        [ 1.0463438 ],
        [ 1.78616011],
        [ 1.79840863],
        [ 3.18704271],
        [ 2.51923275],
        [ 0.99314064],
        [ 0.99314064],
        [ 3.17802095],
        [ 3.84716439],
        [ 1.83481371],
        [ 2.21836829],
        [ 4.03227758],
        [ 3.70564222],
        [ 3.70245957],
        [ 3.84716392]], dtype=float32), 6, 'lionfish')

In [54]:
labels1

['lionfish',
 'lionfish',
 'lionfish',
 'lionfish',
 'bluecrab',
 'bluecrab',
 'bluecrab',
 'bluecrab']

In [62]:
torch.save(net, 'recognizer.pkl')

  "type " + obj.__name__ + ". It won't be checked "
