In [0]:
import torch
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torchvision import models
import torch.nn as nn
from torch import optim
import numpy as np

In [0]:
# Settings for the plain (non-triplet) CIFAR-10 loaders built at the end of this cell.
batch_size = 64
num_workers = 1

# Define transforms.
# 'train' applies random augmentation; 'val' is a deterministic resize + centre crop.
# Both normalise with the standard ImageNet channel mean/std, which torchvision
# documents as the expected input normalisation for its pretrained models.
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

# Load CIFAR-10 (downloaded into the working directory if not already present).
image_dataset = {
    'train': datasets.CIFAR10('./', train=True, download=True, transform=data_transforms['train']),
    # The held-out split uses the deterministic 'val' pipeline: evaluating through the
    # random training augmentation would make test metrics and embeddings vary per run.
    'test': datasets.CIFAR10('./', train=False, download=True, transform=data_transforms['val']),
}

# Create the dataloaders (only the training split is shuffled).
data_loader = {
    'train': torch.utils.data.DataLoader(image_dataset['train'], batch_size=batch_size, shuffle=True, num_workers=num_workers),
    'test': torch.utils.data.DataLoader(image_dataset['test'], batch_size=batch_size, shuffle=False, num_workers=num_workers)
}

In [0]:
# Pass-through module, used below to replace the fc layer of the ResNet
class Identity(nn.Module):
    """Module whose forward pass hands its input back unchanged."""

    def __init__(self):
        super().__init__()

    def forward(self, x):
        """Return ``x`` itself (the same object, no copy)."""
        passthrough = x
        return passthrough
  
# Define model: torchvision ResNet-18 constructed with pretrained=True
resnet18  = models.resnet18(pretrained=True)
# Swap the final fully-connected layer for Identity so that the forward pass returns
# whatever is fed into `fc` rather than class scores
resnet18.fc = Identity()

# Disable gradient tracking on every parameter of the given model
def freeze_model(model):
    """Set ``requires_grad`` to False on each tensor yielded by ``model.parameters()``."""
    for weight in model.parameters():
        weight.requires_grad = False


In [0]:
# Fetch the helper code and rename it so it can be imported as the `triplet` package.
# NOTE(review): by default `git clone` of this URL creates a directory named `DeepHash`,
# while the next line renames a directory called `siamese-triplet` — confirm where
# `siamese-triplet` is expected to come from.
!git clone https://github.com/jjmachan/DeepHash
!mv siamese-triplet triplet
import triplet.datasets

In [0]:
import torch
from torch.optim import lr_scheduler
import torch.optim as optim
from torch.autograd import Variable

from triplet.trainer import fit
import numpy as np
# True when a CUDA device is available; later cells use this flag to move the model to
# the GPU and to pick the DataLoader keyword arguments
cuda = torch.cuda.is_available()

%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt


In [0]:
from torch.utils.data import Dataset
import numpy as np
from PIL import Image

