In [1]:
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
import torchvision
import torchvision.transforms as transforms
from torch.nn import functional
import math
import os

# Hyper-parameters
input_size = 784
num_classes = 10
num_epochs = 1
batch_size = 32
learning_rate = 1e-3

# Preprocessing shared by both splits: convert the image to a PyTorch tensor,
# then normalize the inputs.
mnist_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,)),
])

# MNIST dataset (images and labels). Only the training split asks for a download.
train_dataset = torchvision.datasets.MNIST(
    root='../../data',
    train=True,
    transform=mnist_transform,
    download=True,
)

test_dataset = torchvision.datasets.MNIST(
    root='../../data',
    train=False,
    transform=mnist_transform,
)

In [2]:
# Data loaders (input pipeline).
# Training batches are shuffled and sized by `batch_size`; the test loader
# yields one image at a time in dataset order.
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=1, shuffle=False)

# Binary ops

In [3]:
class Binarize(torch.autograd.Function):
    """Scaled-sign binarization with a straight-through estimator (STE).

    Forward maps ``input`` to ``sign(input) * mean(|input|)``. Backward passes
    the incoming gradient straight through, optionally zeroing it where the
    saved input is saturated (see ``THRESHOLD_STE``).
    """

    # When True, backward zeroes the gradient wherever input >= 1 or input <= -1.
    THRESHOLD_STE = True

    @staticmethod
    def forward(ctx, input):
        """
        We approximate the input by the following:

        input ~= sign(input) * l1_norm(input) / input.size

        The mean is taken over every element of ``input``, so a single scalar
        scale is shared by the whole tensor. ``input`` is saved for backward.
        """
        ctx.save_for_backward(input)
        avg = torch.mean(torch.abs(input))
        return input.sign() * avg

    @staticmethod
    def backward(ctx, grad_output):
        """
        According to [Do-Re-Fa Networks](https://arxiv.org/pdf/1606.06160.pdf),
        the STE for binary weight networks is completely pass through.

        However, according to [Binary Neural Networks](https://arxiv.org/pdf/1602.02830.pdf),
        and [XNOR-net networks](https://arxiv.org/pdf/1603.05279.pdf),
        the STE must be thresholded by the following:

        d = d * (-1 <= w <= 1)

        NOTE: as implemented here the gradient is also zeroed at exactly
        w == 1 and w == -1 (the masks use >= and <=).

        Set THRESHOLD_STE to True/False for either behavior. However, it is suggested
        to set it to True because we have seen performance degradations with it = False.

        Returns the gradient with respect to ``input``. ``grad_output`` itself
        is never modified.
        """
        if not Binarize.THRESHOLD_STE:
            return grad_output

        input, = ctx.saved_tensors
        # autograd.Function.backward must not modify its arguments in place:
        # the same gradient tensor may be consumed elsewhere in the graph.
        # Mask a private copy instead.
        grad_input = grad_output.clone()
        grad_input[input.ge(1)] = 0
        grad_input[input.le(-1)] = 0
        return grad_input

class BinaryLinear(nn.Module):
    """Fully connected layer that binarizes both its input and its weight."""

    def __init__(self, in_features, out_features, bias=True):
        """
        Takes in some inputs x, and initializes some weights for matmul.
        The forward pass binarizes the input and the weight with ``Binarize``
        and then applies a regular floating-point ``functional.linear``; the
        bias is added as-is (it is not binarized).

        input = (N, M)
        weights = (K, M)  [stored as (out_features, in_features)]

        in_features: size of each input sample
        out_features: size of each output sample
        bias: If set to False, the layer will not learn an additive bias.
            Default: ``True``
        """
        super(BinaryLinear, self).__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.weight = torch.nn.Parameter(torch.empty(out_features, in_features))
        if bias:
            self.bias = torch.nn.Parameter(torch.empty(out_features))
        else:
            # Keep the attribute so forward() can pass None through to linear().
            self.register_parameter('bias', None)

        # Initializing parameters: uniform in [-stdv, stdv).
        stdv = 1. / math.sqrt(in_features * out_features)
        # TODO: Remove this for actual training.
        # stdv = 100
        self.weight.data.uniform_(-stdv, stdv)
        if self.bias is not None:
            self.bias.data.uniform_(-stdv, stdv)

    def forward(self, input):
        """Return ``linear(binarize(input), binarize(weight), bias)``."""
        binarize = Binarize.apply
        return functional.linear(binarize(input), binarize(self.weight), self.bias)
        
