In [1]:
import os
import random
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
from PIL import Image
import PIL.ImageOps

import torch
import torch.nn as nn
import torchvision.datasets as datasets
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.autograd import Variable
import torchvision
import torchvision.transforms as transforms
from torchvision.transforms import ToTensor
import torch.optim as optim

In [2]:
def show_img(img, text = None):
    img = img.numpy()
    plt.axis("off")
    if text != None:
        plt.text(75, 8, text, style = "italic", fontweight = "bold",
                bbox = {"facecolor": "white", "alpha": 0.8, "pad": 10})
    plt.imshow(np.transpose(img, (1, 2, 0)))
    plt.show()

def plot(iteration, loss):
    plt.plot(iteration, loss)
    plt.show()

### Training 부분
1. SiameseNetworkDataset: 2개의 input을 준다. 하나는 Anchor에 해당, 다른 하나는 positive class의 이미지 혹은 negative class의 이미지
2. DataLoader 설정 및 학습 진행.

In [3]:
class SiameseNetworkDataset(Dataset):
    def __init__(self,imageFolderDataset,transform=None):
        self.imageFolderDataset = imageFolderDataset    
        self.transform = transform
        
    def __getitem__(self,index):
        img0_tuple = random.choice(self.imageFolderDataset.imgs)

        #We need to approximately 50% of images to be in the same class
        should_get_same_class = random.randint(0,1)
        # 같은 class인 경우
        if should_get_same_class:
            while True:
                # 같은 class의 이미지가 발견될 때까지 반복한다.
                img1_tuple = random.choice(self.imageFolderDataset.imgs) 
                if img0_tuple[1] == img1_tuple[1]:
                    break
        else:

            while True:
                # 다른 클래스의 image를 찾을 때 까지 반복한다
                img1_tuple = random.choice(self.imageFolderDataset.imgs) 
                if img0_tuple[1] != img1_tuple[1]:
                    break
        # image를 GrayScle로 변환
        img0 = Image.open(img0_tuple[0]).convert("L")
        img1 = Image.open(img1_tuple[0]).convert("L")

        if self.transform is not None:
            img0 = self.transform(img0)
            img1 = self.transform(img1)
        
        # images, labels.
        return img0, img1, torch.from_numpy(np.array([int(img1_tuple[1] != img0_tuple[1])], dtype=np.float32))
    
    def __len__(self):
        return len(self.imageFolderDataset.imgs)

In [None]:
# folder 별로 labeling이 된다.
# tuple 형태로 load가 된다. 0번 index는 image 정보, 1번 index는 label에 해당한다.
# siamese_dataset: img0: anchor image, img1: 비교 image, tuple의 마지막: tensor([0.]) or tensor([1.]) -> 같으면 0, 다르면 1
folder_dataset = datasets.ImageFolder(root = "./PubFig/test")
transformation = transforms.Compose([transforms.Resize([100, 100]),
                                    transforms.ToTensor()])
siamese_dataset = SiameseNetworkDataset(imageFolderDataset = folder_dataset,
                                       transform = transformation)

In [None]:
# 간단한 dataloader 생성(visualization까지 진행)
# 이 부분은 학습에 직접적으로 이용되지는 않는다.
dataloader = DataLoader(siamese_dataset, shuffle = True, num_workers = 0, batch_size = 2)
example_batch = next(iter(dataloader))
concat_data = torch.cat((example_batch[0], example_batch[1]), 0)
show_img(torchvision.utils.make_grid(concat_data))
print(example_batch[2].numpy().reshape(-1))

In [None]:
# loss function의 경우는 Contrastive Loss를 이용
# loss나 모델 자체는 굉장히 단순하다. load를 시켰을 때 가벼운 모델이면 좋겠다.
class SiameseNet(nn.Module):
    def __init__(self):
        super(SiameseNet, self).__init__()
        # convolution 1
        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=96, stride=4, kernel_size=11),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(3, stride=2)
        )
        # convolution 2
        self.conv2 = nn.Sequential(
            nn.Conv2d(in_channels=96, out_channels=256, stride=1, kernel_size=5),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, stride=2)
        )
        # convolution 3
        self.conv3 = nn.Sequential(
            nn.Conv2d(in_channels=256, out_channels=384, stride=1, kernel_size=3),
            nn.ReLU(inplace=True)
        )
        # fully connected
        self.fc = nn.Sequential(
            nn.Linear(384, 1024),
            nn.ReLU(inplace=True),
            nn.Linear(1024, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, 2)
        )
        # he initalization
        torch.nn.init.kaiming_normal_(self.fc[0].weight)
        torch.nn.init.kaiming_normal_(self.fc[2].weight)
        torch.nn.init.kaiming_normal_(self.fc[4].weight)

    def forward_one(self, x):
        output = self.conv1(x)
        output = self.conv2(output)
        output = self.conv3(output)
        # flatten
        output = output.view(output.size()[0], -1)
        output = self.fc(output)
        return output
    
    def forward(self, input1, input2):
        # fully connected를 통과한 최종 vector가 얻어진다.
        # 추후에 이것이 괜찮으면 최종 결과를 저장하는 방식으로 진행한다.
        output1 = self.forward_one(input1)
        output2 = self.forward_one(input2)
        return output1, output2

