In [1]:
%reload_ext autoreload
%autoreload 2
%matplotlib inline

seed=42


import os

import torch
import torch.nn as nn
import torch.nn.functional as F
if torch.cuda.is_available():
    print("Yeah we have a GPU!")
dev = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
import torchvision
import torchvision.transforms as transforms
import torch.optim as optim

import matplotlib.pyplot as plt
import numpy as np

from gdeep.decision_boundary.decision_boundary_calculator import *
from gdeep.create_nets.sample_nn import SampleCNN_MNIST_SAMPLE,\
    SampleCNN_MNIST_SAMPLE_2
from gdeep.create_nets import Net

try:
    from color_scale import lab_color_scale
except:
    pass

In [2]:
# https://pytorch.org/tutorials/beginner/blitz/cifar10_tutorial.html

batch_size = 64

transform = transforms.Compose(
    [transforms.ToTensor(),
     transforms.Normalize((0.1307,), (0.3081,))])

# Train set
trainset = torchvision.datasets.MNIST(root='./data', train=True,
                                        download=True, transform=transform)

# Filters for labels 0 and 1
trainset_0_1 = torch.utils.data.Subset(trainset, torch.stack((trainset.targets == 0, trainset.targets == 1), axis=-1).any(-1).nonzero()[:,0])

trainloader = torch.utils.data.DataLoader(trainset_0_1, batch_size=batch_size,
                                          shuffle=True, num_workers=2,
                                          pin_memory=True)


# Test set
testset = torchvision.datasets.MNIST(root='./data', train=False,
                                       download=True, transform=transform)
# Filters for labels 0 and 1
testset_0_1 = torch.utils.data.Subset(testset, torch.stack((testset.targets == 0, testset.targets == 1), axis=-1).any(-1).nonzero()[:,0])
testloader = torch.utils.data.DataLoader(testset_0_1, batch_size=batch_size,
                                         shuffle=False, num_workers=2,
                                         pin_memory=True)

In [3]:
digit_detect_nn = SampleCNN_MNIST_SAMPLE().to(dev)

print("Trainable parameters of NN:",\
    sum(p.numel() for p in digit_detect_nn.parameters()), '\n')

print(digit_detect_nn)

Trainable parameters of NN: 61026 

SampleCNN_MNIST_SAMPLE(
  (conv1): Conv2d(1, 6, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(6, 16, kernel_size=(5, 5), stride=(1, 1))
  (fc1): Linear(in_features=400, out_features=120, bias=True)
  (fc2): Linear(in_features=120, out_features=84, bias=True)
  (fc3): Linear(in_features=84, out_features=2, bias=True)
)


In [4]:
n_epoch = 6

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(digit_detect_nn.parameters(), lr=0.001, momentum=0.9)


for epoch in range(n_epoch):  # loop over the dataset multiple times

    running_loss = 0.0
    for i, data in enumerate(trainloader, 0):
        # get the inputs; data is a list of [inputs, labels]
        inputs, labels = data
        inputs = inputs.to(dev)
        labels = labels.to(dev)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = digit_detect_nn(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # print statistics
        running_loss += loss.item()
        if i % 20 == 19:    # print every 2000 mini-batches
            print('[%d, %5d] loss: %.3f' %
                  (epoch + 1, i + 1, running_loss / 20))
            running_loss = 0.0

print('Finished Training')

[1,    20] loss: 0.701
[1,    40] loss: 0.691
[1,    60] loss: 0.681
[1,    80] loss: 0.670
[1,   100] loss: 0.655
[1,   120] loss: 0.630
[1,   140] loss: 0.587
[1,   160] loss: 0.515
[1,   180] loss: 0.371
[2,    20] loss: 0.075
[2,    40] loss: 0.040
[2,    60] loss: 0.028
[2,    80] loss: 0.022
[2,   100] loss: 0.020
[2,   120] loss: 0.019
[2,   140] loss: 0.020
[2,   160] loss: 0.020
[2,   180] loss: 0.014
[3,    20] loss: 0.012
[3,    40] loss: 0.015
[3,    60] loss: 0.018
[3,    80] loss: 0.012
[3,   100] loss: 0.018
[3,   120] loss: 0.011
[3,   140] loss: 0.009
[3,   160] loss: 0.006
[3,   180] loss: 0.009
[4,    20] loss: 0.009
[4,    40] loss: 0.007
[4,    60] loss: 0.009
[4,    80] loss: 0.010
[4,   100] loss: 0.012
[4,   120] loss: 0.009
[4,   140] loss: 0.007
[4,   160] loss: 0.010
[4,   180] loss: 0.006
[5,    20] loss: 0.012
[5,    40] loss: 0.003
[5,    60] loss: 0.010
[5,    80] loss: 0.010
[5,   100] loss: 0.010
[5,   120] loss: 0.005
[5,   140] loss: 0.006
[5,   160] 

In [5]:
correct = 0
total = 0
with torch.no_grad():
    for data in testloader:
        images, labels = data
        images = images.to(dev)
        labels = labels.to(dev)
        outputs = digit_detect_nn(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print('Accuracy of the network on the 10000 test images: %f %%' % (
    100. * correct / total))

Accuracy of the network on the 10000 test images: 99.905437 %


In [6]:
for param in digit_detect_nn.parameters():
            param.requires_grad = False
model = lambda x: F.softmax(digit_detect_nn(x), dim=-1)

In [None]:
steps = 1000

dc_samples = {'spacial': [], 'gradients': []}

for initial_points_batch, _ in iter(testloader):
    initial_points_batch.to(dev, non_blocking=True)
    g = GradientFlowDecisionBoundaryCalculator(
                model=model,
                initial_points=initial_points_batch,
                optimizer=lambda params: torch.optim.Adam(params)
    )
    #TODO: clip to [0,1]
    g.step(steps)

    y = g.get_decision_boundary()
    delta = torch.zeros_like(y, requires_grad=True)
    loss = torch.sum(torch.einsum('ij,j->i', model(y+delta), torch.tensor([1., 0.])))
    loss.backward()
    dc_samples['gradients'].append(delta.grad.detach())

    dc_samples['spacial'].append(y.detach().cpu())

In [None]:
dc_samples_tensor = {'spacial': [], 'gradients': []}
dc_samples_tensor['spacial'] = torch.cat(dc_samples['spacial'], axis=0)
dc_samples_tensor['gradients'] = torch.cat(dc_samples['gradients'], axis=0).reshape([-1, 28*28])
dc_samples_tensor['normal'] = dc_samples_tensor_loaded['gradients'] / dc_samples_tensor_loaded['gradients'].norm(dim=1)[:, None]

In [None]:
db_file = os.path.join('tensors', 'db_normal')
try:
    torch.save(dc_samples_tensor, db_file)
except:
    pass
dc_samples_tensor_loaded = torch.load(db_file)