In [1]:
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from model_structure import *

from PIL import Image
import numpy as np
import cv2
import pandas as pd
from tqdm import tqdm
from collections import Counter
import random
import matplotlib.pyplot as plt
%matplotlib inline

from sklearn.model_selection import train_test_split

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Run on the GPU when PyTorch can see one; otherwise stay on the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
device

'cpu'

In [3]:
# Read the training table and hold out 20% of its rows as `que`; the other 80%
# is kept as `db`. The fixed random_state makes the split repeatable.
data = pd.read_csv('data/train.csv')
db, que = train_test_split(data, train_size=0.8, random_state=42)

In [4]:
# Split `db` 80/20 again: `train` feeds the training pairs, `test` the evaluation pairs.
train, test = train_test_split(db, train_size=0.8, random_state=42)

In [5]:
# Group the picture ids of the training rows by label: {label: [id, id, ...]}.
# Each row is unpacked as (idx, label), i.e. the frame is assumed to have exactly
# these two columns in this order.
get_pictures_train = {}
for _, (idx, label) in train.iterrows():
    get_pictures_train.setdefault(label, []).append(idx)

# Same grouping for the evaluation rows.
get_pictures_test = {}
for _, (idx, label) in test.iterrows():
    get_pictures_test.setdefault(label, []).append(idx)

In [6]:
# Drop the label keys and keep only the id groups (one inner list per label),
# in the dictionaries' insertion order.
pictures_train = list(get_pictures_train.values())

pictures_test = list(get_pictures_test.values())

In [7]:
# Side length (pixels) every picture is resized to before it is turned into a tensor.
img_size = 256

In [8]:
# Preprocessing applied to each resized picture: conversion to a torch tensor only.
transform = transforms.Compose([
    transforms.ToTensor()
])

