## Install Pytorch if needed

In [1]:
# http://pytorch.org/


# from os.path import exists
# from wheel.pep425tags import get_abbr_impl, get_impl_ver, get_abi_tag
# platform = '{}{}-{}'.format(get_abbr_impl(), get_impl_ver(), get_abi_tag())
# cuda_output = !ldconfig -p|grep cudart.so|sed -e 's/.*\.\([0-9]*\)\.\([0-9]*\)$/cu\1\2/'
# accelerator = cuda_output[0] if exists('/dev/nvidia0') else 'cpu'

# !pip install -q http://download.pytorch.org/whl/{accelerator}/torch-0.4.1-{platform}-linux_x86_64.whl torchvision
# import torch

In [2]:
import torch
import torch.nn as nn
from torch.autograd.function import Function
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLR

import math
import numpy as np
from pdb import set_trace as bp

print("Pytorch version:  " + str(torch.__version__))
use_cuda = torch.cuda.is_available()
print("Use CUDA: " + str(use_cuda))

FEATURE_SIZE = 3
BATCH_SIZE = 64
BATCH_SIZE_TEST = 1000
EPOCHS = 50
LOG_INTERVAL = 200
NUM_OF_CLASSES = 10
LR = 1e-1  # initial learning rate
LR_STEP = 10
LR_DECAY = 0.95  # when val_loss increase, LR = LR*LR_DECAY

torch.manual_seed(1)
device = 'cuda' #torch.device("cuda" if use_cuda else "cpu")

kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {}


Pytorch version:  1.2.0
Use CUDA: True


In [3]:
train_loader = torch.utils.data.DataLoader(
    datasets.MNIST('../data', train=True, download=True,
                   transform=transforms.Compose([
                       transforms.ToTensor(),
                       transforms.Normalize((0.1307,), (0.3081,))
                   ])),
    batch_size=BATCH_SIZE, shuffle=True, **kwargs)
test_loader = torch.utils.data.DataLoader(
    datasets.MNIST('../data', train=False, transform=transforms.Compose([
                       transforms.ToTensor(),
                       transforms.Normalize((0.1307,), (0.3081,))
                   ])),
    batch_size=BATCH_SIZE_TEST, shuffle=True, **kwargs)

In [4]:

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        krnl_sz=3
        strd = 1
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=20, kernel_size=krnl_sz, stride=strd, padding=1)
        self.conv2 = nn.Conv2d(in_channels=20, out_channels=50, kernel_size=krnl_sz, stride=strd, padding=1)
        self.conv3 = nn.Conv2d(in_channels=50, out_channels=64, kernel_size=krnl_sz, stride=strd, padding=1)
        self.conv4 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=krnl_sz, stride=strd, padding=1)
        self.conv5 = nn.Conv2d(in_channels=128, out_channels=512, kernel_size=krnl_sz, stride=strd, padding=1)
        self.conv6 = nn.Conv2d(in_channels=512, out_channels=512, kernel_size=krnl_sz, stride=strd, padding=1)
        self.prelu_weight = nn.Parameter(torch.Tensor(1).fill_(0.25))
        self.fc1 = nn.Linear(3*3*512, FEATURE_SIZE)
        self.fc3 = nn.Linear(FEATURE_SIZE, 10)
    def forward(self, x):
        mp_ks=2
        mp_strd=2
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, kernel_size=mp_ks, stride=mp_strd)
        x = F.relu(self.conv3(x))
        x = F.relu(self.conv4(x))
        x = F.max_pool2d(x, kernel_size=mp_ks, stride=mp_strd)
        x = F.relu(self.conv5(x))
        x = F.max_pool2d(x, kernel_size=mp_ks, stride=mp_strd)
        x = x.view(-1, 3*3*512) # Flatten
        features3d = self.fc1(x)
        x = F.prelu(features3d, self.prelu_weight)
        x = self.fc3(x)
        return x, features3d

class ArcMarginProduct(nn.Module):
    r"""Implement of large margin arc distance: :
        Args:
            in_features: size of each input sample
            out_features: size of each output sample
            s: norm of input feature
            m: margin

            cos(theta + m)
        """
    def __init__(self, in_features, out_features, device, s=30.0, m=0.50, easy_margin=False):
        super(ArcMarginProduct, self).__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.s = s
        self.m = m
        self.weight = nn.Parameter(torch.FloatTensor(out_features, in_features))
        nn.init.xavier_uniform_(self.weight)
        self.device = device

        self.easy_margin = easy_margin
        self.cos_m = math.cos(m)
        self.sin_m = math.sin(m)
        self.th = math.cos(math.pi - m)
        self.mm = math.sin(math.pi - m) * m

    def forward(self, input, label):
