In [1]:
import os
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim.lr_scheduler as lr_scheduler
import torch.nn.functional as F

# Pick the compute device: Apple-Silicon MPS first, then CUDA, then CPU.
if torch.backends.mps.is_available():
    device = torch.device('mps')
    print( f'you are using a Mac-based GPU' )
elif torch.cuda.is_available():
    # `torch.cuda` is a module (it cannot be called) and `torch.cuda.device` is a
    # context manager, so build a real `torch.device` that `.to(device)` accepts.
    device = torch.device('cuda:0')
    print( 'You are using a '+str(torch.cuda.get_device_name(0)) )
else: 
    device = torch.device('cpu')
    print( f'you are using a: {device}' )

you are using a Mac-based GPU


## Import the dataset
---

In [2]:
#@title 1. Import the dataset
from utils import *

# Dataset configuration: the task name fixes the size of the classification head.
task = 'cifar10'
_NUM_CLASSES_BY_TASK = {'cifar10': 10, 'cifar100': 100}
if task not in _NUM_CLASSES_BY_TASK:
    # Fail here rather than with a NameError on `num_classes` further down the notebook.
    raise ValueError( f'Unknown task {task!r}; expected one of {sorted(_NUM_CLASSES_BY_TASK)}' )
num_classes = _NUM_CLASSES_BY_TASK[task]
batch_size = 128
image_size = 32

# `generate_dataset` is not defined in this notebook; presumably it comes from the star-import of utils.
train_loader, test_loader = generate_dataset( task=task, train_batch_size=batch_size, test_batch_size=batch_size, image_size=image_size )

## Import the Model
---

In [3]:
from utils_mobilenet_v2 import mobilenet_v2_noise as mobilenet_v2
from utils_mobilenet_v2 import MobileNetV2
from models_utils import Linear

def get_model( weights='IMAGENET1K_V2', out_features=10, noise_inference=False, noise_inference_bn=False, 
               noise_sd=0.05, width_mult=0.4, inverted_residual_setting=None ):
    '''Function that builds a MobileNetV2 variant and replaces its last classifier layer.
    weights : selects how the network is built
        "IMAGENET1K_V2" / "IMAGENET1K_V1" : passed as `weights` to `mobilenet_v2` (pre-trained parameters)
        "cifar_specs" : `MobileNetV2` with the 7-row layout listed below
        "cifar_specs_shallow" : `MobileNetV2` with the 4-row layout listed below
        anything else (e.g. None) : `mobilenet_v2( weights=None )`, parameters initialized from scratch
    out_features : the size of the output layer
    noise_inference : activates STE when True
    noise_inference_bn : forwarded unchanged to the model constructor
        (presumably the same switch for the batch-norm layers -- TODO confirm in utils_mobilenet_v2)
    noise_sd : the amount of noise for the STE
    width_mult : width multiplier given to `MobileNetV2`; only used by the two "cifar_specs*" variants
    inverted_residual_setting : optional list of [t, c, n, s] rows replacing the built-in layout of the
        "cifar_specs*" variants; None keeps the built-in layout. Not used by the other variants.
    returns : the model, with `classifier[1]` replaced by a new `Linear` of size `out_features`
        and `classifier[0].p` set to 0.2
    '''
    if weights in ( 'IMAGENET1K_V2', 'IMAGENET1K_V1' ):
        model = mobilenet_v2( weights=weights, noise_inference=noise_inference, noise_inference_bn=noise_inference_bn, noise_sd=noise_sd )
    elif weights in ( 'cifar_specs', 'cifar_specs_shallow' ):
        if inverted_residual_setting is None:
            if weights == 'cifar_specs':
                inverted_residual_setting = [
                    # t, c, n, s
                    [1, 16, 1, 1],
                    [6, 24, 2, 1],
                    [6, 32, 3, 1],
                    [6, 64, 4, 2],
                    [6, 96, 3, 1],
                    [6, 160, 3, 2],
                    [6, 320, 1, 1],
                ]
            else:
                # Shallow variant: the 24-, 64- and 160-channel rows are left out.
                inverted_residual_setting = [
                    # t, c, n, s
                    [1, 16, 1, 1],
                    [6, 32, 3, 1],
                    [6, 96, 3, 1],
                    [6, 320, 1, 1],
                ]
        # Both variants honour `width_mult`; the shallow one used to hard-code 0.4 (the default),
        # silently discarding the caller's value.
        model = MobileNetV2( num_classes=out_features, width_mult=width_mult, 
                             noise_inference=noise_inference, noise_inference_bn=noise_inference_bn, noise_sd=noise_sd, inverted_residual_setting=inverted_residual_setting )
    else:
        model = mobilenet_v2( weights=None, noise_inference=noise_inference, noise_inference_bn=noise_inference_bn, noise_sd=noise_sd )

    # Replace the output layer in every branch so its size always matches `out_features`.
    model.classifier[1] = Linear( in_features=model.classifier[1].in_features, out_features=out_features, 
                                   noise_inference=noise_inference, noise_sd=noise_sd )
    # classifier[0] is the Dropout layer (see the printed `model.classifier`); its rate was 0.0 before.
    model.classifier[0].p = 0.2
    return model