class DoubledPicturesDataset(Dataset):
    """Dataset of picture pairs labelled 1 (same group) or 0 (different groups).

    Every ordered pair of picture ids is enumerated: two ids taken from the
    same inner list of ``pictures`` form a positive pair, ids taken from two
    different inner lists form a negative pair. Optionally both sets are
    subsampled so that positives make up about ``positive_pairs_ratio`` of
    the dataset.

    NOTE(review): the positives include every picture paired with itself and
    both orderings of each pair. This is kept unchanged so the sampled pairs
    stay the same - confirm whether trivial self-pairs are wanted.

    Args:
        pictures: list of lists of picture ids, one inner list per group.
        positive_pairs_ratio: target fraction of positive pairs, strictly
            between 0 and 1, or ``None`` to keep every enumerated pair.
        base_dir: prefix prepended to ``"<id>.png"`` to build an image path.

    Raises:
        ValueError: if ``positive_pairs_ratio`` is neither ``None`` nor
            strictly between 0 and 1 (it is used as a divisor).
    """

    def __init__(self, pictures, positive_pairs_ratio=0.25, base_dir = 'data/train/'):
        if positive_pairs_ratio is not None and not 0 < positive_pairs_ratio < 1:
            raise ValueError(
                'positive_pairs_ratio must be None or strictly between 0 and 1, '
                f'got {positive_pairs_ratio!r}'
            )
        self.base_dir = base_dir
        positive = []
        negative = []
        for i in tqdm(range(len(pictures))):
            for j in range(len(pictures)):
                # Same group -> positive pairs, different groups -> negative pairs.
                bucket = positive if i == j else negative
                bucket.extend((i_id, j_id) for i_id in pictures[i] for j_id in pictures[j])
        if positive_pairs_ratio is not None:
            # Largest total size for which both classes can still be filled
            # at the requested ratio.
            self.len = min(len(positive) // positive_pairs_ratio, len(negative) // (1 - positive_pairs_ratio))
            positive_len = int(self.len * positive_pairs_ratio)
            negative_len = int(self.len * (1 - positive_pairs_ratio))
            # A private generator gives the same seeded shuffle as seeding the
            # global `random` module would, without resetting global RNG state.
            rng = random.Random(42)
            rng.shuffle(positive)
            rng.shuffle(negative)
            positive = positive[:positive_len]
            negative = negative[:negative_len]
        self.positive_len = len(positive)
        self.negative_len = len(negative)
        self.pictures = positive + negative
        self.labels = [1] * len(positive) + [0] * len(negative)
        self.len = len(self.labels)

    def __len__(self):
        """Return the number of pairs in the dataset."""
        return self.len

    def _load_picture(self, picture_id):
        """Read ``<base_dir><picture_id>.png`` as RGB, resize it and convert it to a tensor.

        Raises:
            FileNotFoundError: if OpenCV cannot read the file.
        """
        path = f'{self.base_dir}{picture_id}.png'
        image = cv2.imread(path)
        # cv2.imread signals a missing/unreadable file by returning None
        # instead of raising, so fail here with the offending path.
        if image is None:
            raise FileNotFoundError(f'Could not read image: {path}')
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = cv2.resize(image, (img_size, img_size))
        return transform(image)

    def __getitem__(self, idx):
        """Return the ``idx``-th pair as a dict with both pictures and the pair label."""
        first_id, second_id = self.pictures[idx][0], self.pictures[idx][1]
        return {
            'picture_1': self._load_picture(first_id),
            'picture_2': self._load_picture(second_id),
            'labels': self.labels[idx]
        }

In [185]:
# Build the pair datasets from the grouped picture ids, using the dataset's
# default positive_pairs_ratio of 0.25.
train_dataset = DoubledPicturesDataset(pictures_train)
test_dataset = DoubledPicturesDataset(pictures_test)

# Only the training pairs are reshuffled; the evaluation order stays fixed.
train_dataloader = DataLoader(train_dataset, batch_size=128, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=128, shuffle=False)

100%|█████████████████████████████████████| 2465/2465 [00:01<00:00, 1301.93it/s]
100%|███████████████████████████████████████| 693/693 [00:00<00:00, 5003.89it/s]


In [186]:
class DebugLayer(nn.Module):
    """Pass-through layer that prints the shape of whatever flows through it."""

    def __init__(self):
        # nn.Module sets up its hook/parameter registries in __init__; without
        # this call the layer raises AttributeError as soon as it is invoked.
        super().__init__()

    def forward(self, x):
        """Print ``x.shape`` and return ``x`` unchanged."""
        print(x.shape)
        return x
    
class Model(nn.Module):
    """Two-branch CNN that scores a pair of pictures with a value in [0, 1].

    Each picture goes through its own convolutional stack (``seq_1a`` for the
    first, ``seq_1b`` for the second; the weights are not shared). The two
    16-channel feature maps are concatenated along the channel axis, reduced
    by ``seq_2``, flattened and passed through the fully connected head
    ``seq_3``. A sigmoid is applied to the single output unit.

    Args:
        img_size: side length of the square input pictures. The default of
            256 yields the 8 * 23 * 23 flattened features of the original
            hard-coded architecture.

    Raises:
        ValueError: if ``img_size`` is too small for the convolution stack.
    """

    # (kernel, stride) of every convolution a picture passes through, in order.
    _CONV_SPECS = ((3, 2), (3, 2), (3, 2), (5, 1), (5, 1))

    def __init__(self, img_size=256):
        super().__init__()
        # Submodule names and creation order are kept so state_dict keys and
        # seeded initialisation match the original definition.
        self.seq_1a = self._make_branch()
        self.seq_1b = self._make_branch()
        self.seq_2 = nn.Sequential(
            nn.Conv2d(32, 16, 5),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.Conv2d(16, 8, 5),
            nn.BatchNorm2d(8),
            nn.ReLU()
        )
        side = self._output_side(img_size)
        self.flat_features = 8 * side * side
        self.seq_3 = nn.Sequential(
            nn.Linear(self.flat_features, 1024),
            nn.ReLU(),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Linear(512, 128),
            nn.ReLU(),
            nn.Linear(128, 16),
            nn.ReLU(),
            nn.Linear(16, 1)
        )

    @staticmethod
    def _make_branch():
        """Build the per-picture stack of three stride-2 3x3 convolutions."""
        return nn.Sequential(
            nn.Conv2d(3, 16, 3, 2),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.Conv2d(16, 64, 3, 2),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.Conv2d(64, 16, 3, 2)
        )

    @classmethod
    def _output_side(cls, img_size):
        """Return the spatial side length left after all (unpadded) convolutions."""
        side = img_size
        for kernel, stride in cls._CONV_SPECS:
            if side < kernel:
                raise ValueError(f'img_size={img_size} is too small for this architecture')
            side = (side - kernel) // stride + 1
        return side

    def forward(self, features):
        """Score a batch of picture pairs.

        Args:
            features: pair ``(picture_1, picture_2)`` of image batches.

        Returns:
            Tensor with one sigmoid-activated score per pair (last dim is 1).
        """
        picture_1, picture_2 = features

        embedded_1 = self.seq_1a(picture_1)
        embedded_2 = self.seq_1b(picture_2)

        # Stack both feature maps along the channel axis (16 + 16 = 32).
        x = torch.cat((embedded_1, embedded_2), 1)
        x = self.seq_2(x)
        # Flatten everything but the batch axis, so a wrong input size fails
        # loudly in the first Linear layer instead of being folded into the batch.
        x = torch.flatten(x, 1)

        x = self.seq_3(x)
        return torch.sigmoid(x)

In [187]:
# The model's forward pass already ends in a sigmoid, so it is paired with BCELoss.
model = Model().to(device)
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=3e-4, betas=(0.5, 0.999))