class TripletCifar1(Dataset):
    """
    Wrap a labelled image dataset so that every item is an (anchor, positive, negative) triplet.

    Train: for each sample (anchor) a positive (same label) and a negative (different
    label) are drawn at random on every access.
    Test: a fixed list of triplets is generated once from a seeded RandomState, so the
    same triplets are produced on every construction.

    The wrapped ``dataset`` must expose ``train``, ``transform``, ``targets`` and ``data``;
    each ``data[i]`` is passed to ``PIL.Image.fromarray``.

    ``__getitem__`` returns ``((anchor, positive, negative), [])`` — the second element
    is always an empty list.
    """

    # Seed of the RandomState used to build the fixed test triplets.
    _TEST_SEED = 9

    def __init__(self, dataset):
        self.dataset = dataset
        self.train = self.dataset.train
        self.transform = self.dataset.transform

        if self.train:
            # dtype kept as float32 so the public `train_labels` attribute is unchanged.
            self.train_labels = np.array(self.dataset.targets, dtype=np.float32)
            self.train_data = self.dataset.data
            split_labels = self.train_labels
        else:
            self.test_labels = np.array(self.dataset.targets)
            self.test_data = self.dataset.data
            split_labels = self.test_labels

        # Map every distinct label to the positions of the samples carrying it.
        self.labels_set = set(split_labels)
        self.label_to_indices = {label: np.where(split_labels == label)[0]
                                 for label in self.labels_set}

        if not self.train:
            self.test_triplets = self._build_fixed_triplets()

    def _build_fixed_triplets(self):
        """Build one [anchor, positive, negative] index triplet per test sample, reproducibly."""
        random_state = np.random.RandomState(self._TEST_SEED)
        triplets = []
        for anchor_index in range(len(self.test_data)):
            anchor_label = self.test_labels[anchor_index].item()
            positive_index = random_state.choice(self.label_to_indices[anchor_label])
            # Every draw goes through the seeded generator. Previously the negative class
            # came from the global np.random state, so the "fixed" triplets changed from
            # run to run. Sorting removes any dependence on set iteration order.
            other_labels = sorted(self.labels_set - {anchor_label})
            negative_label = random_state.choice(other_labels)
            negative_index = random_state.choice(self.label_to_indices[negative_label])
            triplets.append([anchor_index, positive_index, negative_index])
        return triplets

    def __getitem__(self, index):
        if self.train:
            anchor_label = self.train_labels[index].item()
            same_class = self.label_to_indices[anchor_label]
            positive_index = index
            # Resample until the positive differs from the anchor. The length guard keeps
            # the loop from spinning forever when the anchor is the only sample of its
            # class; in that case the anchor doubles as its own positive.
            while positive_index == index and len(same_class) > 1:
                positive_index = np.random.choice(same_class)
            negative_label = np.random.choice(list(self.labels_set - {anchor_label}))
            negative_index = np.random.choice(self.label_to_indices[negative_label])
            arrays = (self.train_data[index],
                      self.train_data[positive_index],
                      self.train_data[negative_index])
        else:
            arrays = tuple(self.test_data[i] for i in self.test_triplets[index])

        images = tuple(Image.fromarray(array) for array in arrays)
        if self.transform is not None:
            images = tuple(self.transform(image) for image in images)
        return images, []

    def __len__(self):
        return len(self.dataset)

In [0]:
from torch.utils.data.sampler import BatchSampler

class BalancedBatchSampler1(BatchSampler):
    """
    BatchSampler - from a labelled dataset, samples n_classes and within these classes samples n_samples.
    Returns batches of size n_classes * n_samples

    Args:
        labels: one label per dataset item (anything ``np.array`` can convert to float32).
        n_classes: number of distinct classes drawn (without replacement) for each batch.
        n_samples: number of consecutive indices taken from each drawn class.

    Note: ``BatchSampler.__init__`` is not called; this class only provides
    ``__iter__`` and ``__len__`` itself.
    """

    def __init__(self, labels, n_classes, n_samples):
        self.labels = np.array(labels, dtype=np.float32)
        self.labels_set = list(set(self.labels))
        self.label_to_indices = {label: np.where(self.labels == label)[0]
                                 for label in self.labels_set}
        # Start from a random ordering of the indices of every class.
        for l in self.labels_set:
            np.random.shuffle(self.label_to_indices[l])
        # Per-class cursor into the shuffled index arrays.
        self.used_label_indices_count = {label: 0 for label in self.labels_set}
        self.count = 0
        self.n_classes = n_classes
        self.n_samples = n_samples
        self.n_dataset = len(self.labels)
        self.batch_size = self.n_samples * self.n_classes

    def __iter__(self):
        self.count = 0
        # `<=` (not `<`): when the dataset size is an exact multiple of the batch size the
        # strict comparison stopped one batch early, so iteration yielded fewer batches
        # than __len__() reports.
        while self.count + self.batch_size <= self.n_dataset:
            classes = np.random.choice(self.labels_set, self.n_classes, replace=False)
            indices = []
            for class_ in classes:
                start = self.used_label_indices_count[class_]
                indices.extend(self.label_to_indices[class_][start:start + self.n_samples])
                self.used_label_indices_count[class_] += self.n_samples
                # Not enough unused indices left for another full draw: reshuffle the
                # class and restart its cursor.
                if self.used_label_indices_count[class_] + self.n_samples > len(self.label_to_indices[class_]):
                    np.random.shuffle(self.label_to_indices[class_])
                    self.used_label_indices_count[class_] = 0
            yield indices
            self.count += self.n_classes * self.n_samples

    def __len__(self):
        return self.n_dataset // self.batch_size