class BinaryConvolution2d(nn.Module):
    """2-D convolution that binarizes both its input and its filters."""

    def __init__(self, in_channels, out_channels, kernel_size, stride=1,
                 padding=0, bias=True):
        """
        Takes in some inputs x, and initializes some weights for conv filters,
        and performs a "convolution" by binarizing the input and the weights
        (via ``Binarize``) and running ``functional.conv2d`` on them. The bias
        is added as-is; it is not binarized.

        input = (N, C, H, W)
        weights = (K, C, kernel_size, kernel_size) [ binarized in forward ]
        biases = (K,) [ NOT binarized ]
        output = (N, K, H_out, W_out), spatial size determined by conv2d from
                 kernel_size / stride / padding

        in_channels (int): Number of channels in the input image
        out_channels (int): Number of channels produced by the convolution
        kernel_size (int): Side length of the (square) convolving kernel
        stride (int or tuple, optional): Stride of the convolution. Default: 1
        padding (int or tuple, optional): Zero-padding added to both sides of the input. Default: 0
        bias (bool, optional): If ``True``, adds a learnable bias to the output. Default: ``True``

        NOTE: We skip dilation, groups, etc for now.
        """
        super(BinaryConvolution2d, self).__init__()
        self.weight = torch.nn.Parameter(
            torch.empty(out_channels, in_channels, kernel_size, kernel_size))
        if bias:
            self.bias = torch.nn.Parameter(torch.empty(out_channels))
        else:
            # Honor bias=False (previously ignored); conv2d accepts bias=None.
            self.register_parameter('bias', None)
        self.stride = stride
        self.padding = padding

        # Initializing parameters: uniform in [-stdv, stdv), with
        # n = in_channels * kernel_size^2 (the fan-in of one filter).
        n = in_channels
        n *= kernel_size ** 2
        stdv = 1. / math.sqrt(n)
        # TODO: Remove this for actual training.
        # stdv = 100

        self.weight.data.uniform_(-stdv, stdv)
        if self.bias is not None:
            self.bias.data.uniform_(-stdv, stdv)

    def forward(self, input):
        """Return ``conv2d(binarize(input), binarize(weight), bias, stride, padding)``."""
        binarize = Binarize.apply
        return functional.conv2d(binarize(input), binarize(self.weight), self.bias, self.stride, self.padding)

# Testing

In [4]:
# Testing Binarize:
# Run a few steps of plain gradient descent on x through the custom Binarize
# autograd function and print the loss, the updated x and its gradient.
x = torch.randn(1, 3, requires_grad=True)
# w and b are not used by the loop below (only by a commented-out update);
# they are still drawn here so the random stream is consumed as before.
w = torch.randn(1, 3, requires_grad=True)
b = torch.randn(1, 1, requires_grad=True)

binarize = Binarize.apply
target = torch.FloatTensor([1,2,3])
step_size = 1e-1

for _ in range(10):
    # Forward pass through the custom autograd operation, then squared error.
    y = binarize(x)
    residual = y - target
    loss = residual.pow(2).sum()
    loss.backward()

    print(loss.item())
    # Manual gradient-descent update, kept out of the autograd graph.
    with torch.no_grad():
        x -= x.grad * step_size
        # w -= w.grad * 1e-1
        print(x, x.grad)
        x.grad.zero_()
    print("---")