#model = get_model( weights = 'cifar_specs', out_features=num_classes, noise_inference=True, noise_inference_bn=True )

In [4]:
def get_tot_model_params(model, verbose=True):
    '''Count the scalar entries of all tensors returned by `model.parameters()`.
    model : any object exposing `parameters()` that yields tensors
    verbose : if True, prints the total
    returns : the total number of entries, as a Python int'''
    # numel() gives the element count directly, without building a flattened view per tensor.
    tot_params = sum( p.numel() for p in model.parameters() )
    if verbose: print( f'Parameter count: {tot_params}' )
    return tot_params

# Build the 'cifar_specs' network (noise_inference on, noise_inference_bn off) and print its parameter count.
model = get_model(
    weights='cifar_specs',
    out_features=num_classes,
    noise_inference=True,
    noise_inference_bn=False,
    width_mult=0.4,
)
_ = get_tot_model_params( model )

Parameter count: 502538


In [5]:
# Display the classifier head of the model built above.
# NOTE(review): the saved output of this cell shows Dropout(p=0.0), while get_model sets
# classifier[0].p = 0.2 -- the output looks stale; re-run the notebook from a fresh kernel.
model.classifier

Sequential(
  (0): Dropout(p=0.0, inplace=False)
  (1): Linear(in_features=1280, out_features=10, bias=True, noise_inference=True)
)

In [5]:
### TODO: this cell can be removed, since these definitions have been moved to utils.py

