In [18]:
# ----------------------------------------------------------------------
# Numenta Platform for Intelligent Computing (NuPIC)
# Copyright (C) 2019, Numenta, Inc.  Unless you have an agreement
# with Numenta, Inc., for a separate license for this software code, the
# following terms and conditions apply:
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero Public License version 3 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU Affero Public License for more details.
#
# You should have received a copy of the GNU Affero Public License
# along with this program.  If not, see http://www.gnu.org/licenses.
#
# http://numenta.org/licenses/
# ----------------------------------------------------------------------

In [19]:
# Uncomment the following lines to install nupic.torch and torchvision
# !pip install git+https://github.com/numenta/nupic.torch.git#egg=nupic.torch
# !pip install torchvision

In [20]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from tqdm import tqdm_notebook as tqdm

torch.manual_seed(18)
np.random.seed(18)

# Use GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [21]:
class RandomNoise(object):
    """
    An image transform that adds noise to random pixels in the image.
    """
    def __init__(self, noise_level=0.0, white_value=0.1307 + 2*0.3081):
        """
        :param noise_level:
          From 0 to 1. For each pixel, set its value to white_value with this
          probability. Suggested white_value is 'mean + 2*stdev'
        """
        self.noise_level = noise_level
        self.white_value = white_value

    def __call__(self, image):
        a = image.view(-1)
        num_noise_bits = int(a.shape[0] * self.noise_level)
        noise = np.random.permutation(a.shape[0])[0:num_noise_bits]
        a[noise] = self.white_value
        return image


def train(model, loader, optimizer, criterion, post_batch_callback=None):
    """
    Train the model using given dataset loader. 
    Called on every epoch.
    :param model: pytorch model to be trained
    :type model: torch.nn.Module
    :param loader: dataloader configured for the epoch.
    :type loader: :class:`torch.utils.data.DataLoader`
    :param optimizer: Optimizer object used to train the model.
    :type optimizer: :class:`torch.optim.Optimizer`
    :param criterion: loss function to use
    :type criterion: function
    :param post_batch_callback: function(model) to call after every batch
    :type post_batch_callback: function
    """
    model.train()
    for batch_idx, (data, target) in enumerate(tqdm(loader, leave=False)):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        if post_batch_callback is not None:
            post_batch_callback(model)
        


def test(model, loader, criterion):
    """
    Evaluate pre-trained model using given dataset loader.
    Called on every epoch.
    :param model: Pretrained pytorch model
    :type model: torch.nn.Module
    :param loader: dataloader configured for the epoch.
    :type loader: :class:`torch.utils.data.DataLoader`
    :param criterion: loss function to use
    :type criterion: function
    :return: Dict with "accuracy", "loss" and "total_correct"
    """
    model.eval()
    loss = 0
    total_correct = 0
    with torch.no_grad():
        for data, target in tqdm(loader, leave=False):
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss += criterion(output, target, reduction='sum').item() # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True) # get the index of the max log-probability
            total_correct += pred.eq(target.view_as(pred)).sum().item()
    
    return {"accuracy": total_correct / len(loader.dataset), 
            "loss": loss / len(loader.dataset), 
            "total_correct": total_correct}

### Parameters