In [None]:
# Contrastive Loss를 이용한다.
class ContrastiveLoss(nn.Module):
    def __init__(self, margin = 1.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin
    
    def forward(self, output1, output2, label):
        euclidean = F.pairwise_distance(output1, output2, keepdim = True)
        contrastive = torch.mean((1-label) * torch.pow(euclidean, 2) +
                                (label) * torch.pow(torch.clamp(self.margin - euclidean, min=0.0), 2))
        return contrastive

In [None]:
train_dataloader = DataLoader(siamese_dataset, shuffle = True, batch_size = 1)

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# model = SiameseNet().to(device)
criterion = ContrastiveLoss()
optimizer = optim.AdamW(model.parameters(), lr = 1e-3)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode = "min", factor = 0.5, threshold_mode = "abs", min_lr = 1e-7, verbose = True, patience = 10)

In [None]:
model.to(device)

In [None]:
losses = []
iteration_num = 0
count = []
best_model = None
bestloss = 1
loss_check = []
for epoch in range(200):
    # i는 iteration number에 해당한다.
    model.train()
    for i, (img0, img1, label) in tqdm(enumerate(train_dataloader, 0)):
        img0, img1, label = img0.to(device), img1.to(device), label.to(device)
        optimizer.zero_grad()
        out1, out2 = model(img0, img1)
        # label의 경우는 positive면 1, negative면 0
        loss_ = criterion(out1, out2, label)
        loss_check.append(loss_.item())
        if loss_ < bestloss:
            bestloss = loss_
            best_model = model
        loss_.backward()
        optimizer.step()
        if i%10 == 0:
            iteration_num += 10
            count.append(iteration_num)
            losses.append(loss_.item())
            print("epoch {}th, Current iteration Loss: {}".format(epoch, loss_.item()))
    check_loss = np.mean(loss_check)
    scheduler.step(check_loss)
plot(count, losses)

In [None]:
model = torch.load("./rough.pt")

In [None]:
model

In [None]:
folder_test = datasets.ImageFolder(root = "./PubFig/test/")
testset = SiameseNetworkDataset(imageFolderDataset=folder_test, transform=transformation)
test_loader = DataLoader(testset, num_workers = 0, batch_size = 1, shuffle = False)
dataiter = iter(test_loader)

In [None]:
with torch.no_grad():
    model.cpu()
    model.eval()
    x0, _, _ = next(dataiter)
    for i in range(5):
        _, x1, label = next(dataiter)
        concatenated = torch.cat((x0, x1), 0)
        out1, out2 = model(x0, x1)
        euc = F.pairwise_distance(out1, out2)
        show_img(torchvision.utils.make_grid(concatenated), f"Dissimilarity:{euc.item()}")

In [None]:
torch.save(model, "./rough.pt")

In [None]:
torch.save(best_model, "./best_model.pt")

### Sample을 통한 결과치 명시
여기서부터는 예시를 보여줄 예정.(실행 시켜보고 싶으면 실행 시켜본다.)
1. x = torch.randn([batch_size, channel, width, height])
2. x라는 임의의 tensor 생성
3. 형태는 1 * 2 matrix 생성(fully connected 통과 후)
4. tensor([[-0.3888,  0.3707]])와 같은 결과가 나오게 된다.

In [None]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        # convolution 1
        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=96, stride=4, kernel_size=11),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(3, stride=2)
        )
        # convolution 2
        self.conv2 = nn.Sequential(
            nn.Conv2d(in_channels=96, out_channels=256, stride=1, kernel_size=5),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, stride=2)
        )
        # convolution 3
        self.conv3 = nn.Sequential(
            nn.Conv2d(in_channels=256, out_channels=384, stride=1, kernel_size=3),
            nn.ReLU(inplace=True)
        )
        # fully connected
        self.fc = nn.Sequential(
            nn.Linear(384, 1024),
            nn.ReLU(inplace=True),
            nn.Linear(1024, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, 2)
        )
        # he initalization
        torch.nn.init.kaiming_normal_(self.fc[0].weight)
        torch.nn.init.kaiming_normal_(self.fc[2].weight)
        torch.nn.init.kaiming_normal_(self.fc[4].weight)

    def forward(self, x):
        out = self.conv1(x)
        out = self.conv2(out)
        out = self.conv3(out)
        out = out.view(out.size()[0], -1)
        out = self.fc(out)
        return out

In [None]:
# 실행 시켜보면 알겠지만 1 * 2 tensor가 결과로 출력된다.
x = torch.randn([1, 1, 100, 100])
net = Net()
net = Net()
with torch.no_grad():
    net.eval()
    print(net(x))