def training_algo( training_type, model, data_loaders, optimizer=None, criterion=None, scheduler=None, out_activation=None,
                   device='cpu', lr=1e-3, clip_w=2.5, epochs=10, epochs_noise=2, 
                   noise_sd=1e-2, noise_every=100, levels=None, num_levels=15, print_every=1, verbose=False,
                   save_checkpoint_path=None, load_checkpoint_path=None  ):
    '''Train `model`, optionally save a checkpoint, then evaluate it on the test loader.
    training_type : 'noise_fine_tuning', 'qat' or 'qat_noise' enable the extra steps described below;
        any other value (e.g. 'normal') runs the plain loop
    model : the network to train; it is moved to `device` and returned in eval mode
    data_loaders : pair (train_loader, test_loader), each yielding (x, y) batches
    optimizer : defaults to SGD( lr=lr, momentum=0.9, weight_decay=5e-4 )
    criterion : defaults to NLLLoss; it is applied to out_activation( model(x) )
    scheduler : defaults to MultiStepLR( milestones=[100, 150, 200], gamma=0.5 ); stepped once per epoch
    out_activation : defaults to LogSoftmax( dim=-1 )
    device : device used for the model and the batches
    lr : learning rate, only used when `optimizer` is None
    clip_w : if not None, after every batch each parameter is clipped to +/- clip_w times its own std
    epochs : number of epochs run by this call
    epochs_noise : 'noise_fine_tuning' only -- noise is injected during the last `epochs_noise` epochs
    noise_sd : noise scale; relative to (max - min) of each parameter for 'noise_fine_tuning',
        absolute for 'qat_noise'
    noise_every : 'noise_fine_tuning' only -- noise is injected on batches whose index is a multiple of this
    levels, num_levels : forwarded to `quantize` for 'qat' / 'qat_noise'
    print_every, verbose : if verbose, print train metrics every `print_every` epochs and the test metrics
    save_checkpoint_path : if not None, model/optimizer/scheduler states are saved there after training
    load_checkpoint_path : if not None, model/optimizer/scheduler states are restored from it first
    returns : model, [accs_train, losses_train] (one entry per epoch), [acc_test, loss_test]
    '''
    train_loader, test_loader = data_loaders
    if criterion is None: criterion = torch.nn.NLLLoss() #torch.nn.CrossEntropyLoss()
    if optimizer is None: optimizer = torch.optim.SGD( model.parameters(), lr=lr, momentum=0.9, weight_decay=5e-4 )
    if scheduler is None: scheduler = torch.optim.lr_scheduler.MultiStepLR( optimizer, milestones=[100, 150, 200], gamma=0.5 )
    if out_activation is None: out_activation = torch.nn.LogSoftmax( dim=-1 )

    if load_checkpoint_path is not None:
        # NOTE: torch.load unpickles the file -- only load checkpoints from a trusted source.
        checkpoint = torch.load( load_checkpoint_path, map_location='cpu' )
        model.load_state_dict(checkpoint['model_state_dict'])
        model = model.to(device)
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
        epoch_start = checkpoint['epoch']
    else:
        model = model.to(device)
        epoch_start = 0

    # This function and `testing` both leave the model in eval mode, so a model that is trained
    # again would otherwise be optimized with dropout / batch-norm frozen.
    model.train()

    # NOTE(review): nothing in this function creates the `hid` attribute; presumably the model's
    # layers attach it to their parameters -- confirm in models_utils.
    uses_hidden_weights = training_type in ( 'qat', 'qat_noise' )
    last_loss = None  # stays None when no batch is processed (epochs == 0 or empty loader)

    losses_train, accs_train = [], []
    for e in range(epochs):
        losses = []
        correct = 0
        tot_samples = 0
        for batch_idx, (x, y) in enumerate(train_loader):
            optimizer.zero_grad()
            x = x.to(device)
            y = y.to(device)
            yhat = model( x )
            y_soft = out_activation( yhat )
            loss = criterion( y_soft, y.long() )
            loss.backward()

            if uses_hidden_weights:
                # restore the values stored in `hid` so the update is applied to them
                for p in model.parameters():
                    if hasattr(p,'hid'):
                        p.data.copy_(p.hid)

            optimizer.step()
            last_loss = loss.item()
            losses.append( last_loss )
            correct += torch.eq( torch.argmax(yhat, dim=1), y ).cpu().sum()
            tot_samples += len(y)

            if e+1 > epochs - epochs_noise and batch_idx%noise_every==0 and training_type == 'noise_fine_tuning':
                with torch.no_grad():
                    for p in model.parameters():
                        delta_w = torch.abs( p.max()-p.min() )
                        n = torch.randn_like( p )*(noise_sd*delta_w)
                        p.copy_( p+n )

            if clip_w is not None:
                with torch.no_grad():
                    for p in model.parameters():
                        std_w = torch.std( p )
                        p.clip_( -std_w*clip_w, +std_w*clip_w )

            if uses_hidden_weights:
                for p in model.parameters():  # updating the hid attribute
                    if hasattr(p,'hid'):
                        p.hid.copy_(p.data)
                    # every parameter is quantized, including those without `hid`
                    p.data = quantize( parameters=p.data, levels=levels, num_levels=num_levels, device=device )
                    if training_type == 'qat_noise':
                        p.data.add_( torch.randn_like(p.data)*noise_sd )

        # The MultiStepLR milestones are epoch counts, so the scheduler advances once per epoch.
        # Stepping it after every batch consumed all milestones within the first epoch.
        scheduler.step()

        acc_train = correct/tot_samples
        loss_train = np.mean(losses)
        if verbose and e%print_every==0:
            print( f'Train Epoch {e+1}, Train accuracy {acc_train*100:.2f}% Train loss {loss_train:.4f}' )
        accs_train.append(acc_train); losses_train.append(loss_train)

    if save_checkpoint_path is not None:
        # NOTE(review): the file name is formatted with epochs+epoch_start+1 while the stored
        # 'epoch' is epochs+epoch_start; harmless for paths without '{}' placeholders -- confirm intent.
        torch.save({
            'epoch': epochs+epoch_start,
            'model_state_dict': model.to('cpu').state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'scheduler_state_dict': scheduler.state_dict(),
            'loss': last_loss,
            }, save_checkpoint_path.format( epochs+epoch_start+1 ))
        print(f'Checkpoint saved at: {save_checkpoint_path}')


    losses = []
    correct = 0
    tot_samples = 0
    model = model.to(device).eval()
    # Evaluation needs no gradients; no_grad avoids building an autograd graph per test batch.
    with torch.no_grad():
        for x, y in test_loader:
            x = x.to(device)
            y = y.to(device)
            yhat = model( x )
            y_soft = out_activation( yhat )
            loss = criterion( y_soft, y.long() )
            losses.append( loss.item() )
            correct += torch.eq( torch.argmax(yhat, dim=1), y ).cpu().sum()
            tot_samples += len(y)
    acc_test = correct/tot_samples
    loss_test = np.mean(losses)
    if verbose: print( f'Tot epochs {epochs+epoch_start} -- Test accuracy {acc_test*100:.2f}% Test loss {loss_test:.4f}' )

    return model, [accs_train, losses_train], [acc_test, loss_test]