In [188]:
def fit(train_dataloader, test_dataloader, model, criterion, optimizer, epoches):
    """Train ``model`` and evaluate it on held-out pairs after every epoch.

    The training loader is used for gradient updates. The test loader is only
    evaluated: the model is switched to eval mode and gradients are disabled,
    so the held-out data never influences the weights.

    Args:
        train_dataloader: iterable of batches (dicts with ``'picture_1'``,
            ``'picture_2'`` and ``'labels'``) used for optimisation.
        test_dataloader: iterable of batches of the same form, used for
            evaluation only.
        model: module called as ``model((picture_1, picture_2))``.
        criterion: loss called as ``criterion(outputs, labels.float())``.
        optimizer: optimizer stepping the model's parameters.
        epoches: number of passes over ``train_dataloader``.

    Returns:
        A list with one dict per epoch holding ``'epoch'`` (1-based),
        ``'train_loss'`` and ``'test_loss'`` (mean of the per-batch losses,
        ``nan`` when the corresponding loader yielded no batches).
    """

    def batch_loss(batch):
        # Move one batch to the target device and compute its loss.
        labels = batch['labels'].to(device)
        picture_1 = batch['picture_1'].to(device)
        picture_2 = batch['picture_2'].to(device)
        outputs = model((picture_1, picture_2)).view(-1)
        return criterion(outputs, labels.float())

    def mean(values):
        return sum(values) / len(values) if values else float('nan')

    # Remember the caller's train/eval mode so it can be restored on exit.
    was_training = model.training
    history = []
    for epoch in range(epoches):
        model.train()
        train_losses = []
        for batch in tqdm(train_dataloader):
            optimizer.zero_grad()
            loss = batch_loss(batch)
            loss.backward()
            optimizer.step()
            train_losses.append(loss.item())

        # Evaluation only: no backward pass and no optimizer step here,
        # otherwise the model would be trained on the held-out data.
        model.eval()
        test_losses = []
        with torch.no_grad():
            for batch in tqdm(test_dataloader):
                test_losses.append(batch_loss(batch).item())

        record = {
            'epoch': epoch + 1,
            'train_loss': mean(train_losses),
            'test_loss': mean(test_losses),
        }
        history.append(record)
        print(f"epoch {record['epoch']}/{epoches}: "
              f"train_loss={record['train_loss']:.4f} test_loss={record['test_loss']:.4f}")

    model.train(was_training)
    return history

In [189]:
# Run the training loop for 2 epochs.
fit(train_dataloader, test_dataloader, model, criterion, optimizer, 2)

  6%|██▌                                       | 24/389 [00:57<14:31,  2.39s/it]


KeyboardInterrupt: 