In [5]:
import numpy as np

np.random.seed(0)

In [1]:
class SequenceLearner:

    def __init__(self, n_sensory, n_neurons, top_k, p, scaffold=False):
        self.epsilon = 1e-8
        self.ns = n_sensory
        self.nn = n_neurons
        self.top_k = top_k
        self.p = p
        self.scaffold = scaffold
        if (self.scaffold):
            self.setup_scaffold()
        else:
            self.setup_simple()
        self.renormalize()
        
    def setup_simple(self):
        # setup a simple brain
        self.n_areas = 1
        self.brain_size = self.ns + self.nn * self.n_areas
        self.W = np.zeros((self.brain_size, self.brain_size))
        # sensory -> output (area 1)    [afferent]
        self.W[:self.ns, self.ns:] = np.random.choice([0.0, 1.0], size=(self.ns, self.nn), p=[1 - self.p, self.p])
        # output (area 1)   [recurrent]
        self.W[self.ns:, self.ns:] = np.random.choice([0.0, 1.0], size=(self.nn, self.nn), p=[1 - self.p, self.p])
        # remove any self connections
        np.fill_diagonal(self.W, 0.0)
        self.inputs = np.zeros(self.brain_size - self.ns)

    def setup_scaffold(self):
        # setup a scaffold brain
        self.n_areas = 2
        self.brain_size = self.ns + self.nn * self.n_areas
        self.W = np.zeros((self.brain_size, self.brain_size))
        # sensory -> output (area 1)    [afferent]
        self.W[:self.ns, self.ns:self.ns + self.nn] = np.random.choice([0.0, 1.0], size=(self.ns, self.nn), p=[1 - self.p, self.p])
        # output -> scaffold (area 2)   [afferent]
        self.W[self.ns:self.ns + self.nn, self.ns + self.nn:] = np.random.choice([0.0, 1.0], size=(self.nn, self.nn), p=[1 - self.p, self.p])
        # scaffold (area 2) -> output   [afferent]
        self.W[self.ns + self.nn:, self.ns:self.ns + self.nn] = np.random.choice([0.0, 1.0], size=(self.nn, self.nn), p=[1 - self.p, self.p])
        # output (area 1)   [recurrent]
        self.W[self.ns:self.ns + self.nn, self.ns:self.ns + self.nn] = np.random.choice([0.0, 1.0], size=(self.nn, self.nn), p=[1 - self.p, self.p])
        # scaffold (area 2)   [recurrent]
        self.W[self.ns + self.nn:, self.ns + self.nn:] = np.random.choice([0.0, 1.0], size=(self.nn, self.nn), p=[1 - self.p, self.p])
        # remove any self connections
        np.fill_diagonal(self.W, 0.0)   
        self.inputs = np.zeros(self.brain_size - self.ns)

    def top_k_indices(self, array, offset=0):
        top_indices = [ ]
        valid_indices = np.where(array > self.epsilon)[0]
        if valid_indices.size > 0:
            k_small = min(self.top_k, valid_indices.size)
            top_indices = valid_indices[np.argpartition(-array[valid_indices], k_small - 1)[:k_small]]
            return top_indices + offset
        return [ ]
    
    def top_k_indices_per_area(self, inputs):
        c_idx = [ ]
        for a in range(self.n_areas):
            ridx = a * self.nn
            c_idx.extend(self.top_k_indices(inputs[ridx:ridx + self.nn], offset=ridx + self.ns))
        return c_idx

    def renormalize(self):
        col_sums = self.W.sum(axis=0)
        col_sums[col_sums < self.epsilon] = 1
        self.W = self.W / col_sums
    
    def observe_sequence(self, sequence, beta, update=True, pc=False):
        predictions = [ ]
        p_idx, c_idx = [ ], [ ]
        for t in range(len(sequence)):
            # transfer synaptic input
            self.inputs.fill(0)
            c_idx.extend(sequence[t])
            self.inputs += self.W[np.array(c_idx, dtype=np.int32), self.ns:].sum(axis=0)
            # compute activations (top_k)
            p_idx = c_idx
            c_idx = self.top_k_indices_per_area(self.inputs)
            c_out = [ c - self.ns for c in c_idx if self.ns <= c < self.ns + self.nn ]
            predictions.append(c_out)
            # plasticity
            if (update):
                for p in p_idx:
                    for c in c_idx:
                        # predictive coding rule (reinforce self.assemblies)
                        sign = ((-1 if ((c - self.ns) not in self.assemblies[t]) else 1) if (self.ns <= c < self.ns + self.nn) else 1) if pc else 1
                        self.W[p, c] *= (1 + sign * beta)
        return predictions
    
    def hamming_similarity(self, a, b):
        unmatch = len(set(a).symmetric_difference(set(b)))
        return  1.0 - (unmatch / (2 * self.top_k))
    
    def get_assemblies(self, sequence):
        assemblies = self.observe_sequence(sequence, beta=0, update=False)
        return assemblies
    
    def train(self, sequence, rounds, beta, assemblies, pc=False):
        self.assemblies = assemblies
        for round in range(rounds):
            # print(f'Round {round}:', end=' ')
            predictions = self.observe_sequence(sequence, beta, update=True, pc=pc)
            self.renormalize()  # renormalize brain after each training

    def test(self, sequence, assemblies):
        erased_sequence = [ sequence[t] if t == 0 else [] for t in range(len(sequence)) ]
        predictions = self.observe_sequence(erased_sequence, 0, update=False)
        metric = [ self.hamming_similarity(predictions[i], assemblies[i]) for i in range(len(assemblies)) ]
        # print(metric)
        return metric
        