### 하나의 image에 대한 신경망 통과 최종 결과를 뽑기 위한 과정
이 부분 부터 사실상 시작하면 된다. 
1. SiameseNetworkDatasetInference로 Data를 우선 구성
2. dataloader의 batch size는 1개로 진행
3. 우선 학습 자체는 2개의 input으로 진행했다. 따라서 2개의 input을 주되 out을 1개만 받는 쪽으로 가닥을 잡는다.
4. 데이터셋 구성 역시 이에 따라 다르게 진행한다. 1개는 진짜 image, 다른 하나는 random 한 것.
4. 그리고 euclidean 거리를 통해서 유사도 비교를 진행한다.

In [4]:
# model load.
# 학습 때와는 달리 1개의 output만 낸다.
class SiameseNet(nn.Module):
    def __init__(self):
        super(SiameseNet, self).__init__()
        # convolution 1
        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=96, stride=4, kernel_size=11),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(3, stride=2)
        )
        # convolution 2
        self.conv2 = nn.Sequential(
            nn.Conv2d(in_channels=96, out_channels=256, stride=1, kernel_size=5),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, stride=2)
        )
        # convolution 3
        self.conv3 = nn.Sequential(
            nn.Conv2d(in_channels=256, out_channels=384, stride=1, kernel_size=3),
            nn.ReLU(inplace=True)
        )
        # fully connected
        self.fc = nn.Sequential(
            nn.Linear(384, 1024),
            nn.ReLU(inplace=True),
            nn.Linear(1024, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, 2)
        )
        # he initalization
        torch.nn.init.kaiming_normal_(self.fc[0].weight)
        torch.nn.init.kaiming_normal_(self.fc[2].weight)
        torch.nn.init.kaiming_normal_(self.fc[4].weight)

    def forward_one(self, x):
        output = self.conv1(x)
        output = self.conv2(output)
        output = self.conv3(output)
        # flatten
        output = output.view(output.size()[0], -1)
        output = self.fc(output)
        return output
    
    def forward(self, input1, input2):
        # output을 1개만 받는다.
        output1 = self.forward_one(input1)
        output2 = self.forward_one(input2)
        return output1

In [5]:
model = torch.load("./rough.pt")

In [14]:
# 추론을 위한 dataset 구성
class SiameseNetworkDatasetInference(Dataset):
    def __init__(self,imageFolderDataset,transform=None):
        self.imageFolderDataset = imageFolderDataset    
        self.transform = transform
        
    def __getitem__(self,index):
        # self.imageFolderDataset.imgs -> tuple 형태: image file 경로, label 정보
        img0_ = self.imageFolderDataset.imgs
        img0_ = img0_[index]
        img0 = Image.open(img0_[0]).convert("L")

        if self.transform is not None:
            img0 = self.transform(img0)
        
        # 기본적으로 SiameseNetwork를 학습 시킬 때, 2개의 image를 input으로 받아 학습을 돌린다.
        # 학습 외적으로 최종 output을 뽑고 싶으면 임의의 tensor를 생성한 후 같이 return을 시키고 따로 사용하지 않는다.
        random_tensor = torch.randn([1, 100, 100])
        return img0, random_tensor
    
    def __len__(self):
        return len(self.imageFolderDataset.imgs)

# root에 test 시켜볼 파일 넣을 것.
# label 별로 이미지가 들어 있기만 하면 된다.
folder_dataset = datasets.ImageFolder(root = "./PubFig/test")
transformation = transforms.Compose([transforms.Resize([100, 100]),
                                    transforms.ToTensor()])
infer_dataset = SiameseNetworkDatasetInference(imageFolderDataset = folder_dataset,
                                       transform = transformation)

In [15]:
# 확인 결과 folder_dataset의 순서와 infer_dataset의 순서가 같다.(shuffle = False로 둠)
infer_dataloader = DataLoader(infer_dataset, shuffle = False, batch_size = 1)
dataiter = iter(infer_dataloader)

In [16]:
# add_ftr에는 이미지 별로 뽑아낸 신경망 통과한 결과가 들어가게 된다.
add_ftr = []
for i in range(len(dataiter)):
    with torch.no_grad():
        model.cpu()
        model.eval()
        # input으로 x0의 경우는 GrayScale 사진, x1은 임의의 Tensor
        x0, x1 = next(dataiter)
        add_ftr.append(model(x0, x1))

In [17]:
# 각 이미지 별 거리 기반 전
add_ftr

[tensor([[ 1.3045, -0.1385]]),
 tensor([[ 1.3343, -0.1551]]),
 tensor([[0.5642, 0.5259]]),
 tensor([[0.5074, 0.5806]]),
 tensor([[-0.2092,  1.0821]]),
 tensor([[-0.2925,  0.9744]]),
 tensor([[0.4742, 0.3726]]),
 tensor([[0.4515, 0.3274]])]

In [22]:
# 이 부분은 준비한 test set에 따라 달라진다.
# 기본적으로 pariwise_distance(anchor, compare)
for i in range(8):
    # 앞에는 anchor, 뒤에는 비교 이미지
    print(F.pairwise_distance(add_ftr[6], add_ftr[i]))

tensor([0.9750])
tensor([1.0091])
tensor([0.1777])
tensor([0.2105])
tensor([0.9851])
tensor([0.9746])
tensor([1.4142e-06])
tensor([0.0507])
