# (h) Langevin dynamics
### Initial learning digits

In [1]:
import os
os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID"   
os.environ["CUDA_VISIBLE_DEVICES"]="0"

import torch
from torch import Tensor
import torch.nn as nn
import torch.cuda as cuda
import torch.utils.data as data_utils
from torch.utils.data import DataLoader, Dataset
from torch.nn.parameter import Parameter

try:
    import accimage
except ImportError:
    accimage = None

import random
import copy
import time

import torchvision
from torchvision import datasets
from torchvision import transforms
from torchvision.datasets import ImageFolder

from SpykeTorch import snn
from SpykeTorch import functional as sf
from SpykeTorch import visualization as vis
from SpykeTorch import utils

import struct
import glob
import datetime
import re

from sklearn.datasets import fetch_openml
from sklearn.metrics import confusion_matrix

import pandas as pd
import numpy as np
import csv
import seaborn as sn
import matplotlib.pyplot as plt
%matplotlib inline

import warnings
warnings.filterwarnings("ignore")

from PIL import Image

#import import_ipynb
#from MozafariMNIST2018_class import MozafariMNIST2018

## Model

In [2]:
#learning rule

class STDP(nn.Module):
    """STDP learning rule for one convolutional layer, with optional weight noise.

    Every output feature of ``conv_layer`` owns a pair of learning rates
    ``(ltp, ltd)`` that are registered as non-trainable parameters named
    ``ltp_<i>`` / ``ltd_<i>``.

    Args:
        conv_layer: layer whose ``weight`` is updated in place; it must expose
            ``out_channels``, ``kernel_size`` and ``weight``.
        learning_rate: one ``(ltp, ltd)`` pair shared by all features, or a
            list holding one pair per feature.
        epsilon: standard deviation of the Gaussian noise added to the weights
            during research epochs (Brownian dynamics in weights).
        use_stabilizer: if true, the update is scaled by
            ``(w - lower_bound) * (upper_bound - w)``.
        lower_bound: lower clamp applied to the weights after each update.
        upper_bound: upper clamp applied to the weights after each update.
    """

    # Values returned as ``number_unit_segment`` by ``forward``.
    NOT_RESEARCH_SEGMENT = 1111   # the call was not a research epoch
    UNKNOWN_SEGMENT = 1000        # research epoch, but no single feature could be identified

    def __init__(self, conv_layer, learning_rate, epsilon = 1.0, 
                 use_stabilizer = True, lower_bound = 0, upper_bound = 1):
        
        super(STDP, self).__init__()
        self.conv_layer = conv_layer
        if isinstance(learning_rate, list):
            # Copy: the entries are replaced by Parameter pairs below and the
            # caller's list must not be modified.
            self.learning_rate = list(learning_rate)
        else:
            self.learning_rate = [learning_rate] * conv_layer.out_channels
        for i in range(conv_layer.out_channels):
            ltp = Parameter(torch.tensor([self.learning_rate[i][0]]), requires_grad=False)
            ltd = Parameter(torch.tensor([self.learning_rate[i][1]]), requires_grad=False)
            self.learning_rate[i] = (ltp, ltd)
            self.register_parameter('ltp_' + str(i), ltp)
            self.register_parameter('ltd_' + str(i), ltd)
        self.use_stabilizer = use_stabilizer
        self.lower_bound = lower_bound
        self.upper_bound = upper_bound
        self.epsilon = epsilon  # std of weights (for Brownian dynamics in weights)

    def get_pre_post_ordering(self, input_spikes, output_spikes, winners):
        """Return, for every winner, a boolean tensor comparing pre- and post-synaptic latencies.

        An entry is True where the accumulated input spikes inside the winner's
        receptive field are greater than or equal to the winner's accumulated
        output spikes.
        """
        # accumulating input and output spikes to get latencies
        input_latencies = torch.sum(input_spikes, dim=0)
        output_latencies = torch.sum(output_spikes, dim=0)
        kernel_h, kernel_w = self.conv_layer.kernel_size[-2], self.conv_layer.kernel_size[-1]
        result = []
        for winner in winners:
            # generating repeated output tensor with the same size of the receptive field
            out_tensor = torch.ones(*self.conv_layer.kernel_size, device=output_latencies.device) * output_latencies[winner]
            
            # slicing input tensor with the same size of the receptive field centered around winner
            # since there is no padding, there is no need to shift it to the center
            in_tensor = input_latencies[:, winner[-2]:winner[-2] + kernel_h, winner[-1]:winner[-1] + kernel_w]
            result.append(torch.ge(in_tensor, out_tensor))
        return result

    def _updated_segment(self, delta_weight):
        """Return the index of the feature whose weights were updated.

        The flattened weight tensor is split into ``out_channels`` equal
        segments (one per feature). The index of the segment is returned when
        the first non-zero update sits exactly at the start of a segment;
        otherwise (or when the update is entirely zero) ``UNKNOWN_SEGMENT``.
        """
        flat = delta_weight.flatten()
        nonzero = torch.nonzero(flat)
        if nonzero.numel() == 0:
            return self.UNKNOWN_SEGMENT
        first = int(nonzero[0].item())
        # Integer arithmetic on purpose: a float segment length makes the
        # resulting index a float and the feature is never recognised.
        segment_len = flat.numel() // self.conv_layer.out_channels
        index, offset = divmod(first, segment_len)
        return index if offset == 0 else self.UNKNOWN_SEGMENT

    # simple STDP rule with Brownian dynamics in weights
    # gets prepost pairings, winners, weights, and learning rates (all should be tensors)
    def forward(self, input_spikes, potentials, output_spikes, 
                winners=None, freeze_tensor=None, research_epoch=False, mean_value=0, 
                kwta = 1, inhibition_radius = 0):
        """Apply one STDP update (plus Gaussian noise in research epochs) to the layer weights.

        Args:
            input_spikes, potentials, output_spikes: tensors of the layer's last pass.
            winners: winner coordinates; computed with ``sf.get_k_winners`` when None.
            freeze_tensor: accepted for interface compatibility; not used here.
            research_epoch: when True, noise with std ``epsilon`` is added after the update.
            mean_value: accepted for interface compatibility; in research epochs
                it is overridden with 1, so the noise std is exactly ``epsilon``.
            kwta, inhibition_radius: forwarded to ``sf.get_k_winners``.

        Returns:
            ``(weights_before_noise, mean_delta_weight, number_unit_segment)``:
            the flattened clamped weights after the STDP step, their difference
            to the weights before the step, and the updated feature index (or
            one of the sentinel values defined on the class).
        """
        if winners is None:
            winners = sf.get_k_winners(potentials, kwta, inhibition_radius, output_spikes)
        pairings = self.get_pre_post_ordering(input_spikes, output_spikes, winners)

        lr = torch.zeros_like(self.conv_layer.weight)
        for winner, pairing in zip(winners, pairings):
            f = winner[0]
            lr[f] = torch.where(pairing, *(self.learning_rate[f]))

        previous_conv_layer_weight = self.conv_layer.weight.clone().detach().flatten()
        
        if self.use_stabilizer:
            delta_weight = lr * ((self.conv_layer.weight - self.lower_bound) *
                                 (self.upper_bound - self.conv_layer.weight))
        else:
            delta_weight = lr
        
        self.conv_layer.weight += delta_weight
        self.conv_layer.weight.clamp_(self.lower_bound, self.upper_bound)
        
        weights_before_noise = self.conv_layer.weight.detach().clone().flatten()
        mean_delta_weight = weights_before_noise - previous_conv_layer_weight    # really not 'mean' 
        
        number_unit_segment = self.NOT_RESEARCH_SEGMENT  # special number for non-research epochs
        
        if research_epoch:          # let's add Brownian dynamics to weights !!!!!
            number_unit_segment = self._updated_segment(delta_weight)
            
            mean_value = 1          # ATTENTION: because calculation of noise use only self.epsilon
            # The noise lives on the same device as the weights (no hard-coded "cuda").
            noise_std = torch.ones_like(self.conv_layer.weight) * mean_value * self.epsilon
            self.conv_layer.weight += torch.normal(mean=torch.zeros_like(self.conv_layer.weight),
                                                   std=noise_std)
            self.conv_layer.weight.clamp_(self.lower_bound, self.upper_bound)
            
        return weights_before_noise, mean_delta_weight, number_unit_segment
        
    def update_learning_rate(self, feature, ap, an):    
        """Set the (ltp, ltd) learning rates of one feature to ``ap`` and ``an``."""
        self.learning_rate[feature][0][0] = ap
        self.learning_rate[feature][1][0] = an

    def update_all_learning_rate(self, ap, an):       
        """Set the (ltp, ltd) learning rates of every feature to ``ap`` and ``an``."""
        for feature in range(self.conv_layer.out_channels):
            self.learning_rate[feature][0][0] = ap
            self.learning_rate[feature][1][0] = an