In [22]:
# CNN layer configuration
IN_CHANNELS = 1
OUT_CHANNELS = 30
KERNEL_SIZE = 5
WIDTH = 28
CNN_OUTPUT_LEN = OUT_CHANNELS * ((WIDTH - KERNEL_SIZE + 1) // 2) ** 2 

# Linear layer configuration
HIDDEN_SIZE = 150
OUTPUT_SIZE = 10    

# Sparsity parameters
SPARSITY = 0.7
SPARSITY_CNN = 0.2

# K-Winners parameters
K = 50
PERCENT_ON = 0.1
BOOST_STRENGTH = 1.4

# Training parameters
LEARNING_RATE = 0.01
MOMENTUM = 0.5
EPOCHS = 10
FIRST_EPOCH_BATCH_SIZE = 4
TRAIN_BATCH_SIZE = 64
TEST_BATCH_SIZE = 1000

# Noise test parameters
NOISE_LEVELS = [0.05, 0.10, 0.15, 0.20, 0.25]

###  Sparse CNN Model
Create a sparse CNN network composed of one sparse convolution layer followed by a sparse linear layer with using k-winner activation between the layers

In [28]:
from nupic.torch.modules import (
    KWinners2d, KWinners, SparseWeights, SparseWeights2d, Flatten, 
    rezero_weights, update_boost_strength
)

sparse_cnn = nn.Sequential(
    # Sparse CNN layer
    SparseWeights2d(
        nn.Conv2d(in_channels=IN_CHANNELS, out_channels=OUT_CHANNELS, kernel_size=KERNEL_SIZE),
        sparsity=SPARSITY_CNN),
    KWinners2d(channels=OUT_CHANNELS, percent_on=PERCENT_ON, boost_strength=BOOST_STRENGTH),
    nn.MaxPool2d(kernel_size=2),

    # Flatten max pool output before passing to linear layer
    Flatten(),

    # Sparse Linear layer
    SparseWeights(nn.Linear(CNN_OUTPUT_LEN, HIDDEN_SIZE), sparsity=SPARSITY),
    KWinners(n=HIDDEN_SIZE, percent_on=PERCENT_ON, boost_strength=BOOST_STRENGTH),

    # Output layer
    nn.Linear(HIDDEN_SIZE, OUTPUT_SIZE),
    nn.LogSoftmax(dim=1)
).to(device)

### Load MNIST Dataset

In [29]:
normalize = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.1307,), (0.3081,))])
train_dataset = datasets.MNIST('data', train=True, download=True, transform=normalize)
test_dataset = datasets.MNIST('data', train=False, transform=normalize)

# Configure data loaders
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=TRAIN_BATCH_SIZE, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=TEST_BATCH_SIZE, shuffle=True)
first_loader = torch.utils.data.DataLoader(train_dataset, batch_size=FIRST_EPOCH_BATCH_SIZE, shuffle=True)

### Train
On the first epoch we use smaller batch size to calculate the duty cycles used by the k-winner function. Once the duty cycles stabilize we can use larger batch sizes. Using the `post_batch`, we rezero the weights after every batch to keep the initial sparsity constant.

In [30]:
def post_batch(model):
    model.apply(rezero_weights)

sgd = optim.SGD(sparse_cnn.parameters(), lr=LEARNING_RATE, momentum=MOMENTUM)
train(model=sparse_cnn, loader=first_loader, optimizer=sgd, criterion=F.nll_loss, post_batch_callback=post_batch)

Please use `tqdm.notebook.tqdm` instead of `tqdm.tqdm_notebook`
  for batch_idx, (data, target) in enumerate(tqdm(loader, leave=False)):


  0%|          | 0/15000 [00:00<?, ?it/s]

KeyboardInterrupt: 

After each epoch we apply the boost strength factor

In [None]:
%%capture
sparse_cnn.apply(update_boost_strength)

Test and print results

In [None]:
test(model=sparse_cnn, loader=test_loader, criterion=F.nll_loss)

Please use `tqdm.notebook.tqdm` instead of `tqdm.tqdm_notebook`
  for data, target in tqdm(loader, leave=False):


  0%|          | 0/10 [00:00<?, ?it/s]

{'accuracy': 0.9789, 'loss': 0.06301897468566894, 'total_correct': 9789}

At this point the duty cycles should be stable and we can train on larger batch sizes

In [None]:
for epoch in range(1, EPOCHS):
    train(model=sparse_cnn, loader=train_loader, optimizer=sgd, criterion=F.nll_loss, post_batch_callback=post_batch)
    sparse_cnn.apply(update_boost_strength)
    results = test(model=sparse_cnn, loader=test_loader, criterion=F.nll_loss)
    print(results)