#         import ipdb; ipdb.set_trace()
        # --------------------------- cos(theta) & phi(theta) ---------------------------
        cosine = F.linear(F.normalize(input), F.normalize(self.weight))
        sine = torch.sqrt(1.0 - torch.pow(cosine, 2))
        phi = cosine * self.cos_m - sine * self.sin_m
        if self.easy_margin:
            phi = torch.where(cosine > 0, phi, cosine)
        else:
            phi = torch.where(cosine > self.th, phi, cosine - self.mm)
        # --------------------------- convert label to one-hot ---------------------------
        # one_hot = torch.zeros(cosine.size(), requires_grad=True, device='cuda')
        one_hot = torch.zeros(cosine.size(), device=self.device)
        one_hot.scatter_(1, label.view(-1, 1).long(), 1)
        output = (one_hot * phi) + ((1.0 - one_hot) * cosine)  # you can use torch.where if your torch.__version__ is 0.4
        output *= self.s
        return output

def train(model, metric_fc, criterion, device, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, labels) in enumerate(train_loader):
        data, labels = data.to(device), labels.to(device)
        pred, features3d = model(data)

        output = metric_fc(features3d, labels)
        loss = criterion(output, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
  
        if batch_idx % LOG_INTERVAL == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))




##########################################################

model = Net().to(device)
# model.to(device)

# metric_fc = ArcMarginProduct(FEATURE_SIZE, NUM_OF_CLASSES, device, s=30, m=0.5, easy_margin=False).to(device)
from losses import AngularPenaltySMLoss

criterion = AngularPenaltySMLoss(FEATURE_SIZE, NUM_OF_CLASSES, loss_type='cosface').to('cuda') # loss_type in ['arcface', 'sphereface', 'cosface']

# metric_fc.to(device)

criterion = torch.nn.CrossEntropyLoss()

model_params = list(model.parameters())
# metric_params = list(metric_fc.parameters())
param = model_params#+metric_params
optimizer = torch.optim.Adam(param, lr=LR, weight_decay=LR_DECAY)

scheduler = StepLR(optimizer, step_size=LR_STEP, gamma=0.1)



In [5]:
def test(model, metric_fc, criterion, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, labels in test_loader:
            data, labels = data.to(device), labels.to(device)
            pred,features3d = model(data)
#             output = metric_fc(features3d, labels)
            test_loss += criterion(pred, labels)
            # test_loss += centerLoss(output, target, device, features3d)
#             pred = output.max(1, keepdim=True)[1] # get the index of the max log-probability
#             correct += pred.eq(labels.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)
  
    print('\nTest set: Average loss: {}\n'.format(str(test_loss)))

In [6]:
    test(model, None, criterion, device, test_loader)



Test set: Average loss: tensor(0.0024, device='cuda:0')



In [7]:
def train(model, metric_fc, criterion, device, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, labels) in enumerate(train_loader):
        data, labels = data.to(device), labels.to(device)
        pred, features3d = model(data)
#         import ipdb; ipdb.set_trace()
#         output = metric_fc(features3d, labels)
        loss = criterion(pred, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
  
        if batch_idx % LOG_INTERVAL == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))


In [8]:
for epoch in range(1, EPOCHS + 1):
    train(model, None, criterion, device, train_loader, optimizer, epoch)
    test(model, None, criterion, device, test_loader)

torch.save(model.state_dict(),"mnist_cnn-arcface-loss.pt")


Test set: Average loss: tensor(0.0023, device='cuda:0')



KeyboardInterrupt: 

## Load Model

In [None]:
device = torch.device("cuda" if use_cuda else "cpu")
model = Net()
model.eval()
model.load_state_dict(torch.load("mnist_cnn-arcface-loss.pt", map_location='cpu'))
model.to(device)

In [None]:
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline

In [None]:
lbl = test_loader.dataset[ind]
# lb

In [None]:
label_tensor

In [None]:
ind = 10