12.962879180908203
tensor([[ 1.3880, -1.2463,  0.7122]], requires_grad=True) tensor([[ 0.0000,  0.0000, -4.0382]])
---
13.271018981933594
tensor([[ 1.3880, -1.2463,  1.0891]], requires_grad=True) tensor([[ 0.0000,  0.0000, -3.7690]])
---
13.656698226928711
tensor([[ 1.3880, -1.2463,  1.0891]], requires_grad=True) tensor([[0., 0., 0.]])
---
13.656698226928711
tensor([[ 1.3880, -1.2463,  1.0891]], requires_grad=True) tensor([[0., 0., 0.]])
---
13.656698226928711
tensor([[ 1.3880, -1.2463,  1.0891]], requires_grad=True) tensor([[0., 0., 0.]])
---
13.656698226928711
tensor([[ 1.3880, -1.2463,  1.0891]], requires_grad=True) tensor([[0., 0., 0.]])
---
13.656698226928711
tensor([[ 1.3880, -1.2463,  1.0891]], requires_grad=True) tensor([[0., 0., 0.]])
---
13.656698226928711
tensor([[ 1.3880, -1.2463,  1.0891]], requires_grad=True) tensor([[0., 0., 0.]])
---
13.656698226928711
tensor([[ 1.3880, -1.2463,  1.0891]], requires_grad=True) tensor([[0., 0., 0.]])
---
13.656698226928711
tensor([[ 1.388

In [5]:
# Device configuration: first CUDA GPU when available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')

# Convolutional neural network (two convolutional layers)
class ConvNet(nn.Module):
    """Two binary conv stages followed by one binary fully connected classifier.

    Each stage is BinaryConvolution2d (5x5, stride 1, no padding) -> ReLU ->
    2x2 max-pool -> BatchNorm2d. The classifier expects 512 flattened features,
    which is 32 channels x 4 x 4 for a 28x28 single-channel input.
    """

    def __init__(self, num_classes=10):
        super().__init__()
        stage_one = [
            BinaryConvolution2d(1, 16, kernel_size=5, stride=1, padding=0),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.BatchNorm2d(16),
        ]
        stage_two = [
            BinaryConvolution2d(16, 32, kernel_size=5, stride=1, padding=0),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.BatchNorm2d(32),
        ]
        # Single flat Sequential so sub-modules stay indexed 0..7.
        self.conv = nn.Sequential(*stage_one, *stage_two)
        self.fc = BinaryLinear(512, num_classes)

    def forward(self, x):
        """Return class scores of shape (batch, num_classes) for a batch of images."""
        features = self.conv(x)
        flattened = features.reshape(features.size(0), -1)
        return self.fc(flattened)

In [6]:
def run_model():
    """Train a fresh ConvNet, evaluate it on the test set and checkpoint it.

    Uses the notebook-level ``num_classes``, ``num_epochs``, ``learning_rate``,
    ``device``, ``train_loader`` and ``test_loader``.

    Returns:
        (acc, losses, model): test accuracy in percent, the training loss
        sampled every 50 steps, and the trained model (left in eval mode).

    Side effects: prints progress and writes the state dict to 'model.ckpt'.
    """
    model = ConvNet(num_classes).to(device)
    # Loss and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    # ---- Training ----
    steps_per_epoch = len(train_loader)
    losses = []
    for epoch in range(num_epochs):
        for step, (images, labels) in enumerate(train_loader, start=1):
            images, labels = images.to(device), labels.to(device)

            # Forward pass
            loss = criterion(model(images), labels)

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Log and record the loss only on every 50th step.
            if step % 50 != 0:
                continue
            loss_value = loss.item()
            print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'
                  .format(epoch+1, num_epochs, step, steps_per_epoch, loss_value))
            losses.append(loss_value)

    # ---- Evaluation ----
    # eval mode: batchnorm uses moving mean/variance instead of mini-batch statistics
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            predicted = torch.max(outputs.data, 1)[1]
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    acc = 100 * correct / total
    print('Test Accuracy of the model on the 10000 test images: {} %'.format(acc))

    # Save the model checkpoint
    torch.save(model.state_dict(), 'model.ckpt')
    return acc, losses, model

# Runs the model

In [7]:
# Number of independent train/evaluate repetitions.
runs = 1

# One accuracy and one loss history per run. `model` keeps the model from the
# last run; the parameter-extraction cells below read it.
performance, loss_hist = [], []
for _ in range(runs):
    acc, losses, model = run_model()
    performance += [acc]
    loss_hist += [losses]

Epoch [1/1], Step [50/1875], Loss: 0.9558
Epoch [1/1], Step [100/1875], Loss: 0.7956
Epoch [1/1], Step [150/1875], Loss: 0.5258
Epoch [1/1], Step [200/1875], Loss: 0.3214
Epoch [1/1], Step [250/1875], Loss: 0.3723
Epoch [1/1], Step [300/1875], Loss: 0.3148
Epoch [1/1], Step [350/1875], Loss: 0.4053
Epoch [1/1], Step [400/1875], Loss: 0.2025
Epoch [1/1], Step [450/1875], Loss: 0.2150
Epoch [1/1], Step [500/1875], Loss: 0.1592
Epoch [1/1], Step [550/1875], Loss: 0.2790
Epoch [1/1], Step [600/1875], Loss: 0.3174
Epoch [1/1], Step [650/1875], Loss: 0.0901
Epoch [1/1], Step [700/1875], Loss: 0.1215
Epoch [1/1], Step [750/1875], Loss: 0.0891
Epoch [1/1], Step [800/1875], Loss: 0.0938
Epoch [1/1], Step [850/1875], Loss: 0.1423
Epoch [1/1], Step [900/1875], Loss: 0.0669
Epoch [1/1], Step [950/1875], Loss: 0.0637
Epoch [1/1], Step [1000/1875], Loss: 0.0762
Epoch [1/1], Step [1050/1875], Loss: 0.2898
Epoch [1/1], Step [1100/1875], Loss: 0.0923
Epoch [1/1], Step [1150/1875], Loss: 0.1308
Epoch [1

# Extracting the parameters

In [12]:
# conv_children: the sub-modules of model.conv, in Sequential order
conv_children = list(model.conv.children())
# mp = model parameters, keyed by their dotted names
mp = dict(model.named_parameters())
print(model, '\n\n', mp.keys())

ConvNet(
  (conv): Sequential(
    (0): BinaryConvolution2d()
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (4): BinaryConvolution2d()
    (5): ReLU()
    (6): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (7): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (fc): BinaryLinear()
) 

 dict_keys(['conv.0.weight', 'conv.0.bias', 'conv.3.weight', 'conv.3.bias', 'conv.4.weight', 'conv.4.bias', 'conv.7.weight', 'conv.7.bias', 'fc.weight', 'fc.bias'])


In [19]:
# Trained tensors grouped per exported layer. Learnable parameters come from
# `mp`; batch-norm running statistics are buffers, so they are read off the
# modules in `conv_children`.
params = {
    'bconv1': {
        'weight': mp['conv.0.weight'].detach(),
        'bias': mp['conv.0.bias'].detach(),
    },
    'batchnorm1': {
        'weight': mp['conv.3.weight'].detach(),
        'bias': mp['conv.3.bias'].detach(),
        'mean': conv_children[3].running_mean,
        'var': conv_children[3].running_var,
    },
    'bconv2': {
        'weight': mp['conv.4.weight'].detach(),
        'bias': mp['conv.4.bias'].detach(),
    },
    'batchnorm2': {
        'weight': mp['conv.7.weight'].detach(),
        'bias': mp['conv.7.bias'].detach(),
        'mean': conv_children[7].running_mean,
        'var': conv_children[7].running_var,
    },
    'bfc1': {
        'weight': mp['fc.weight'].detach(),
        'bias': mp['fc.bias'].detach(),
    },
}

# Write one directory per layer, containing one `<field>.npy` file per tensor.
for layer_name, tensors in params.items():
    if os.path.exists(layer_name):
        assert os.path.isdir(layer_name), '{k} already exists but is not a directory.'.format(k=layer_name)
    else:
        os.mkdir(layer_name)
    for field, tensor in tensors.items():
        # 'wb' (not exclusive-create) so re-running the cell overwrites the
        # previous export instead of raising FileExistsError.
        with open(os.path.join(layer_name, field + '.npy'), 'wb') as fileobj:
            # The model may live on a GPU; .numpy() only works on CPU tensors.
            np.save(fileobj, tensor.detach().cpu().numpy())

# Plot performance graph

In [None]:
# Mean test accuracy over all runs.
mean_performance = np.mean(performance)
print("Average performance : ", mean_performance)

# One curve per run; each point is a loss sampled every 50 training steps.
for losses in loss_hist:
    plt.plot(losses)
plt.ylabel('Loss')
plt.xlabel('Steps (x50 iterations)')
plt.show()

In [None]:
# Convolutional neural network (two convolutional layers)
# class ConvNet(nn.Module):
#     def __init__(self, num_classes=10):
#         super(ConvNet, self).__init__()
#         self.layer1 = nn.Sequential(
#             BinaryConvolution2d(1, 16, kernel_size=5, stride=1, padding=2),
#         )
        
#     def forward(self, x):
#         out = self.layer1(x)
#         return out

# Extracting individual layers

In [None]:
def get_conv_params(im):
    """Run a freshly initialized binary conv layer on `im` and export it as NumPy.

    Builds BinaryConvolution2d(1, 16, kernel_size=5, stride=1, padding=0) on
    the notebook-level `device`, applies it to `im`, and prints the shapes.

    Args:
        im: input tensor for the layer (single input channel).

    Returns:
        (res, w, b): the layer output, its weight and its bias as NumPy arrays.
    """
    model = BinaryConvolution2d(1, 16, kernel_size=5, stride=1, padding=0).to(device)

    # The input has to live on the same device as the layer, and tensors must
    # be brought back to the CPU before .numpy() can be called on them.
    res = model(im.to(device)).detach().cpu().numpy()
    params = dict(model.named_parameters())
    w = params['weight'].detach().cpu().numpy()
    b = params['bias'].detach().cpu().numpy()

    print("param shapes:", res.shape, w.shape, b.shape)
    return res, w, b

def get_fc_params(flat_im):
    """Run a freshly initialized binary linear layer on `flat_im` and export it as NumPy.

    Builds BinaryLinear(784, 100) on the notebook-level `device`, applies it to
    `flat_im`, and prints the shapes.

    Args:
        flat_im: input tensor whose last dimension has 784 features.

    Returns:
        (res, w, b): the layer output, its weight and its bias as NumPy arrays.
    """
    model = BinaryLinear(784, 100).to(device)

    # The input has to live on the same device as the layer, and tensors must
    # be brought back to the CPU before .numpy() can be called on them.
    res = model(flat_im.to(device)).detach().cpu().numpy()
    params = dict(model.named_parameters())
    w = params['weight'].detach().cpu().numpy()
    b = params['bias'].detach().cpu().numpy()

    print("param shapes:", res.shape, w.shape, b.shape)
    return res, w, b

In [None]:
# Take the first (image, label) batch from the test loader. The built-in
# next() is used because iterator objects are not guaranteed to expose a
# Python-2 style .next() method.
im_torch, lab = next(iter(test_loader))
im = im_torch.numpy()
c_res, c_w, c_b = get_conv_params(im_torch)

In [None]:
# Collapse every non-batch dimension so each sample becomes one flat row,
# keep a NumPy copy of that view, and feed the tensor to the linear layer.
flat_im_torch = im_torch.reshape(im_torch.shape[0], -1)
flat_im = flat_im_torch.numpy()
fc_res, fc_w, fc_b = get_fc_params(flat_im_torch)

# Save these parameters

In [None]:
# Save the conv layer's output, weight and bias. The target directory is
# created first so the cell also works on a fresh checkout.
os.makedirs('conv_matrices', exist_ok=True)
for path, array in (
    ('conv_matrices/c_res.npy', c_res),
    ('conv_matrices/c_w.npy', c_w),
    ('conv_matrices/c_b.npy', c_b),
):
    with open(path, 'wb+') as f:
        np.save(f, array)

In [None]:
# Save the linear layer's output, weight and bias. The target directory is
# created first so the cell also works on a fresh checkout.
os.makedirs('fc_matrices', exist_ok=True)
for path, array in (
    ('fc_matrices/fc_res.npy', fc_res),
    ('fc_matrices/fc_w.npy', fc_w),
    ('fc_matrices/fc_b.npy', fc_b),
):
    with open(path, 'wb+') as f:
        np.save(f, array)

In [None]:
# Save the sample image in its original and flattened layouts. The target
# directory is created first so the cell also works on a fresh checkout.
os.makedirs('input_matrices', exist_ok=True)
for path, array in (
    ('input_matrices/im.npy', im),
    ('input_matrices/flat_im.npy', flat_im),
):
    with open(path, 'wb+') as f:
        np.save(f, array)

In [None]:
# NumPy version of the scaled-sign binarization: each array becomes its
# element-wise sign multiplied by the mean of its absolute values.
b_w = np.abs(fc_w).mean() * np.sign(fc_w)
b_b = np.abs(fc_b).mean() * np.sign(fc_b)
b_x = np.abs(flat_im).mean() * np.sign(flat_im)

In [None]:
# Recompute the conv output directly from the exported weight and bias.
# Stride and padding match the layer built in get_conv_params (stride=1,
# padding=0); with the previous padding=2 the result had a different spatial
# size than c_res and could not be compared against it.
binarize = Binarize.apply
c_w_torch = torch.from_numpy(c_w)
c_b_torch = torch.from_numpy(c_b)
c_res_torch = functional.conv2d(binarize(im_torch), binarize(c_w_torch), c_b_torch,
                                stride=1, padding=0)

In [None]:
# Show the conv-layer output returned by get_conv_params for manual inspection.
print(c_res)