In [3]:
from htm.algorithms import TemporalMemory
from htm_rl.modules.htm.temporal_memory import DelayedFeedbackTM
from htm_rl.common.sdr_encoders import IntBucketEncoder
from htm.bindings.sdr import SDR
import numpy as np
import matplotlib.pyplot as plt
from htm_rl.modules.htm.spatial_pooler import UnionTemporalPooler

In [4]:
def make_sdr(pos: tuple, _shape: tuple) -> SDR:
    result = SDR(_shape)
    numpy_res = result.dense
    numpy_res = np.zeros(_shape)
    numpy_res[pos] = 1
    result.dense = numpy_res
    return result

def make_sdrs(array: np.ndarray, _shape: tuple) -> np.ndarray:
    result = np.ndarray((array.size,), dtype=SDR)
    iterator = 0
    for number in array:
        result[iterator] = make_sdr(number, _shape)
        iterator += 1
    return result

def learn_model(tm: TemporalMemory, sdrs: np.ndarray, num_epochs=10) -> list:
    errors = []
    for epoch in range(num_epochs):
        for sdr in sdrs:
            tm.compute(sdr, learn=True)
            tm.activateDendrites(True)
            errors.append(tm.anomaly)
        tm.compute(SDR(sdrs[0].dense.shape), learn=False)
    return errors

def generate_data(n, n_actions, n_states, randomness=1.0, seed=0):
    raw_data = list()
    np.random.seed(seed)
    seed_seq = np.random.randint(0, n_actions, n_states)
    raw_data.append(seed_seq.copy())
    n_replace = int(n_states * randomness)
    for i in range(1, n):
        new_seq = np.random.randint(0, n_actions, n_states)
        if randomness == 1.0:
            raw_data.append(new_seq)
        else:
            indices = np.random.randint(0, n_states, n_replace)
            seed_seq[indices] = new_seq[indices]
            raw_data.append(seed_seq.copy())
    data = [list(zip(range(n_states), x)) for x in raw_data]
    return raw_data, data

def compute(tm, state, action, learn=True):
    context = state_encoder.encode(state)
    active_input = action_encoder.encode(action)

    tm.set_active_context_cells(context)

    tm.activate_basal_dendrites(learn)
    tm.predict_cells()

    tm.set_active_columns(active_input)
    tm.activate_cells(learn)
def run(tm, tp, policy, state_encoder, action_encoder,  learn=True):
    tp_input = SDR(tp.getNumInputs())
    tp_predictive = SDR(tp.getNumInputs())
    tp_prev_union = SDR(tp.getNumColumns())
    tp_errors = []

    for state, action in policy:
        context = state_encoder.encode(state)
        active_input = action_encoder.encode(action)

        tm.set_active_context_cells(context)

        tm.activate_basal_dendrites(learn)
        tm.predict_cells()

        tm.set_active_columns(active_input)
        tm.activate_cells(learn)

        tp_input.sparse = tm.get_active_cells()
        tp_predictive.sparse = tm.get_correctly_predicted_cells()
        tp.compute(tp_input, tp_predictive, learn)

        tp_prev_union.sparse = tp.getUnionSDR().sparse.copy()

        tm.set_active_feedback_cells(tp.getUnionSDR().sparse)
        tm.activate_apical_dendrites(learn)
        tm.propagate_feedback()



# run(tm, tp, data[0], state_encoder, action_encoder, True)

In [5]:
shape = (4,)
cells_per_col = 4

row_data, data = generate_data(4, shape[0], 10)
print(f'row: {row_data[:1]}\n')
print(f'data: {data[:1]}\n')

row: [array([0, 3, 1, 0, 3, 3, 3, 3, 1, 3])]

data: [[(0, 0), (1, 3), (2, 1), (3, 0), (4, 3), (5, 3), (6, 3), (7, 3), (8, 1), (9, 3)]]



In [6]:
from configs import * # I'm so sorry for this...

In [7]:
tm = DelayedFeedbackTM(**config_tm)
tp = UnionTemporalPooler(**config_tp)
errors = run(tm, tp, data[0], state_encoder, action_encoder)
print(data)
print(errors)

[[(0, 0), (1, 3), (2, 1), (3, 0), (4, 3), (5, 3), (6, 3), (7, 3), (8, 1), (9, 3)], [(0, 1), (1, 2), (2, 0), (3, 3), (4, 2), (5, 0), (6, 0), (7, 0), (8, 2), (9, 1)], [(0, 2), (1, 3), (2, 3), (3, 2), (4, 0), (5, 1), (6, 1), (7, 1), (8, 1), (9, 0)], [(0, 1), (1, 0), (2, 3), (3, 0), (4, 3), (5, 1), (6, 2), (7, 3), (8, 3), (9, 0)]]
None


In [8]:
plt.plot(range(len(errors)), errors)
plt.title('tm_train_errors')

TypeError: object of type 'NoneType' has no len()

In [31]:
class MyUTP:
    _initial_pooling = 1
    _pooling_decay = 0.1
    def __init__(self, _shape):
        self._union_sdr = SDR(_shape)
        self._pooling_activations = np.zeros(_shape)
        self._shape = _shape

    def pooling_decay_step(self):
        self._pooling_activations -= np.ones(self._shape) * self._pooling_decay
        self._pooling_activations = self._pooling_activations.clip(0, float('inf'))

    def compute(self, _sdr):
        self.pooling_decay_step()

        self._pooling_activations[_sdr.sparse] += self._initial_pooling

        self._union_sdr.dense = self._pooling_activations != 0
tp_shape = [input_columns * cells_per_column]

my_utp = MyUTP(tp_shape)
my_utp.compute(make_sdr((1,), tp_shape))
my_utp.compute(make_sdr((3,), tp_shape))

print(my_utp._pooling_activations)

[0.  0.9 0.  1.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.
 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0. 