image = test_loader.dataset[ind][0].numpy().reshape(28,28)
lbl = test_loader.dataset[ind][1]#.numpy()
plt.title('this is  --->   ' + str(lbl))
plt.imshow(image, cmap='gray')


image_tensor, label_tensor = test_loader.dataset[ind]
image_tensor = image_tensor.reshape(1,1,28,28)
image_tensor = image_tensor.to(device)
# label_tensor = label_tensor.to(device)

prediction, features3d = model(image_tensor)
prediction = np.argmax(prediction.cpu().detach().numpy())
print ("\033[92m" + "PREDICTION : " + str(prediction) + "\033[0m")

print("features3d:  " + str(features3d.cpu().detach().numpy()))
# print("features2d:  " + str(features2d.cpu().detach().numpy()))


-----------

In [None]:
# Visualize train_data

In [None]:
f3d_train = []
lbls_train = []

for i in range(10000):
    image_tensor_train, label_tensor_train = train_loader.dataset[i]
    image_tensor_train = image_tensor_train.reshape(1,1,28,28)
    image_tensor_train, label_tensor_train = image_tensor_train.to(device), label_tensor_train#.to(device)

    prediction_train, features3d_train = model(image_tensor_train)
    f3d_train.append(features3d_train[0].cpu().detach().numpy())

    prediction_train = np.argmax(prediction_train.cpu().detach().numpy())    
    lbls_train.append(prediction_train)

    

feat3d_train = np.array(f3d_train)
print("3d features train shape" + str(feat3d_train.shape))
lbls_train = np.array(lbls_train)
print("labels train shape" + str(lbls_train.shape))


In [None]:
f3d_test = []
lbls_test = []

for i in range(10000):
    image_tensor_test, label_tensor_test = test_loader.dataset[i]
    image_tensor_test = image_tensor_test.reshape(1,1,28,28)
    image_tensor_test, label_tensor_test = image_tensor_test.to(device), label_tensor_test#.to(device)

    prediction_test, features3d_test = model(image_tensor_test)
    f3d_test.append(features3d_test[0].cpu().detach().numpy())

    prediction_test = np.argmax(prediction_test.cpu().detach().numpy())    
    lbls_test.append(prediction_test)

    
    
feat3d_test = np.array(f3d_test)
print("3d features test shape" + str(feat3d_test.shape))
lbls_test = np.array(lbls_test)
print("labels test shape" + str(lbls_test.shape))


# Visualize 2d

In [None]:
# import matplotlib.pyplot as plt
# %matplotlib inline

# f = plt.figure(figsize=(16,9))
# c = ['#ff0000', '#ffff00', '#00ff00', '#00ffff', '#0000ff', 
#      '#ff00ff', '#990000', '#999900', '#009900', '#009999']
# for i in range(10):
#     plt.plot(feat2d[lbls==i,0].flatten(), feat2d[lbls==i,1].flatten(), '.', c=c[i])
# plt.legend(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9'])

# plt.grid()
# plt.show()

# Visualize Train Data 3d

In [None]:
from mpl_toolkits import mplot3d
import matplotlib.pyplot as plt
%matplotlib inline

fig = plt.figure(figsize=(16,9))
ax = plt.axes(projection='3d')

for i in range(10):
    # Data for three-dimensional scattered points
    xdata = feat3d_train[lbls_train==i,2].flatten()
    ydata = feat3d_train[lbls_train==i,0].flatten()
    zdata = feat3d_train[lbls_train==i,1].flatten()
    ax.scatter3D(xdata, ydata, zdata);
ax.legend(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9'],loc='center left', bbox_to_anchor=(1, 0.5))

plt.show()

# Visualize Test Data 3d

In [None]:
from mpl_toolkits import mplot3d
import matplotlib.pyplot as plt
%matplotlib inline

fig = plt.figure(figsize=(16,9))
ax = plt.axes(projection='3d')

for i in range(10):
    # Data for three-dimensional scattered points
    xdata = feat3d_test[lbls_test==i,2].flatten()
    ydata = feat3d_test[lbls_test==i,0].flatten()
    zdata = feat3d_test[lbls_test==i,1].flatten()
    ax.scatter3D(xdata, ydata, zdata);
ax.legend(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9'],loc='center left', bbox_to_anchor=(1, 0.5))

plt.show()