In [3]:
class MozafariMNIST2018(nn.Module):
    """Spiking network with three convolutions and one STDP rule per learning mode.

    ``forward`` runs the network up to ``max_layer`` and, in training mode,
    stores the tensors of the last processed layer in ``self.ctx``. ``stdp``,
    ``reward`` and ``punish`` then apply the matching rule to that context.
    """

    def __init__(self, epsilon=2.0, dropout=0.5, dropout_procedure = False):
        super(MozafariMNIST2018, self).__init__()

        # Layer 1: convolution, value passed to sf.fire, and the
        # (count, inhibition radius) pair passed to sf.get_k_winners.
        self.conv1 = snn.Convolution(6, 30, 5, 0.8, 0.05)
        self.conv1_t = 15
        self.k1 = 5
        self.r1 = 3

        # Layer 2: same set of settings.
        self.conv2 = snn.Convolution(30, 250, 3, 0.8, 0.05)
        self.conv2_t = 10
        self.k2 = 8
        self.r2 = 1

        # Layer 3: decision layer.
        self.conv3 = snn.Convolution(250, 200, 5, 0.8, 0.05)
        self.number_of_features = 200

        self.dropout = dropout
        self.epsilon = epsilon

        # ATTENTION: in STDP variable epsilon - additional small value
        self.stdp1 = STDP(self.conv1, (0.004, -0.003), self.epsilon)
        self.stdp2 = STDP(self.conv2, (0.004, -0.003), self.epsilon)
        self.stdp3 = STDP(self.conv3, (0.004, -0.003), self.epsilon, False, 0.2, 0.8)
        self.anti_stdp3 = STDP(self.conv3, (-0.004, 0.0005), self.epsilon, False, 0.2, 0.8)
        self.max_ap = Parameter(torch.Tensor([0.15]))

        # 20 consecutive layer-3 features are mapped to each of the 10 classes.
        self.decision_map = [label for label in range(10) for _ in range(20)]

        # Tensors of the most recent training pass (freeze_tensor was added).
        self.ctx = {
            "input_spikes": None,
            "potentials": None,
            "output_spikes": None,
            "winners": None,
            "freeze_tensor": None,
        }
        self.spk_cnt1 = 0
        self.spk_cnt2 = 0

        self.dropout_procedure = dropout_procedure

    def _remember(self, input_spikes, potentials, output_spikes, winners):
        """Store the tensors of the layer being trained for the learning rules."""
        self.ctx["input_spikes"] = input_spikes
        self.ctx["potentials"] = potentials
        self.ctx["output_spikes"] = output_spikes
        self.ctx["winners"] = winners

    def _double_learning_rate(self, rule):
        """Double the LTP rate of ``rule`` (capped by ``max_ap``); LTD becomes -0.75 * LTP."""
        current = rule.learning_rate[0][0]
        ap = torch.tensor(current.item(), device=current.device) * 2
        ap = torch.min(ap, self.max_ap)
        an = ap * -0.75
        rule.update_all_learning_rate(ap.item(), an.item())

    def _compete(self, layer_input, pot, k, r):
        """Apply pointwise inhibition, pick the winners and store the context."""
        pot = sf.pointwise_inhibition(pot)
        spk = pot.sign()
        winners = sf.get_k_winners(pot, k, r, spk)
        self._remember(layer_input, pot, spk, winners)
        return spk, pot

    def _decide(self, winners):
        """Map the first winner's feature to its class, or return -1 if there is no winner."""
        if len(winners) != 0:
            return self.decision_map[winners[0][0]]
        return -1

    def _forward_train(self, input, max_layer, research_epoch, freeze_tensor):
        """Training-mode pass; records the context of the layer selected by ``max_layer``."""
        pot = self.conv1(input)
        spk, pot = sf.fire(pot, self.conv1_t, True)
        if max_layer == 1:
            self.spk_cnt1 += 1
            # Every 500 samples the layer-1 learning rates are doubled,
            # except during research epochs.
            if not research_epoch and self.spk_cnt1 >= 500:
                self.spk_cnt1 = 0
                self._double_learning_rate(self.stdp1)
            return self._compete(input, pot, self.k1, self.r1)

        spk_in = sf.pad(sf.pooling(spk, 2, 2), (1,1,1,1))
        pot = self.conv2(spk_in)
        spk, pot = sf.fire(pot, self.conv2_t, True)
        if max_layer == 2:
            self.spk_cnt2 += 1
            if not research_epoch and self.spk_cnt2 >= 500:
                self.spk_cnt2 = 0
                self._double_learning_rate(self.stdp2)
            return self._compete(spk_in, pot, self.k2, self.r2)

        spk_in = sf.pad(sf.pooling(spk, 3, 3), (2,2,2,2))
        pot = self.conv3(spk_in)

        if self.dropout_procedure:
            drop_probability = torch.ones(self.number_of_features) * self.dropout
            to_be_dropped = torch.bernoulli(drop_probability).nonzero()
            sf.feature_inhibition_(pot, to_be_dropped)

        spk = sf.fire(pot)
        winners = sf.get_k_winners(pot, 1, 0, spk)
        self._remember(spk_in, pot, spk, winners)
        self.ctx["freeze_tensor"] = freeze_tensor
        return self._decide(winners)

    def _forward_eval(self, input, max_layer):
        """Evaluation-mode pass; nothing is recorded in ``self.ctx``."""
        pot = self.conv1(input)
        spk, pot = sf.fire(pot, self.conv1_t, True)
        if max_layer == 1:
            return spk, pot

        pot = self.conv2(sf.pad(sf.pooling(spk, 2, 2), (1,1,1,1)))
        spk, pot = sf.fire(pot, self.conv2_t, True)
        if max_layer == 2:
            return spk, pot

        pot = self.conv3(sf.pad(sf.pooling(spk, 3, 3), (2,2,2,2)))
        spk = sf.fire(pot)
        return self._decide(sf.get_k_winners(pot, 1, 0, spk))

    def forward(self, input, max_layer, mean_value=0, research_epoch=False, freeze_tensor=None):
        """Run the network up to ``max_layer``.

        Returns ``(spikes, potentials)`` for ``max_layer`` 1 or 2, otherwise the
        predicted class (``-1`` when no neuron wins). ``mean_value`` is accepted
        but not used by this method.
        """
        input = sf.pad(input.float(), (2,2,2,2), 0)
        if self.training:
            return self._forward_train(input, max_layer, research_epoch, freeze_tensor)
        return self._forward_eval(input, max_layer)

    def stdp(self, layer_idx):
        """Apply plain STDP to layer 1 or 2 using the stored context."""
        if layer_idx == 1:
            rule = self.stdp1
        elif layer_idx == 2:
            rule = self.stdp2
        else:
            return
        rule(self.ctx["input_spikes"], self.ctx["potentials"], self.ctx["output_spikes"],
             winners=self.ctx["winners"], freeze_tensor=self.ctx["freeze_tensor"],
             research_epoch=False, mean_value=0)

    def update_learning_rates(self, stdp_ap, stdp_an, anti_stdp_ap, anti_stdp_an):
        """Set the layer-3 learning rates of the reward and punish rules."""
        self.stdp3.update_all_learning_rate(stdp_ap, stdp_an)
        # The anti-STDP rule receives its pair in swapped order.
        self.anti_stdp3.update_all_learning_rate(anti_stdp_an, anti_stdp_ap)

    def reward(self, mean_value=0, research_epoch=False):
        """Apply the reward rule (``stdp3``) to the stored context and return its result."""
        return self.stdp3(self.ctx["input_spikes"], self.ctx["potentials"],
                          self.ctx["output_spikes"], self.ctx["winners"],
                          self.ctx["freeze_tensor"], research_epoch, mean_value)

    def punish(self, mean_value=0, research_epoch=False):
        """Apply the punish rule (``anti_stdp3``) to the stored context and return its result."""
        return self.anti_stdp3(self.ctx["input_spikes"], self.ctx["potentials"],
                               self.ctx["output_spikes"], self.ctx["winners"],
                               self.ctx["freeze_tensor"], research_epoch, mean_value)