In [0]:
# Set up data loaders
from triplet.datasets import BalancedBatchSampler

# Re-evaluate CUDA availability (same expression as in the earlier imports cell)
cuda = torch.cuda.is_available()
# Class-balanced batch samplers: 10 classes x 25 samples = 250 indices per batch
train_batch_sampler = BalancedBatchSampler1(image_dataset['train'].targets, n_classes=10, n_samples=25)
test_batch_sampler = BalancedBatchSampler1(image_dataset['test'].targets, n_classes=10, n_samples=25)


In [0]:
# Extra DataLoader options are only passed when CUDA is available
kwargs = {'num_workers': 1, 'pin_memory': True} if cuda else {}
# Loaders over the plain CIFAR-10 datasets (not the TripletCifar1 wrapper); batch
# composition is delegated to the balanced batch samplers
online_train_loader = torch.utils.data.DataLoader(image_dataset['train'], batch_sampler=train_batch_sampler, **kwargs)
online_test_loader = torch.utils.data.DataLoader(image_dataset['test'], batch_sampler=test_batch_sampler, **kwargs)

In [0]:
# Sanity check: pull a single batch from the training loader and print the shape of
# its image tensor
loader = iter(online_train_loader)
images, labels = next(loader)
print(images.shape)

In [0]:
# Set up the network and training parameters
from triplet.networks import EmbeddingNet
from triplet.losses import OnlineTripletLoss
from triplet.utils import AllTripletSelector,HardestNegativeTripletSelector, RandomNegativeTripletSelector, SemihardNegativeTripletSelector # Strategies for selecting triplets within a minibatch
from triplet.metrics import AverageNonzeroTripletsMetric

# Margin handed to both the triplet loss and the triplet selector
margin = 1.
# The embedding network is the ResNet-18 whose `fc` layer was replaced by Identity
embedding_net = resnet18
model = embedding_net
if cuda:
    model.cuda()
# Online triplet loss; triplets are picked inside each mini-batch by the selector
loss_fn = OnlineTripletLoss(margin, SemihardNegativeTripletSelector(margin))
lr = 1e-3
optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=1e-4)
# Multiply the learning rate by 0.1 every 8 scheduler steps
scheduler = lr_scheduler.StepLR(optimizer, 8, gamma=0.1, last_epoch=-1)
n_epochs = 20
# Passed to `fit` below — presumably the number of batches between progress logs (TODO confirm)
log_interval = 15

In [0]:
# First training stage: run `fit` for n_epochs with the semi-hard-negative loss
# configured in the previous cell
fit(online_train_loader, online_test_loader, model, loss_fn, optimizer, scheduler, n_epochs, cuda, log_interval, metrics=[AverageNonzeroTripletsMetric()])

In [0]:
# Persist the whole model object (not just its state_dict) after the first stage
torch.save(model, './triplet_resnet18_SemihardNegetiveTripletSelector.mdl')

In [0]:
# Second training stage: same model, but triplets are now selected with
# HardestNegativeTripletSelector instead of SemihardNegativeTripletSelector.
loss_fn = OnlineTripletLoss(margin, HardestNegativeTripletSelector(margin))
# Make every parameter trainable. This is done inline because the `unfreeze_model`
# helper is only defined in a later cell, so calling it here raised NameError when the
# notebook was run top to bottom on a fresh kernel.
for param in model.parameters():
    param.requires_grad = True
# NOTE(review): `optimizer` and `scheduler` are the objects already used in the first
# stage, so their state (including any decayed learning rate) carries over — confirm
# this is intended.
fit(online_train_loader, online_test_loader, model, loss_fn, optimizer, scheduler, n_epochs, cuda, log_interval, metrics=[AverageNonzeroTripletsMetric()])

In [0]:
# Persist the whole model object after the second stage.
# NOTE(review): the file name mentions "vgg20" although `model` is the ResNet-18 defined
# earlier — confirm the intended name.
torch.save(model, './triplet_vgg20_2.mdl')