def testing( model, test_loader, criterion=None, out_activation=None, device='cpu', verbose=True ):
    '''The function assessing the test classification accuracy of the model.
    model: model of choice
    test_loader: the test dataloader for the task of choice
    verbose: if True, makes the function output the test accuracy and loss'''
    model = model.to(device).eval()
    losses = []
    correct, tot_samples = 0, 0
    if criterion is None: criterion = torch.nn.NLLLoss()
    if out_activation is None: out_activation = torch.nn.LogSoftmax( dim=-1 )
    for x, y in test_loader:
        x = x.to(device)
        y = y.to(device)
        yhat = model( x )
        y_soft = out_activation( yhat )
        loss = criterion( y_soft, y.long() )
        losses.append( loss.item() )
        correct += torch.eq( torch.argmax(yhat, dim=1), y ).cpu().sum()
        tot_samples += len(y)
    acc_test = correct/tot_samples
    loss_test = np.mean(losses)
    if verbose: print( f'-- Test accuracy {acc_test*100:.2f}% Test loss {loss_test:.4f}' )
    return acc_test, loss_test


## CIFAR 10
---

#### ----> Training

In [7]:
# --- run configuration ---
lr = 1e-2
epochs = 1        # epochs trained by this call
epochs_load = 1   # epoch count in the name of the checkpoint to resume from (0 -> nothing is loaded)
noise_inference = False
noise_sd_ste = 0.1

# --- model and optimization objects ---
data_loaders = [train_loader, test_loader] # [trainloader, testloader]
#model = get_model(weights='IMAGENET1K_V2', noise_inference=noise_inference, noise_sd=noise_sd_ste, out_features=num_classes)
model = get_model( weights='cifar_specs', out_features=num_classes, noise_sd=noise_sd_ste,
                   noise_inference=noise_inference, noise_inference_bn=noise_inference )
criterion = torch.nn.NLLLoss()
optimizer = torch.optim.SGD( model.parameters(), lr=lr, momentum=0.9, weight_decay=5e-4 )
scheduler = torch.optim.lr_scheduler.MultiStepLR( optimizer, milestones=[100, 150, 200], gamma=0.5 )

