In [12]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.models as models
import torchvision.transforms as transforms
from torchvision.models.feature_extraction import get_graph_node_names
from torchvision.models.feature_extraction import create_feature_extractor


from torch.utils.data import TensorDataset, DataLoader, Dataset

import torch.nn.functional as F

import numpy as np


import matplotlib.pyplot as plt

from PIL import Image

import os
import glob


from MyDataLoader import OrderedDataSet, SiameseDataLoader, ShuffleDataLoader
from FileReader import get_picture_tensors
from MyModels import FeatureExtractionCNN
from ModelEvaluation import eval_model


from sklearn.neighbors import KNeighborsClassifier

In [13]:
# data params
root_directory = "dataset_chat/"
n_classes = 30
required_train_imgs = 30
required_test_imgs = 1

In [14]:
(train_images, val_images, test_images, 
 train_labels , val_labels , test_labels , n_classes) = get_picture_tensors(root_directory=root_directory,
                                                              n_classes=n_classes, 
                                                              required_train_imgs=required_train_imgs, 
                                                              required_test_imgs=required_test_imgs,
                                                              use_selected_eval_datasets = False, 
                                                              show_progress=False,
                                                              ordered_dataset=True)

900
30


In [15]:
class ContrastiveLoss(torch.nn.Module):
    """
    Contrastive loss function.
    Based on:
    """

    def __init__(self, margin=1.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, x0, x1, y):
        # euclidian distance
        diff = x0 - x1
        dist_sq = torch.sum(torch.pow(diff, 2), 1)
        dist = torch.sqrt(dist_sq)
        
        mdist = self.margin - dist
        dist = torch.clamp(mdist, min=0.0)
        loss = y * dist_sq + (1 - y) * torch.pow(dist, 2)
        loss = torch.sum(loss) / 2.0 / x0.size()[0]
        
        return loss

In [26]:
class SiameseDataLoader(Dataset):
    def __init__(self, ordered_dataset, batches_per_epoch):

        self.ordered_dataset = ordered_dataset
        self.batches_per_epoch = batches_per_epoch
        
    def __len__(self):
        return self.batches_per_epoch

    def __getitem__(self):
        
        same_class = np.random.choice([1, 0], 1,p=[0.9, 0.1])[0]

        if same_class:
            im1, im2 = self.ordered_dataset.get_same_class()
            
        else: 
            im1, im2 = self.ordered_dataset.get_diff_class()
        
        return im1, im2, same_class

import random

In [27]:
num_epochs = 30
learning_rate = 1e-5


train_ordered_dataset = OrderedDataSet(train_images)
train_dataloader = SiameseDataLoader(train_ordered_dataset, 100)

model = FeatureExtractionCNN(n_classes)

model.train()

criterion = ContrastiveLoss()
params = filter(lambda x: x.requires_grad, model.parameters())
optimizer = optim.Adam(params, lr = learning_rate)

loss_at_each_epoch = []
batches_per_epoch = 100

for epoch in range(num_epochs):
    total_loss = 0
    
    for i in range(batches_per_epoch):
        im1, im2, label = train_dataloader.__getitem__()
        optimizer.zero_grad()
        output1, output2 = model(im1, im2)

        loss = criterion(output1, output2, label)
        total_loss += loss
        loss.backward()
        optimizer.step()

    print(f"End of epoch {epoch}")
    print("Average loss in epoch: ", total_loss.item()/batches_per_epoch)
    loss_at_each_epoch.append(total_loss.item()/batches_per_epoch)
    
print('Training and evaluation finished.')

End of epoch 0
Average loss in epoch:  4.073093872070313
End of epoch 1
Average loss in epoch:  2.1955596923828127
End of epoch 2
Average loss in epoch:  1.0217420959472656
End of epoch 3
Average loss in epoch:  0.4387088394165039
End of epoch 4
Average loss in epoch:  0.18636043548583983
End of epoch 5
Average loss in epoch:  0.11136857986450195
End of epoch 6
Average loss in epoch:  0.05887473583221436
End of epoch 7
Average loss in epoch:  0.058652920722961424
End of epoch 8
Average loss in epoch:  0.034630615711212155
End of epoch 9
Average loss in epoch:  0.04602604389190674
End of epoch 10
Average loss in epoch:  0.045411815643310545
End of epoch 11
Average loss in epoch:  0.05728391170501709
End of epoch 12
Average loss in epoch:  0.05740508556365967


KeyboardInterrupt: 

In [28]:
def flatten_innermost(lst):
    result = []
    for item in lst:
        if isinstance(item, list):
            result.extend(flatten_innermost(item))
        else:
            result.append(item)
    return result

In [29]:
features = []
feature_labels = []

flat_train_images = []
for class_cat in train_images:
    for im in class_cat:
        flat_train_images.append(im)

train_dataset = ShuffleDataLoader(flat_train_images, train_labels)
with torch.no_grad():
    for images, labels in train_dataset:
        images = images.unsqueeze(dim=0)
        outputs = model.forward_once(images)
        features.append(outputs[0])
        feature_labels.append(np.argmax(labels).item())


In [30]:
def get_nearest_neighbors_indices(knn, query_point, k):
    distances, indices = knn.kneighbors([query_point.detach().numpy()], n_neighbors=k)
    predict = knn.predict_proba([query_point.detach().numpy()])
    return predict[0]

# Create a k-NN classifier
k = 5
knn = KNeighborsClassifier(n_neighbors=k)
knn.fit(features, feature_labels)

test_dataset = ShuffleDataLoader(test_images, test_labels)
for images, labels in train_dataset:
    images = images.unsqueeze(0)
    image_array = np.transpose(images[0], (1,2,0))
    
    outputs = model.forward_once(images)
    probas = get_nearest_neighbors_indices(knn, outputs[0], k)
    print(f"Real label {torch.argmax(labels)} : -- probabilities of the classes: ")
    sorted_indices = np.argsort(probas)[::-1]
    for i in range(3):
        class_index = sorted_indices[i]
        probability = probas[class_index]
        print(f"Class {class_index}: Probability {probability}")

"""
i = 0
for images, labels in train_dataset:
    if i in hmm:
        images = images.unsqueeze(0)
        image_array = np.transpose(images[0], (1,2,0))
        plt.imshow(image_array) 
        plt.show()
    i += 1
"""



Real label 11 : -- probabilities of the classes: 
Class 29: Probability 0.2
Class 3: Probability 0.2
Class 5: Probability 0.2
Real label 28 : -- probabilities of the classes: 
Class 26: Probability 0.2
Class 22: Probability 0.2
Class 19: Probability 0.2
Real label 24 : -- probabilities of the classes: 
Class 29: Probability 0.6
Class 10: Probability 0.4
Class 28: Probability 0.0
Real label 23 : -- probabilities of the classes: 
Class 29: Probability 0.4
Class 11: Probability 0.4
Class 0: Probability 0.2
Real label 6 : -- probabilities of the classes: 
Class 21: Probability 0.4
Class 4: Probability 0.2
Class 22: Probability 0.2
Real label 18 : -- probabilities of the classes: 
Class 13: Probability 0.4
Class 2: Probability 0.2
Class 3: Probability 0.2
Real label 0 : -- probabilities of the classes: 
Class 29: Probability 0.2
Class 3: Probability 0.2
Class 23: Probability 0.2
Real label 6 : -- probabilities of the classes: 
Class 24: Probability 0.4
Class 19: Probability 0.2
Class 25: Pr

KeyboardInterrupt: 