In [0]:
# save to google drive 

from google.colab import drive
# Mount Google Drive (Colab only) at /gdrive
drive.mount('/gdrive')

# Copy every saved model file from the working directory into the Drive folder
!cp *.mdl /gdrive/My\ Drive/tooploox

In [0]:
# load data
from google.colab import drive
# Mount Google Drive (Colab only) and copy the stored model files back into the
# working directory
drive.mount('/gdrive')
!cp /gdrive/My\ Drive/tooploox/*.mdl ./ 

In [0]:
# Load a previously saved model object from disk.
# NOTE(review): the loaded object is bound to `model_test`, but eval mode is switched on
# for `model` (the network trained above) and `model_test` is not used afterwards in
# this notebook — confirm which model is meant. No cell in this notebook writes
# 'triplet_vgg19.mdl'; presumably it comes from the Drive copy above.
model_test = torch.load('./triplet_vgg19.mdl')
model.eval()

In [0]:
# Stop gradient tracking for all parameters of the model
def freeze_model(model):
    """Mark every parameter of ``model`` as not requiring gradients."""
    for tensor in model.parameters():
        tensor.requires_grad = False

def unfreeze_model(model):
    """Mark every parameter of ``model`` as requiring gradients again."""
    for tensor in model.parameters():
        tensor.requires_grad = True

# Print the requires_grad flag of each parameter, e.g. to verify a freeze
def list_trainable(model):
    """Print one line per parameter of ``model``: its ``requires_grad`` flag."""
    flags = [tensor.requires_grad for tensor in model.parameters()]
    for flag in flags:
        print(flag)
  
# delete the last layers
def del_last_layers(model_class, num_layers):
    """Return an ``nn.Sequential`` of ``model_class``'s direct children minus the last ``num_layers``.

    Args:
        model_class: module whose direct children (``model_class.children()``) are kept.
        num_layers: number of trailing children to drop. ``0`` keeps all of them;
            a value of at least the number of children yields an empty ``nn.Sequential``.

    Returns:
        A new ``nn.Sequential`` wrapping the kept child modules. The children are the
        same module objects as in the input, not copies.
    """
    children = list(model_class.children())
    # Compute the cut explicitly: the previous `[:-num_layers]` slice collapses to `[:0]`
    # for num_layers == 0 and therefore dropped every layer instead of none.
    keep = max(len(children) - num_layers, 0)
    return nn.Sequential(*children[:keep])

In [0]:
def create_embeddings(model, embedding_size, loaders=None):
    """Run ``model`` over the 'train' and 'test' loaders and collect outputs and targets.

    Args:
        model: ``nn.Module`` mapping a batch of images to a ``(batch, embedding_size)`` tensor.
        embedding_size: width of the model output; used to shape the (possibly empty) result.
        loaders: mapping with 'train' and 'test' iterables of ``(images, target)`` batches.
            Defaults to the notebook-level ``data_loader``.

    Returns:
        ``(features, targets)``: two dicts keyed by 'train' and 'test'. ``features[split]``
        is a float64 array of shape ``(n, embedding_size)`` and ``targets[split]`` a
        float64 array of shape ``(n,)``.

    Raises:
        Whatever the forward pass or the concatenation raises (for example a ValueError
        when the model output width differs from ``embedding_size``). The previous bare
        ``except`` referenced an undefined name and so masked the real error with a
        NameError.
    """
    if loaders is None:
        loaders = data_loader

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(device)
    model.to(device)

    # Inference only: use eval-mode layer behaviour and skip autograd bookkeeping, which
    # also lets `.numpy()` work when the parameters are not frozen.
    was_training = model.training
    model.eval()

    features = {}
    targets = {}
    try:
        with torch.no_grad():
            for split in ('train', 'test'):
                # Seed with empty float64 arrays so the result keeps its shape and dtype
                # even when a loader yields nothing.
                feature_chunks = [np.empty([0, embedding_size])]
                target_chunks = [np.empty([0, ])]
                for i, (images, target) in enumerate(loaders[split]):
                    output = model(images.to(device)).cpu().numpy()
                    feature_chunks.append(output)
                    target_chunks.append(target.cpu().numpy())

                    if i % 100 == 0:
                        print(i)
                # Concatenate once per split instead of re-allocating with np.append on
                # every batch.
                features[split] = np.concatenate(feature_chunks, axis=0)
                targets[split] = np.concatenate(target_chunks, axis=0)
    finally:
        # Leave the model in the mode the caller had it in.
        model.train(was_training)
    return (features, targets)

In [0]:
# `model_emb` used to be assigned only in the following cell, so this call raised
# NameError when the notebook was run top to bottom. Bind (and freeze) it here; the
# next cell repeating the same two steps is harmless.
model_emb = model
# Frozen parameters produce outputs that do not require grad, which create_embeddings
# relies on when converting them to NumPy arrays.
freeze_model(model_emb)
# 512 is the embedding width passed to create_embeddings — presumably the ResNet-18
# feature size once `fc` is replaced by Identity (TODO confirm).
features, targets = create_embeddings(model_emb, 512)

In [0]:
# `model_emb` is another name for the trained `model` (same object, no copy)
model_emb = model
# Turn off gradient tracking on all its parameters before embedding extraction
freeze_model(model_emb)
# Show the module structure
print(model_emb)

In [0]:
# Save and load
import pickle

def save(features, targets, model_name):
    """Pickle ``(features, targets)`` into the file ``model_name + '.embs'``.

    Prints a confirmation once the dump has been written.
    """
    destination = model_name + '.embs'
    payload = (features, targets)
    with open(destination, 'wb') as handle:
        pickle.dump(payload, handle)
        print('file saved in ', model_name)

def load(model_name):
    """Read back the ``(features, targets)`` pair pickled in ``model_name + '.embs'``.

    Unpickling can execute arbitrary code, so only use this on files you trust.
    """
    source = model_name + '.embs'
    with open(source, 'rb') as handle:
        features, targets = pickle.load(handle)
    return (features, targets)


In [0]:
# Save the computed embeddings; `save` appends the '.embs' extension to this name
save(features, targets, 'resnet18_semihardNegetive')

In [0]:
# save to google drive 

from google.colab import drive
# Mount Google Drive (Colab only) at /gdrive
drive.mount('/gdrive')

# Copy every file whose name ends in 'embs' into the Drive folder
!cp *embs /gdrive/My\ Drive/tooploox

In [0]:
# List the Drive folder to check what was copied
!ls /gdrive/My\ Drive/tooploox

In [0]:
from sklearn.neighbors import KNeighborsClassifier
from sklearn import metrics

# Neighbour counts to evaluate: k = 25, 26, 27, 28, 29
k_range = range(25,30)

def search_knn_accuracies(k_range, features, targets, n_test=300):
    """Fit a k-NN classifier per ``k`` on the train embeddings and score it on test embeddings.

    Args:
        k_range: iterable of neighbour counts to try.
        features: dict with 'train' and 'test' 2-D feature arrays.
        targets: dict with 'train' and 'test' label arrays.
        n_test: number of leading test samples used for scoring (default 300, the value
            that used to be hard-coded); ``None`` scores the whole test set.

    Returns:
        List with one accuracy per ``k``, in iteration order. The per-``k`` accuracies
        and their mean are also printed; the mean is skipped when ``k_range`` is empty
        (this used to raise ZeroDivisionError).
    """
    # The evaluation slice does not depend on k, so take it once.
    test_features = features['test'][:n_test, :]
    test_targets = targets['test'][:n_test]

    acc = []
    for k in k_range:
        knn = KNeighborsClassifier(n_neighbors=k)
        knn.fit(features['train'], targets['train'])
        predict = knn.predict(test_features)
        score = metrics.accuracy_score(test_targets, predict)
        print('K value: %d, accuracy: %0.7f' %(k, score))
        acc.append(score)

    if acc:
        print('Mean accuracy ',sum(acc)/len(acc))
    return acc

# the best score was obtained when k = 20:24
# NOTE(review): the note above mentions k = 20..24, but `k_range` defined in this cell
# covers 25..29 — confirm which range is intended.
search_knn_accuracies(k_range, features, targets)

In [0]:
import time
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patheffects as PathEffects
from sklearn.manifold import TSNE
%matplotlib inline

from sklearn.decomposition import PCA
from sklearn.manifold import TSNE

import seaborn as sns
# Global seaborn look for the figures below
sns.set_style('darkgrid')
sns.set_palette('muted')
sns.set_context("notebook", font_scale=1.5,
                rc={"lines.linewidth": 2.5})
# Random seed handed to t-SNE further down
RS = 123
# need to create a subset of the data, too much time to process otherwise
# (first 5000 training embeddings and their targets)
x_subset = features['train'][:5000]
y_subset = targets['train'][:5000]

# Show which class ids occur in the subset
print(np.unique(y_subset))
# Class id -> class name, used by fashion_scatter for the text annotations.
# NOTE: this rebinds the name `labels`, which an earlier cell used for a batch of targets.
labels = {
     0: 'airplane',  
     1: 'automobile',
     2: 'bird',
     3: 'cat',
     4: 'deer',
     5: 'dog',
     6: 'frog',
     7: 'horse',
     8: 'ship',
     9: 'truck',
}
# Utility function to visualize the outputs of PCA and t-SNE

def fashion_scatter(x, colors):
    """Scatter-plot 2-D points coloured by class and write each class name at its median.

    Args:
        x: array of shape ``(n, 2)`` (only columns 0 and 1 are plotted).
        colors: array of ``n`` class ids (any numeric dtype; cast to int for indexing).

    Returns:
        ``(figure, axes, scatter, texts)`` where ``texts`` is the list of text artists,
        one per class id present in ``colors``.

    Class names are looked up in the notebook-level ``labels`` dict; ids missing from it
    are shown as the id itself.
    """
    # The `np.int` alias was removed in NumPy 1.24; the builtin `int` is the equivalent dtype.
    class_ids = colors.astype(int)
    present_ids = np.unique(class_ids)
    num_classes = len(present_ids)

    # choose a color palette with seaborn.
    # One colour per id up to the largest id present. This equals `num_classes` when the
    # ids are exactly 0..n-1 and avoids an out-of-range palette index otherwise.
    n_colors = max(num_classes, int(present_ids.max()) + 1) if num_classes else 0
    palette = np.array(sns.color_palette("hls", n_colors))

    # create a scatter plot.
    f = plt.figure(figsize=(8, 8))
    ax = plt.subplot(aspect='equal')
    sc = ax.scatter(x[:,0], x[:,1], lw=0, s=40, c=palette[class_ids])
    plt.xlim(-25, 25)
    plt.ylim(-25, 25)
    ax.axis('off')
    ax.axis('tight')

    # add the class name for each class id that actually occurs
    txts = []

    for class_id in present_ids:
        class_id = int(class_id)

        # Position of each label at median of data points.
        xtext, ytext = np.median(x[class_ids == class_id, :], axis=0)
        txt = ax.text(xtext, ytext, str(labels.get(class_id, class_id)), fontsize=24)
        txt.set_path_effects([
            PathEffects.Stroke(linewidth=5, foreground="w"),
            PathEffects.Normal()])
        txts.append(txt)

    return f, ax, sc, txts

In [0]:
# Run PCA before t-SNE to reduce noise and speed up the t-SNE step
time_start = time.time()

# Project the embedding subset onto its first 50 principal components
pca_50 = PCA(n_components=50)
pca_result_50 = pca_50.fit_transform(x_subset)

print('PCA with 50 components done! Time elapsed: {} seconds'.format(time.time()-time_start))

print('Cumulative variance explained by 50 principal components: {}'.format(np.sum(pca_50.explained_variance_ratio_)))
# perform tsne on 50 components
time_start = time.time()


# t-SNE on the PCA output, seeded with RS
fashion_pca_tsne = TSNE(random_state=RS).fit_transform(pca_result_50)

print('t-SNE done! Time elapsed: {} seconds'.format(time.time()-time_start))

In [0]:
# Plot the t-SNE coordinates coloured by the class ids of the same 5000-sample subset
fashion_scatter(fashion_pca_tsne, y_subset)