# --- checkpoint files: <root>/<task>_<model type>_epochs<N>.pt ---
root_path = '/Users/filippomoro/Documents/Training_with_memristors/Models'
model_type = {False:'normal', True:'ste{}'}
ckpt_name = '/{}_{}_epochs{}.pt'
save_checkpoint_path = root_path + ckpt_name.format( task, model_type[noise_inference].format(noise_sd_ste), epochs+epochs_load )
# the checkpoint to resume from is always the 'normal' one
load_checkpoint_path = None if epochs_load == 0 else root_path + ckpt_name.format( task, model_type[False].format(noise_sd_ste), epochs_load )

# --- train, save, evaluate ---
model_trained, (accs_train, losses_train), (acc_test, loss_test) = training_algo(
    training_type='normal', model=model, data_loaders=data_loaders,
    criterion=criterion, optimizer=optimizer, scheduler=scheduler,
    clip_w=None, lr=lr, epochs=epochs, epochs_noise=2,
    print_every=1, verbose=True, device=device,
    save_checkpoint_path=save_checkpoint_path,
    load_checkpoint_path=load_checkpoint_path,
)

Train Epoch 1, Train accuracy 46.77% Train loss 1.4450
Checkpoint saved at: /Users/filippomoro/Documents/Training_with_memristors/Models/cifar10_normal_epochs2.pt
Tot epochs 2 -- Test accuracy 49.73% Test loss 1.3693


#### ----> Testing with Noise

In [5]:
# Sweep the inference-noise level and record the test accuracy of the loaded checkpoint.
n_models = 1
noise_sd_list = np.array([0, 0.01, 0.05, 0.1, 0.15, 0.2])
# rows: model repetition, columns: noise level
# NOTE(review): despite the `_normal` suffix, the checkpoint loaded below is the 'ste0.1' one.
acc_test_c10_normal = np.zeros( (n_models, len(noise_sd_list)) )

### model path
root_path = '/Users/filippomoro/Documents/Training_with_memristors/Models'
model_type = {False:'normal', True:'ste{}'}
save_checkpoint_path = None #root_path + '/{}_{}_epochs{}.pt'.format( task, model_type[noise_inference], epochs_load+epochs )
load_checkpoint_path = root_path + '/{}_{}_epochs{}.pt'.format( task, model_type[True].format(0.1), 10 )

# The same file is used for every (noise level, repetition) pair, so read it from disk once
# instead of once per iteration; load_state_dict copies the values into each new model.
checkpoint = torch.load( load_checkpoint_path, map_location='cpu' )

with torch.no_grad():
    for n, noise_sd in enumerate(noise_sd_list):
        for m in range( n_models ):
            model = get_model( weights = 'cifar_specs', out_features=num_classes, noise_inference=True, noise_sd=noise_sd )
            model.load_state_dict(checkpoint['model_state_dict'])
            model = model.to(device)
            model.eval()

            acc_test_noise, loss_test_noise = testing( model=model, test_loader=test_loader, device=device, verbose=False )
            acc_test_c10_normal[m, n] = acc_test_noise
        print( f'Avg accuracy with noise on params {noise_sd*100}% is: {acc_test_c10_normal[:,n].mean()*100:.2f}%' )

Avg accuracy with noise on params 0.0% is: 88.28%
Avg accuracy with noise on params 1.0% is: 88.19%
Avg accuracy with noise on params 5.0% is: 87.61%
Avg accuracy with noise on params 10.0% is: 84.98%
Avg accuracy with noise on params 15.0% is: 75.63%
Avg accuracy with noise on params 20.0% is: 52.63%


## CIFAR 100
---

In [6]:
# NOTE(review): this cell reads `task`, `num_classes`, `train_loader` and `test_loader` from the
# dataset cell, where task = 'cifar10'. Set task = 'cifar100' there and re-run it before this cell.
epochs = 1
epochs_load = 100   # epoch count in the name of the checkpoint to resume from
lr = 1e-2

