In [78]:
import numpy as np

class InputNeuron(object):
    
    def __init__(self, spike_train):
        self.spike_train = spike_train
        self.output_spikes_times = []
        self.global_time = 0
        
    def set_spike_train(self, spike_train):
        self.spike_train = spike_train
        
    def step(self):
        self.global_time += 1
        if self.spike_train[self.global_time] == 1:
            self.output_spikes_times.append(self.global_time)
    
    def get_spikes(self):
        return self.output_spikes_times


class Neuron(object):
    
    def __init__(self):
        self.input_spikes = [] # pairs time, intensity
        self.output_spikes_times = []
        self.global_time = 0
        self.potential = 0
        self.threshold = 1
        self.tau_m = 4
        self.tau_s = 2
        self.tau_r = 20
        self.time_scale = 0.1 # time unit is 100 ms
        self.history = [0]
    
    def receive_spike(self, intensity):
        self.input_spikes.append((self.global_time, intensity))
    
    def step(self):
        self.global_time += 1

        self.potential = 0
        self.spike = False
        
        for spike_time, intensity in self.input_spikes:
            self.potential += self.eps(self.global_time - spike_time) * intensity
        
        for spike_time in self.output_spikes_times:
            self.potential += self.nu(self.global_time - spike_time)
        
        if self.potential > self.threshold:
            self.output_spikes_times.append(self.global_time)
            self.spike = True
        
        self.history.append(self.potential)
    
    def eps(self, time):
        s = time * self.time_scale # - delay
        return (np.exp(-np.abs(s)/self.tau_m) - np.exp(-np.abs(s)/self.tau_s)) * (s > 0)
    
    def grad_eps(self, time):
        s = time * self.time_scale
        
        return (np.exp(-np.abs(s)/self.tau_m) * (-1/self.tau_m) - np.exp(-np.abs(s)/self.tau_s) * (-1/self.tau_s)) * (s > 0)
    
    def nu(self, time):
        s = time * self.time_scale
        return -self.threshold * np.exp(-s/self.tau_r) * (s > 0)
    
    def get_spikes(self):
        return self.output_spikes_times


class Connection(object):

    def __init__(self, input_neuron, output_neuron,
                 weights=[1], delays=[1]): # weights and delays are scaled
        self.weights = weights
        self.delays = delays
        self.input_neuron = input_neuron
        self.output_neuron = output_neuron
        self.global_time = 0
    
    def step(self):
        self.global_time += 1
        spikes = self.input_neuron.get_spikes()
        for spike_time in spikes:
            for weight, delay in zip(self.weights, self.delays):
                if spike_time + delay == self.global_time:
                    self.output_neuron.receive_spike(weight)
    
    def gradient(self, expected_spike_time):
        output_spikes = np.array(self.output_neuron.get_spikes())
        input_spikes = np.array(self.input_neuron.get_spikes())
        assert len(output_spikes) > 0
        output_spike_time = output_spikes[0]
        eps = self.output_neuron.eps
        grad_eps = self.output_neuron.grad_eps
        grad_numerator = 0
        grad_denumerator = 0
        for i in range(len(input_spikes)):
            grad_numerator += eps(output_spike_time - input_spikes[i] - self.delays)
            grad_denumerator += self.weights * grad_eps(output_spike_time - input_spikes[i] - self.delays) # sum over all delays and input axons for output_neuron
        grad = -grad_numerator / np.maximum(0.1, grad_denumerator) * (output_spike_time - expected_spike_time) # leaning rate will be in a learning algorithm
        return grad



In [36]:
from functools import reduce
class InputLayer(object):
    def __init__(self, shape):
        self.shape = shape 
        self.neur_size = reduce(lambda res, x: res*x, self.shape, 1)
        self.neurons = np.ndarray(shape=self.shape, dtype=InputNeuron, buffer=np.array([InputNeuron([])]*self.neur_size))
            
    def new_input(self, arg):
        for i, f in enumerate(arg):
            for j, l in enumerate(f):
                for k, m in enumerate(l):
                    self.neurons[i][j][k].set_spike_train(arg[i][j][k])
    
    def step(self):
        for neur in self.neurons.reshape((self.neur_size)):
            neur.step()