In [4]:
# train and test

def train_unsupervise(network, data, layer_idx):
    """Run one unsupervised STDP pass over ``data`` for layer ``layer_idx``.

    Each sample is forwarded up to ``layer_idx`` and the layer's STDP rule is
    applied immediately afterwards. Samples are moved to the GPU when the
    module-level flag ``use_cuda`` is set.
    """
    network.train()

    for sample in data:
        if use_cuda:
            sample = sample.cuda()
        network(sample, layer_idx)
        network.stdp(layer_idx)

def train_rl(network, data, target, mean_value=0, research_epoch=False, freeze_tensor=None):
    """Train the decision layer on one batch with reward / punishment.

    Each sample is classified by ``network(..., 3, ...)``; a correct decision
    triggers ``network.reward``, a wrong one ``network.punish``, and a silent
    network (decision ``-1``) triggers nothing.

    Args:
        network: model providing ``train``, ``__call__``, ``reward`` and ``punish``.
        data: indexable batch of samples.
        target: indexable batch of labels, aligned with ``data``.
        mean_value, research_epoch, freeze_tensor: forwarded to the network.

    Returns:
        ``(perf, weights_before_noise, mean_delta_weight, reward_unit_segment,
        punish_unit_segment)`` where ``perf`` holds the fractions of correct,
        wrong and silent samples. The remaining values describe the LAST
        sample that produced a decision; if every sample was silent they are
        ``None, None, 0, 0``.
    """
    network.train()
    perf = np.array([0,0,0]) # correct, wrong, silence

    # Defaults for a batch in which the network never fires. Without them the
    # return statement raises UnboundLocalError, which is easy to hit with
    # batches of size 1.
    weights_before_noise = None
    mean_delta_weight = None
    reward_unit_segment = 0
    punish_unit_segment = 0
    
    for i in range(len(data)):
        data_in = data[i]
        target_in = target[i]
        if use_cuda:
            data_in = data_in.cuda()
            target_in = target_in.cuda()
            
        # mean_value, research_epoch, freeze_tensor were added:
        d = network(data_in, 3, mean_value, research_epoch, freeze_tensor)    
        
        if d == -1:
            perf[2] += 1
            continue

        if d == target_in:
            perf[0] += 1
            weights_before_noise, mean_delta_weight, reward_unit_segment = \
                network.reward(mean_value, research_epoch)
            punish_unit_segment = 0
        else:
            perf[1] += 1
            weights_before_noise, mean_delta_weight, punish_unit_segment = \
                network.punish(mean_value, research_epoch)
            reward_unit_segment = 0

    return perf/len(data), weights_before_noise, mean_delta_weight, reward_unit_segment, punish_unit_segment