noise_inference, noise_sd_ste = False, 0.1
data_loaders = [train_loader, test_loader] # [trainloader, testloader]
#model = get_model(weights='IMAGENET1K_V2', noise_inference=noise_inference, noise_sd=noise_sd_ste, out_features=num_classes)
model = get_model( weights = 'cifar_specs', out_features=num_classes, noise_inference=noise_inference, noise_inference_bn=noise_inference, noise_sd=noise_sd_ste )
criterion = torch.nn.NLLLoss() #torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD( model.parameters(), lr=lr, momentum=0.9, weight_decay=5e-4 )
scheduler = torch.optim.lr_scheduler.MultiStepLR( optimizer, milestones=[100, 150, 200], gamma=0.5 )

model_type = {False:'normal', True:'ste{}'}
root_path = '/Users/filippomoro/Documents/Training_with_memristors/Models'
# Saving is switched off for this run; only the existing 'normal' checkpoint is loaded.
save_checkpoint_path = None #root_path + '/{}_{}_epochs{}.pt'.format( task, model_type[noise_inference].format(noise_sd_ste), epochs+epochs_load ) # epochs_load+epochs
load_checkpoint_path = root_path + '/{}_{}_epochs{}.pt'.format( task, model_type[False].format(noise_sd_ste), epochs_load )

# Pass the scheduler built above, as the CIFAR-10 cell does. It was created but never handed
# over, so training_algo attached a second, identical scheduler to the same optimizer.
model_trained, [accs_train, losses_train], [acc_test, loss_test] = training_algo( training_type='normal', model=model, data_loaders=data_loaders,
                                                                                criterion=criterion, optimizer=optimizer, scheduler=scheduler,
                                                                                clip_w=None, lr=lr, epochs=epochs, epochs_noise=2, 
                                                                                print_every=1, verbose=True, device=device,
                                                                                save_checkpoint_path=save_checkpoint_path,
                                                                                load_checkpoint_path=load_checkpoint_path )

Train Epoch 1, Train accuracy 81.99% Train loss 0.5888
Tot epochs 101 -- Test accuracy 64.92% Test loss 1.2925


In [7]:
# Sweep the inference-noise level and record the test accuracy of the loaded checkpoint.
n_models = 1
noise_sd_list = np.array([0, 0.01, 0.05, 0.1, 0.15, 0.2])
# rows: model repetition, columns: noise level
# NOTE(review): this array keeps the `c10` name used by the CIFAR-10 sweep, so running this cell
# overwrites those results; rename it once no other cell relies on the old name.
acc_test_c10_normal = np.zeros( (n_models, len(noise_sd_list)) )

### model path
root_path = '/Users/filippomoro/Documents/Training_with_memristors/Models'
model_type = {False:'normal', True:'ste{}'}
save_checkpoint_path = None #root_path + '/{}_{}_epochs{}.pt'.format( task, model_type[noise_inference], epochs_load+epochs )
load_checkpoint_path = root_path + '/{}_{}_epochs{}.pt'.format( task, model_type[True].format(0.1), 110 )

# The same file is used for every (noise level, repetition) pair, so read it from disk once
# instead of once per iteration; load_state_dict copies the values into each new model.
checkpoint = torch.load( load_checkpoint_path, map_location='cpu' )

with torch.no_grad():
    for n, noise_sd in enumerate(noise_sd_list):
        for m in range( n_models ):
            model = get_model( weights = 'cifar_specs', out_features=num_classes, noise_inference=True, noise_sd=noise_sd )
            model.load_state_dict(checkpoint['model_state_dict'])
            model = model.to(device)
            model.eval()

            acc_test_noise, loss_test_noise = testing( model=model, test_loader=test_loader, device=device, verbose=False )
            acc_test_c10_normal[m, n] = acc_test_noise
        print( f'Avg accuracy with noise on params {noise_sd*100}% is: {acc_test_c10_normal[:,n].mean()*100:.2f}%' )

Avg accuracy with noise on params 0.0% is: 64.02%
Avg accuracy with noise on params 1.0% is: 63.96%
Avg accuracy with noise on params 5.0% is: 62.34%
Avg accuracy with noise on params 10.0% is: 56.07%
Avg accuracy with noise on params 15.0% is: 40.90%
Avg accuracy with noise on params 20.0% is: 14.99%