Please use `tqdm.notebook.tqdm` instead of `tqdm.tqdm_notebook`
  for batch_idx, (data, target) in enumerate(tqdm(loader, leave=False)):


  0%|          | 0/938 [00:00<?, ?it/s]

Please use `tqdm.notebook.tqdm` instead of `tqdm.tqdm_notebook`
  for data, target in tqdm(loader, leave=False):


  0%|          | 0/10 [00:00<?, ?it/s]

{'accuracy': 0.985, 'loss': 0.04625854682922363, 'total_correct': 9850}


  0%|          | 0/938 [00:00<?, ?it/s]

  0%|          | 0/10 [00:00<?, ?it/s]

{'accuracy': 0.9857, 'loss': 0.04312329025268555, 'total_correct': 9857}


  0%|          | 0/938 [00:00<?, ?it/s]

  0%|          | 0/10 [00:00<?, ?it/s]

{'accuracy': 0.9869, 'loss': 0.04052887935638428, 'total_correct': 9869}


  0%|          | 0/938 [00:00<?, ?it/s]

  0%|          | 0/10 [00:00<?, ?it/s]

{'accuracy': 0.9868, 'loss': 0.04089242115020752, 'total_correct': 9868}


  0%|          | 0/938 [00:00<?, ?it/s]

  0%|          | 0/10 [00:00<?, ?it/s]

{'accuracy': 0.987, 'loss': 0.03937682189941406, 'total_correct': 9870}


  0%|          | 0/938 [00:00<?, ?it/s]

  0%|          | 0/10 [00:00<?, ?it/s]

{'accuracy': 0.9872, 'loss': 0.039914471054077146, 'total_correct': 9872}


  0%|          | 0/938 [00:00<?, ?it/s]

  0%|          | 0/10 [00:00<?, ?it/s]

{'accuracy': 0.9867, 'loss': 0.04124218235015869, 'total_correct': 9867}


  0%|          | 0/938 [00:00<?, ?it/s]

  0%|          | 0/10 [00:00<?, ?it/s]

{'accuracy': 0.9866, 'loss': 0.04179389133453369, 'total_correct': 9866}


  0%|          | 0/938 [00:00<?, ?it/s]

  0%|          | 0/10 [00:00<?, ?it/s]

{'accuracy': 0.9869, 'loss': 0.04118634967803955, 'total_correct': 9869}


### Noise
Add noise to the input and check the test accuracy

In [None]:
for noise in NOISE_LEVELS:
    noise_transform = transforms.Compose([transforms.ToTensor(), RandomNoise(noise), 
                                      transforms.Normalize((0.1307,), (0.3081,))])
    noise_dataset = datasets.MNIST('data', train=False, transform=noise_transform)
    noise_loader = torch.utils.data.DataLoader(noise_dataset, batch_size=TEST_BATCH_SIZE, shuffle=True)

    results = test(model=sparse_cnn, loader=noise_loader, criterion=F.nll_loss)
    print(noise, ":", results)

Please use `tqdm.notebook.tqdm` instead of `tqdm.tqdm_notebook`
  for data, target in tqdm(loader, leave=False):


  0%|          | 0/10 [00:00<?, ?it/s]

0.05 : {'accuracy': 0.9779, 'loss': 0.07207113227844238, 'total_correct': 9779}


  0%|          | 0/10 [00:00<?, ?it/s]

0.1 : {'accuracy': 0.9498, 'loss': 0.15423367614746095, 'total_correct': 9498}


  0%|          | 0/10 [00:00<?, ?it/s]

0.15 : {'accuracy': 0.8957, 'loss': 0.2995627777099609, 'total_correct': 8957}


  0%|          | 0/10 [00:00<?, ?it/s]

0.2 : {'accuracy': 0.8191, 'loss': 0.5322125183105468, 'total_correct': 8191}


  0%|          | 0/10 [00:00<?, ?it/s]

0.25 : {'accuracy': 0.7283, 'loss': 0.8375710815429688, 'total_correct': 7283}