def test(network, data, target):
    """Evaluate ``network`` on one batch without learning.

    Returns an array with the fractions of correct, wrong and silent
    (decision ``-1``) samples. Samples and labels are moved to the GPU when
    the module-level flag ``use_cuda`` is set.
    """
    network.eval()
    perf = np.array([0,0,0]) # correct, wrong, silence
    for idx in range(len(data)):
        sample, label = data[idx], target[idx]
        if use_cuda:
            sample = sample.cuda()
            label = label.cuda()
        decision = network(sample, 3)
        if decision == -1:
            perf[2] += 1
        elif decision == label:
            perf[0] += 1
        else:
            perf[1] += 1
    return perf/len(data)

In [5]:
# training the 3rd layer

def third_layer(file_name_net, file_name_csv, file_name_reward, file_name_punish,
                adaptive_int, epochs, first_research_epoch, 
                train_loader, test_loader, test_previous_loader,
                train_research_loader, model, parametr_set, epsilon):  
    """Train the third layer, then run "research" epochs with noisy weights.

    Epochs before ``first_research_epoch`` are ordinary reward/punish training
    epochs with adaptive learning rates. From ``first_research_epoch`` on, the
    model is trained sample by sample on ``train_research_loader`` with noise
    added to the weights, and every 100th set of layer-3 weights is recorded
    and saved to ``set_of_weights_<n>.pt``.

    Args:
        file_name_net: file for saving the state_dict of the best model.
        file_name_csv: file for saving the per-epoch parameters of the model.
        file_name_reward: CSV file receiving the rewarded segment numbers.
        file_name_punish: CSV file receiving the punished segment numbers.
        adaptive_int: learning rate parameter.
        epochs: total number of epochs (training + research).
        first_research_epoch: index of the first research epoch.
        train_loader: loader used in ordinary training epochs.
        test_loader: loader used for testing after every epoch.
        test_previous_loader: optional loader with previously learned images;
            anything that is not a ``DataLoader`` disables that test.
        train_research_loader: loader used in research epochs.
        model: network to train.
        parametr_set: DataFrame that receives one row per epoch.
        epsilon: noise std; only used here for the log message.

    Returns:
        ``(parametr_set, reward_segments_list, punish_segments_list)``.
    """
    begin_time = time.time()
    
    adaptive_min = 0
    counter = 0
    
    apr = model.stdp3.learning_rate[0][0].item()
    anr = model.stdp3.learning_rate[0][1].item()
    app = model.anti_stdp3.learning_rate[0][1].item()
    anp = model.anti_stdp3.learning_rate[0][0].item()

    best_train = np.array([0.0,0.0,0.0,0.0]) # correct, wrong, silence, epoch
    best_test = np.array([0.0,0.0,0.0,0.0]) # correct, wrong, silence, epoch
    best_test_previous = np.array([0.0,0.0,0.0,0.0]) # correct, wrong, silence, epoch
    
    frequency_of_recording = 100
    
    reward_segments_list = []
    punish_segments_list = []

    for epoch in range(epochs):
        
        seconds_epoch_0 = time.time() 
        
        print('-'*50)
        print("Epoch #: ", epoch)
        perf_train = np.array([0.0,0.0,0.0])
        
        research_epoch = epoch >= first_research_epoch
            
        if not research_epoch:       

            for data, targets in train_loader:

                perf_train_batch, _, _, _, _ = train_rl(model, data, targets)

                # update adaptive learning rates
                apr_adapt = apr * (perf_train_batch[1] * adaptive_int + adaptive_min)
                anr_adapt = anr * (perf_train_batch[1] * adaptive_int + adaptive_min)
                app_adapt = app * (perf_train_batch[0] * adaptive_int + adaptive_min)
                anp_adapt = anp * (perf_train_batch[0] * adaptive_int + adaptive_min)

                parametr_set.loc[counter, 'epoch'] = epoch
                parametr_set.loc[counter, 'train'] = perf_train_batch[0]

                model.update_learning_rates(apr_adapt, anr_adapt, app_adapt, anp_adapt)
                perf_train += perf_train_batch

            perf_train /= len(train_loader)

            if best_train[0] <= perf_train[0]:
                best_train = np.append(perf_train, epoch)
            print(f"Current Train: {perf_train[0]*100 :.2f}%")

            for data, targets in test_loader:
                perf_test = test(model, data, targets)
                parametr_set.loc[counter, 'test'] = perf_test[0]
                if best_test[0] <= perf_test[0]:
                    best_test = np.append(perf_test, epoch)
                    torch.save(model.state_dict(), file_name_net)
                print(f"Current Test: {perf_test[0]*100 :.2f}%")

            if isinstance(test_previous_loader, DataLoader):
                for data, targets in test_previous_loader:
                    perf_test_previous = test(model, data, targets)
                    parametr_set.loc[counter, 'test_previous'] = perf_test_previous[0]
                    if best_test_previous[0] <= perf_test_previous[0]:
                        best_test_previous = np.append(perf_test_previous, epoch)
                    print(f"Current Test Previous: {perf_test_previous[0]*100 :.2f}%")
            else:
                parametr_set.loc[counter, 'test_previous'] = 0

            counter += 1

            _report_epoch_time(epoch, seconds_epoch_0)

        else:
            print(f"*** it's research epoch #{epoch-first_research_epoch} ***")
            
            counter_of_research = 0
            
            # Size the record buffer from the loader that is actually iterated
            # (the parameter, not a notebook global). Round up so that a
            # length that is not a multiple of the recording frequency still fits.
            unit = len(train_research_loader)
            n_records = (unit + frequency_of_recording - 1) // frequency_of_recording
            layer3_weight = model.conv3.weight
            research_tensor = torch.zeros((n_records, layer3_weight.numel()),
                                          device=layer3_weight.device)
            
            # for mean_delta_weight quantity of training epochs must be > 0 
            # mean_value = torch.abs(mean_delta_weight).mean() 
            mean_value = 1
            print(f'in N(0, std): std = epsilon = {mean_value*epsilon}')

            for data, targets in train_research_loader:
                
                perf_train_batch, weights_before_noise, _, reward_unit_segment, punish_unit_segment = \
                    train_rl(model, data, targets, mean_value, research_epoch)
                
                reward_segments_list.append(reward_unit_segment)
                punish_segments_list.append(punish_unit_segment)
                
                if counter_of_research % frequency_of_recording == 0:
                    if weights_before_noise is None:
                        # No update was reported for this batch, so the
                        # current weights are the ones to record.
                        weights_before_noise = model.conv3.weight.detach().flatten()
                    research_tensor[counter_of_research // frequency_of_recording] = weights_before_noise

                # Save once, after the last batch of the research epoch.
                if (counter_of_research + 1) % unit == 0:
                    torch.save(research_tensor, f'set_of_weights_{epoch-first_research_epoch}.pt')

                counter_of_research += 1
                
            perf_test = None
            # NOTE(review): research rows get no 'epoch' value in parametr_set
            # (kept as in the original) - confirm whether that is intended.
            for data, targets in test_loader:
                
                perf_test = test(model, data, targets)
                parametr_set.loc[counter, 'train'] = 0
                parametr_set.loc[counter, 'test'] = perf_test[0]
                print(f"Current Test: {perf_test[0]*100 :.2f}%")
            
            counter += 1
            
            _report_epoch_time(epoch, seconds_epoch_0)
            
            # Stop the research as soon as the test accuracy drops below 80%.
            if perf_test is not None and perf_test[0] < 0.8:
                break
                
    parametr_set.to_csv(file_name_csv)
    
    # newline='' is what the csv module expects; without it extra blank lines
    # appear on platforms that translate line endings.
    with open(file_name_reward, 'w', newline='') as file:
        csv.writer(file, delimiter=';').writerow(reward_segments_list)
        
    with open(file_name_punish, 'w', newline='') as file:
        csv.writer(file, delimiter=';').writerow(punish_segments_list)

    end_time = time.time()  
    
    print('=='*10, 'SUMMARY', '=='*10)
    print(f'Total operational time: {(end_time - begin_time)//60} min')
    if best_train[0] > 0:
        print(f"Best Test: {best_test[0]*100 :.2f}% on {best_test[3] :.0f} epoch")
        
    return parametr_set, reward_segments_list, punish_segments_list


def _report_epoch_time(epoch, started):
    """Print the wall-clock time spent on ``epoch`` since ``started``."""
    elapsed = time.time() - started
    print(f'Operational time of epoch #{epoch}: '
          f'{int(elapsed//60)} min {int(elapsed%60)} sec')

In [6]:
def curve_graph(parametr_set):
    """Plot the train, test and previous-test accuracy (in %) against the epoch."""
    _, ax = plt.subplots(figsize=(15, 5))

    epochs = parametr_set['epoch']
    curves = (
        ('train', {'color': 'cyan', 'label': 'train'}),
        ('test', {'color': 'blue', 'marker': 'o', 'label': 'test'}),
        ('test_previous', {'linestyle': ':', 'color': 'red', 'label': 'test of previous images'}),
    )
    for column, style in curves:
        ax.plot(epochs, parametr_set[column]*100, **style)

    ax.set_xlabel('epochs', loc='right', fontsize=17)
    ax.set_ylabel('accuracy, %', loc='top', fontsize=17)
    ax.grid()
    ax.legend()
    plt.show()

In [7]:
# for image transformation (see dataset)

class S1C1Transform:
    """Image transform: filter, locally normalise and convert to a spike-wave tensor.

    Args:
        filter: callable applied to the scaled image tensor.
        PIL_type: if true, the input is first converted with ``ToPILImage``.
        timesteps: value passed to ``utils.Intensity2Latency``.
    """

    def __init__(self, filter, PIL_type=False, timesteps = 15):
        self.filter = filter
        self.PIL_type = PIL_type
        self.to_pil_image = transforms.ToPILImage()
        self.to_tensor = transforms.ToTensor()
        self.temporal_transform = utils.Intensity2Latency(timesteps)
        self.cnt = 0   # number of images transformed so far

    def __call__(self, image):
        # Progress message every 10000 images.
        if self.cnt % 10000 == 0:
            print(f'{self.cnt} images')
        if self.PIL_type:
            image = self.to_pil_image(image)
        self.cnt += 1

        scaled = (self.to_tensor(image) * 255).unsqueeze(0)
        normalized = sf.local_normalization(self.filter(scaled), 8)
        spikes = self.temporal_transform(normalized)
        return spikes.sign().byte()

In [8]:
# for image transformation (see dataset)

# For every window size: one DoG kernel with sigmas (size/9, 2*size/9)
# followed by the one with the two sigmas swapped.
kernels = [
    utils.DoGKernel(size, *sigmas)
    for size in (3, 7, 13)
    for sigmas in ((size/9, 2*size/9), (2*size/9, size/9))
]

filter = utils.Filter(kernels, padding=6, thresholds=50)

# The same pipeline for inputs that are used as they are (s1c1) and for
# inputs that are first converted to PIL images (s1c1_PIL).
s1c1 = S1C1Transform(filter, PIL_type=False)
s1c1_PIL = S1C1Transform(filter, PIL_type=True)

In [9]:
# for image transformation (see dataset)

class CustomTensorDataset(Dataset):
    """TensorDataset with support of transforms.

    ``tensors[0]`` holds the samples and ``tensors[1]`` the labels; the
    optional ``transform`` is applied to the sample only.
    """

    def __init__(self, tensors, transform=None):
        # All tensors must have the same number of items.
        expected = tensors[0].size(0)
        assert all(expected == item.size(0) for item in tensors)
        self.tensors = tensors
        self.transform = transform

    def __getitem__(self, index):
        sample = self.tensors[0][index]
        if self.transform:
            sample = self.transform(sample)
        return sample, self.tensors[1][index]

    def __len__(self):
        return self.tensors[0].shape[0]

## Sets

### Set of 10 capital letters
24000 train images + 4000 test images

In [11]:
# set of 10 capital letters from EMNIST
path = './data/EMNIST_own/capital_letters/'


def _load_letters(part):
    """Load one ``Mozafari_capital_letters_<part>.pt`` file onto the CPU."""
    return torch.load(f'{path}Mozafari_capital_letters_{part}.pt', map_location=torch.device('cpu'))


test_letter_labels = _load_letters('test_labels')
test_letters = _load_letters('test_images')

train_letter_labels = _load_letters('train_labels')
train_letters = _load_letters('train_images')

In [12]:
# Element permutation

# One random order per split; images and labels of a split share it so the
# pairs stay aligned.
train_order_l = torch.randperm(train_letter_labels.shape[0])
test_order_l = torch.randperm(test_letter_labels.shape[0])


def _reordered(tensor, order):
    """Return ``tensor`` with its items rearranged by ``order`` (same shape)."""
    return tensor[order].view(tensor.size())


train_letter_labels = _reordered(train_letter_labels, train_order_l)
train_letters = _reordered(train_letters, train_order_l)

test_letter_labels = _reordered(test_letter_labels, test_order_l)
test_letters = _reordered(test_letters, test_order_l)

In [13]:
# Datasets apply the PIL-based transform to every image on access.
train_letter_set = CustomTensorDataset((train_letters, train_letter_labels), s1c1_PIL)
test_letter_set = CustomTensorDataset((test_letters, test_letter_labels), s1c1_PIL)

# Each loader delivers its whole dataset as a single batch.
train_letter_loader = DataLoader(dataset=train_letter_set, batch_size=len(train_letter_set))
test_letter_loader = DataLoader(dataset=test_letter_set, batch_size=len(test_letter_set))

In [14]:
# Number of train / test letter labels.
train_letter_labels.shape, test_letter_labels.shape

(torch.Size([24000]), torch.Size([4000]))

### Set of 10 MNIST digits
Reduction of 60000 train + 10000 test images to 24000 train + 4000 test images

In [15]:
# Digit classes 0..9.
classes = list(range(10))

In [16]:
# the set of 10 digit images, the same size as the set of letters (2400 trains + 400 tests per class)

# the MNIST data was pre-divided into 10 classes
path = './data/MNIST_0_1_2_3_4_5_6_7_8_9/'

# Per class: the first 2400 train and the first 400 test items are kept.
# The per-class tensors stay available as module-level names
# (train_digit_<i>_images, train_digit_<i>_labels, test_digit_<i>_images,
# test_digit_<i>_labels), exactly as before.
for i in classes: 
    for split, limit in (('train', 2400), ('test', 400)):
        for part in ('images', 'labels'):
            globals()[f'{split}_digit_{i}_{part}'] = torch.load(
                f'{path}{split}_{part}_{i}.pt', map_location=torch.device('cpu'))[0:limit]

# Concatenate in the order given by `classes` with a single torch.cat per
# tensor: this follows `classes` instead of a hard-coded range(1, 10) and
# avoids re-copying the growing result on every iteration.
train_MNIST_labels = torch.cat([globals()[f'train_digit_{i}_labels'] for i in classes], 0)
train_MNIST_images = torch.cat([globals()[f'train_digit_{i}_images'] for i in classes], 0)
test_MNIST_labels = torch.cat([globals()[f'test_digit_{i}_labels'] for i in classes], 0)
test_MNIST_images = torch.cat([globals()[f'test_digit_{i}_images'] for i in classes], 0)

In [17]:
# Number of train / test digit labels.
train_MNIST_labels.shape, test_MNIST_labels.shape

(torch.Size([24000]), torch.Size([4000]))

In [18]:
# Element permutation

# One random order per split; images and labels of a split share it so the
# pairs stay aligned.
train_order = torch.randperm(train_MNIST_labels.shape[0])
test_order = torch.randperm(test_MNIST_labels.shape[0])


def _reordered_digits(tensor, order):
    """Return ``tensor`` with its items rearranged by ``order`` (same shape)."""
    return tensor[order].view(tensor.size())


train_MNIST_labels = _reordered_digits(train_MNIST_labels, train_order)
train_MNIST_images = _reordered_digits(train_MNIST_images, train_order)

test_MNIST_labels = _reordered_digits(test_MNIST_labels, test_order)
test_MNIST_images = _reordered_digits(test_MNIST_images, test_order)

# Datasets apply the PIL-based transform to every image on access.
train_MNIST_set = CustomTensorDataset((train_MNIST_images, train_MNIST_labels), s1c1_PIL)
test_MNIST_set = CustomTensorDataset((test_MNIST_images, test_MNIST_labels), s1c1_PIL)

# Each loader delivers its whole dataset as a single batch.
train_MNIST_loader = DataLoader(dataset=train_MNIST_set, batch_size=len(train_MNIST_set))
test_MNIST_loader = DataLoader(dataset=test_MNIST_set, batch_size=len(test_MNIST_set))

### Research sets

In [19]:
# loaders for research purpose

# The research sets reuse the (already shuffled) MNIST tensors.
train_MNIST_images_r, train_MNIST_labels_r = train_MNIST_images, train_MNIST_labels
test_MNIST_images_r, test_MNIST_labels_r = test_MNIST_images, test_MNIST_labels

train_digit_set_r = CustomTensorDataset((train_MNIST_images_r, train_MNIST_labels_r), s1c1_PIL)
test_digit_set_r = CustomTensorDataset((test_MNIST_images_r, test_MNIST_labels_r), s1c1_PIL)

# Research loaders deliver one image per batch.
train_digit_research_loader = DataLoader(dataset=train_digit_set_r, batch_size=1)
test_digit_research_loader = DataLoader(dataset=test_digit_set_r, batch_size=1)

## Model activation

In [20]:
# epsilon: standard deviation of the Gaussian noise added to the weights in
# research epochs (Brownian dynamics in weights). The model forwards it to
# each of its STDP rules.
epsilon = 4e-4
mozafari = MozafariMNIST2018(epsilon=epsilon)

In [21]:
# Use the GPU only when one is actually present. `device` is always defined,
# so later cells work on CPU-only machines as well.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")

if use_cuda:
    print("CUDA is available")
else:
    print("CUDA is not available, running on CPU")

CUDA is available


In [22]:
# Move the model to the GPU when requested.
if use_cuda:
    mozafari.cuda()   

# Switch to evaluation mode; as the last expression of the cell this also
# displays the module structure.
mozafari.eval()

MozafariMNIST2018(
  (conv1): Convolution()
  (conv2): Convolution()
  (conv3): Convolution()
  (stdp1): STDP(
    (conv_layer): Convolution()
  )
  (stdp2): STDP(
    (conv_layer): Convolution()
  )
  (stdp3): STDP(
    (conv_layer): Convolution()
  )
  (anti_stdp3): STDP(
    (conv_layer): Convolution()
  )
)

## Initial training

### Loading the SNN trained on 24,000 images of digits

In [23]:
# file "saved_24000_digits.net" is the result of the file "Initial_learning_of_SNN_on_digits.ipynb"
# Load the checkpoint onto the CPU first, like every other torch.load in this
# notebook, so it can be read on machines without a GPU. load_state_dict then
# copies the values into the model's parameters.
mozafari.load_state_dict(torch.load("saved_24000_digits.net", map_location=torch.device('cpu')))

<All keys matched successfully>

In [24]:
# Empty log table; third_layer fills in one row per epoch.
parametr_set = pd.DataFrame(columns=['epoch', 'train', 'test', 'test_previous'])

### Training the third layer (with Brownian dynamics in weights)
#### only every 100th set of 3rd layer weights is recorded (240 sets per file)

In [25]:
# With first_research_epoch=1, epoch 0 is an ordinary training epoch and all
# following epochs are research epochs (noise added to the weights).
# test_previous_loader=[] is not a DataLoader, so the test on previous images
# is skipped and 'test_previous' is recorded as 0.
# The result is the tuple (parametr_set, reward_segments_list, punish_segments_list).
first_test = third_layer(file_name_net="saved_digits_research.net",
                        file_name_csv='parameter_set_digits_research.csv',
                        file_name_reward='reward_segments_list.csv', 
                        file_name_punish='punish_segments_list.csv',  
                        adaptive_int=0.5, epochs=101, first_research_epoch=1,
                        train_loader=train_MNIST_loader, test_loader=test_MNIST_loader, test_previous_loader=[],
                        train_research_loader=train_digit_research_loader, 
                        model=mozafari, parametr_set=parametr_set, epsilon=epsilon)

--------------------------------------------------
Epoch #:  0
0 images
10000 images
20000 images
Current Train: 96.51%
Current Test: 93.65%
Operational time of epoch #0: 1 min 49 sec
--------------------------------------------------
Epoch #:  1
*** it's research epoch #0 ***
in N(0, std): std = epsilon = 0.0004
30000 images
40000 images
50000 images
Current Test: 93.55%
Operational time of epoch #1: 2 min 7 sec
--------------------------------------------------
Epoch #:  2
*** it's research epoch #1 ***
in N(0, std): std = epsilon = 0.0004
60000 images
70000 images
80000 images
Current Test: 92.73%
Operational time of epoch #2: 2 min 7 sec
--------------------------------------------------
Epoch #:  3
*** it's research epoch #2 ***
in N(0, std): std = epsilon = 0.0004
90000 images
100000 images
110000 images
Current Test: 92.70%
Operational time of epoch #3: 2 min 8 sec
--------------------------------------------------
Epoch #:  4
*** it's research epoch #3 ***
in N(0, std): std = e