In [None]:
class Conv2DLayer(object):
    def __init__(self, input_layer, num_filters, filter_shape, weights):
        self.shape = (num_filters, input_layer.shape[1]-filter_shape[0]+1, input_layer.shape[2]-filter_shape[1]+1)
        self.neur_size = reduce(lambda res, x: res*x, self.shape, 1)
        self.neurons = np.ndarray(shape=self.shape, dtype=Neuron, buffer=np.array([Neuron()]*self.neur_size))
        
        self.connections = []
        
        for nk, kernel in enumerate(self.neurons):
            for i, row in enumerate(kernel):
                for j, neuron in enumerate(row):   # соединяем с предыдущим слоем
                    self.connections += [Connection(input_layer.neurons[l][i+p][j+q],neuron, [weights[nk][p][q]])\
                                         for l in np.arange(input_layer.shape[0]) for p in np.arange(filter_shape[0])\
                                         for q in np.arange(filter_shape[1])] 
        
    def step(self):
        for conn in self.connections:
            conn.step()
        for neur in self.neurons.reshape((self.neur_size)):
            neur.step()

In [68]:
class SubSampling2DLayer(object):
    def __init__(self, input_layer, pool_size):
        self.shape = input_layer.shape // np.append([1], pool_size)
        self.neur_size = reduce(lambda res, x: res*x, self.shape, 1)
        self.neurons = np.ndarray(shape=self.shape, dtype=Neuron, buffer=np.array([Neuron()]*self.neur_size))
        
        self.conn_weight = 1 / (pool_size[0] * pool_size[1])
        
        self.connections = []
        
        for nk, kernel in enumerate(self.neurons):
            for i, row in enumerate(kernel):
                for j, neuron in enumerate(row):   
                    self.connections += [Connection(input_layer.neurons[l][i*pool_size[0]+p][j*pool_size[1]+q],neuron,\
                                                    [self.conn_weight])\
                                         for l in np.arange(input_layer.shape[0]) for p in np.arange(pool_size[0])\
                                         for q in np.arange(pool_size[1])] 
        
    def step(self):
        for conn in self.connections:
            conn.step()
        for neur in self.neurons.reshape((self.neur_size)):
            neur.step()

In [72]:
class DenseLayer(object):
    def __init__(self, input_layer, num_units, weights):
        self.shape = [num_units]
        self.neur_size = num_units
        self.neurons = np.array([Neuron()]*self.neur_size)
        
        self.connections = [Connection(input_neuron, output_neuron, [weights[i][j]])\
                            for i, input_neuron in enumerate(input_layer.neurons.reshape((input_layer.neur_size)))\
                            for j, output_neuron in enumerate(self.neurons)]
    def step(self):
        for conn in self.connections:
            conn.step()
        for neur in self.neurons:
            neur.step()
    

In [76]:
class NNet(object):
    def __init__(self, shape):
        self.layers = [InputLayer(shape)]
    
    def add_convolution(self, num_filters, filter_shape, weights):
        self.layers.append(Conv2DLayer(self.layers[-1], num_filters, filter_shape, weights))
        
    def add_subsampling(self, pool_size):
        self.layers.append(SubSampling2DLayer(self.layers[-1], pool_size))
        
    def add_dense(self, num_units, weights):
        self.layers.append(DenseLayer(self.layers[-1], num_units, weights))
    
    def get_output_for(self, data, t_max):
        self.layers[0].new_input(data)
        for t in np.arange(t_max):
            for layer in self.layers:
                layer.step()
        result = [neur.get_spikes() for neur in self.layers[-1].neurons]
        return result

In [None]:
nn = NNet((1, 4, 4))
nn.add_convolution(2, (3, 3), [[1, 1, 1], [2, 2, 2], [3, 3, 3]])
nn.add_subsampling((2, 2))
nn.add_dense(1, [[1], [1]])
nn.get_output_for(inp, 8)

In [65]:
train = [1, 1, 1, 1, 0, 1, 1, 0, 1]
inp = np.array([train]*16).reshape(1, 4, 4, 9)

In [66]:
inp

array([[[[1, 1, 1, 1, 0, 1, 1, 0, 1],
         [1, 1, 1, 1, 0, 1, 1, 0, 1],
         [1, 1, 1, 1, 0, 1, 1, 0, 1],
         [1, 1, 1, 1, 0, 1, 1, 0, 1]],

        [[1, 1, 1, 1, 0, 1, 1, 0, 1],
         [1, 1, 1, 1, 0, 1, 1, 0, 1],
         [1, 1, 1, 1, 0, 1, 1, 0, 1],
         [1, 1, 1, 1, 0, 1, 1, 0, 1]],

        [[1, 1, 1, 1, 0, 1, 1, 0, 1],
         [1, 1, 1, 1, 0, 1, 1, 0, 1],
         [1, 1, 1, 1, 0, 1, 1, 0, 1],
         [1, 1, 1, 1, 0, 1, 1, 0, 1]],

        [[1, 1, 1, 1, 0, 1, 1, 0, 1],
         [1, 1, 1, 1, 0, 1, 1, 0, 1],
         [1, 1, 1, 1, 0, 1, 1, 0, 1],
         [1, 1, 1, 1, 0, 1, 1, 0, 1]]]])