### Experiments

In [2]:
def generate_sequence(n_sensory, length, top_k):
    sequence = [ ]
    for timestep in range(length):
        sequence.append(list(np.random.choice(n_sensory, top_k, replace=False)))
    return sequence

In [4]:
initial_sequence = generate_sequence(n_sensory=2048, length=8, top_k=32)
perturbation_step = 4
perturbed_sequence = initial_sequence[:perturbation_step]
perturbed_sequence.extend(generate_sequence(n_sensory=2048, length=8-perturbation_step, top_k=32))

In [5]:
learner = SequenceLearner(n_sensory=2048, n_neurons=2048, top_k=32, p=0.2, scaffold=False)

initial_assemblies = learner.get_assemblies(initial_sequence)
perturbed_assemblies = learner.get_assemblies(perturbed_sequence)
learner.train(sequence=initial_sequence, rounds=10, beta=0.10, assemblies=initial_assemblies, pc=False)
learner.train(sequence=perturbed_sequence, rounds=20, beta=0.10, assemblies=perturbed_assemblies, pc=False)
score = learner.test(sequence=perturbed_sequence, assemblies=perturbed_assemblies)

[1.0, 1.0, 1.0, 1.0, 0.21875, 0.03125, 0.0, 0.0]


In [6]:
learner = SequenceLearner(n_sensory=2048, n_neurons=2048, top_k=32, p=0.2, scaffold=False)

initial_assemblies = learner.get_assemblies(initial_sequence)
perturbed_assemblies = learner.get_assemblies(perturbed_sequence)
learner.train(sequence=initial_sequence, rounds=10, beta=0.10, assemblies=initial_assemblies, pc=True)
learner.train(sequence=perturbed_sequence, rounds=20, beta=0.10, assemblies=perturbed_assemblies, pc=True)
score = learner.test(sequence=perturbed_sequence, assemblies=perturbed_assemblies)

[1.0, 1.0, 1.0, 1.0, 0.90625, 0.875, 0.71875, 0.65625]


In [7]:
learner = SequenceLearner(n_sensory=2048, n_neurons=2048, top_k=32, p=0.2, scaffold=True)

initial_assemblies = learner.get_assemblies(initial_sequence)
perturbed_assemblies = learner.get_assemblies(perturbed_sequence)
learner.train(sequence=initial_sequence, rounds=10, beta=0.10, assemblies=initial_assemblies, pc=False)
learner.train(sequence=perturbed_sequence, rounds=20, beta=0.10, assemblies=perturbed_assemblies, pc=False)
score = learner.test(sequence=perturbed_sequence, assemblies=perturbed_assemblies)

[1.0, 1.0, 1.0, 1.0, 0.28125, 0.09375, 0.03125, 0.0]


In [8]:
learner = SequenceLearner(n_sensory=2048, n_neurons=2048, top_k=32, p=0.2, scaffold=True)

initial_assemblies = learner.get_assemblies(initial_sequence)
perturbed_assemblies = learner.get_assemblies(perturbed_sequence)
learner.train(sequence=initial_sequence, rounds=10, beta=0.10, assemblies=initial_assemblies, pc=True)
learner.train(sequence=perturbed_sequence, rounds=20, beta=0.10, assemblies=perturbed_assemblies, pc=True)
score = learner.test(sequence=perturbed_sequence, assemblies=perturbed_assemblies)

[1.0, 1.0, 1.0, 1.0, 0.96875, 0.9375, 0.5625, 0.28125]


In [6]:
def print_csv(array):
    for e in array:
        print(e, end=', ')
    print()

In [7]:
for iteration in range(5):
    initial_sequence = generate_sequence(n_sensory=2048, length=8, top_k=32)
    perturbation_step = 4
    perturbed_sequence = initial_sequence[:perturbation_step]
    perturbed_sequence.extend(generate_sequence(n_sensory=2048, length=8-perturbation_step, top_k=32))

    learner = SequenceLearner(n_sensory=2048, n_neurons=2048, top_k=32, p=0.2, scaffold=False)
    initial_assemblies = learner.get_assemblies(initial_sequence)
    perturbed_assemblies = learner.get_assemblies(perturbed_sequence)
    learner.train(sequence=initial_sequence, rounds=10, beta=0.10, assemblies=initial_assemblies, pc=False)
    learner.train(sequence=perturbed_sequence, rounds=20, beta=0.10, assemblies=perturbed_assemblies, pc=False)
    score = learner.test(sequence=perturbed_sequence, assemblies=perturbed_assemblies)
    print_csv(score)

    learner = SequenceLearner(n_sensory=2048, n_neurons=2048, top_k=32, p=0.2, scaffold=False)
    initial_assemblies = learner.get_assemblies(initial_sequence)
    perturbed_assemblies = learner.get_assemblies(perturbed_sequence)
    learner.train(sequence=initial_sequence, rounds=10, beta=0.10, assemblies=initial_assemblies, pc=True)
    learner.train(sequence=perturbed_sequence, rounds=20, beta=0.10, assemblies=perturbed_assemblies, pc=True)
    score = learner.test(sequence=perturbed_sequence, assemblies=perturbed_assemblies)
    print_csv(score)

    learner = SequenceLearner(n_sensory=2048, n_neurons=2048, top_k=32, p=0.2, scaffold=True)
    initial_assemblies = learner.get_assemblies(initial_sequence)
    perturbed_assemblies = learner.get_assemblies(perturbed_sequence)
    learner.train(sequence=initial_sequence, rounds=10, beta=0.10, assemblies=initial_assemblies, pc=False)
    learner.train(sequence=perturbed_sequence, rounds=20, beta=0.10, assemblies=perturbed_assemblies, pc=False)
    score = learner.test(sequence=perturbed_sequence, assemblies=perturbed_assemblies)
    print_csv(score)

    learner = SequenceLearner(n_sensory=2048, n_neurons=2048, top_k=32, p=0.2, scaffold=True)
    initial_assemblies = learner.get_assemblies(initial_sequence)
    perturbed_assemblies = learner.get_assemblies(perturbed_sequence)
    learner.train(sequence=initial_sequence, rounds=10, beta=0.10, assemblies=initial_assemblies, pc=True)
    learner.train(sequence=perturbed_sequence, rounds=20, beta=0.10, assemblies=perturbed_assemblies, pc=True)
    score = learner.test(sequence=perturbed_sequence, assemblies=perturbed_assemblies)
    print_csv(score)

1.0, 1.0, 1.0, 1.0, 0.21875, 0.03125, 0.0, 0.0, 
1.0, 1.0, 1.0, 1.0, 0.90625, 0.875, 0.71875, 0.65625, 
1.0, 1.0, 1.0, 1.0, 0.28125, 0.09375, 0.03125, 0.0, 
1.0, 1.0, 1.0, 1.0, 0.96875, 0.9375, 0.5625, 0.28125, 
1.0, 1.0, 1.0, 1.0, 0.28125, 0.03125, 0.03125, 0.0625, 
1.0, 1.0, 1.0, 1.0, 0.9375, 0.9375, 0.875, 0.78125, 
1.0, 1.0, 1.0, 1.0, 0.3125, 0.1875, 0.03125, 0.03125, 
1.0, 1.0, 1.0, 1.0, 0.875, 0.90625, 0.8125, 0.4375, 
1.0, 1.0, 1.0, 1.0, 0.375, 0.09375, 0.09375, 0.0, 
1.0, 1.0, 1.0, 1.0, 0.78125, 0.5, 0.4375, 0.5, 
1.0, 0.96875, 0.84375, 0.5625, 0.03125, 0.0, 0.03125, 0.0, 
1.0, 1.0, 1.0, 1.0, 0.9375, 0.84375, 0.4375, 0.375, 
1.0, 1.0, 1.0, 0.96875, 0.125, 0.03125, 0.03125, 0.03125, 
1.0, 1.0, 1.0, 1.0, 0.96875, 0.90625, 0.78125, 0.4375, 
1.0, 1.0, 1.0, 1.0, 0.28125, 0.28125, 0.125, 0.0, 
1.0, 1.0, 1.0, 1.0, 0.96875, 0.90625, 0.53125, 0.28125, 
1.0, 1.0, 1.0, 1.0, 0.1875, 0.0, 0.0, 0.0, 
1.0, 1.0, 1.0, 1.0, 0.96875, 0.90625, 0.71875, 0.625, 
1.0, 1.0, 1.0, 1.0, 0.28125, 0.21875,