In [None]:
# This cell sits above the notebook's main import cell, so it imports numpy
# itself; otherwise a fresh kernel run fails with NameError on `np`.
import numpy as np

# Find which row of `a` equals the vector `x`.
a = np.arange(12).reshape((4, 3))
x = np.array([6, 7, 8])
# Broadcasting compares `x` with every row; a row matches when all columns agree.
row_matches = (a == x).all(axis=1)
# Cell output: the per-row match mask and the index of the first matching row.
row_matches, np.argwhere(row_matches)[0][0]

In [None]:
import gym
import gym_minigrid as mg
import matplotlib.pyplot as plt
import numpy as np
from IPython.display import clear_output
from time import sleep

%matplotlib inline

import random
import numpy as np
import sys

# fetch datasets from www.openML.org/ 
from sklearn.datasets import fetch_openml

from htm.bindings.algorithms import SpatialPooler, Classifier
from htm.bindings.sdr import SDR, Metrics


def load_ds(name, num_test, shape=None):
    """ 
    Fetch a dataset from openML.org and split it into train/test parts.

    The last `num_test` samples become the test split, everything before
    them is the training split.

    @param name - ID on openML (eg. 'mnist_784')
    @param num_test - num. samples to take as test
    @param shape - new shape of a single data point (ie data['data'][0]) as a list.
                   Eg. [28,28] for MNIST. The list passed in is left untouched.
    @return tuple (train_labels, train_images, test_labels, test_images)
    """
    data = fetch_openml(name, version=1)
    sz = data['target'].shape[0]

    X = data['data']
    if shape is not None:
        # Build a fresh list: list.insert() returns None and would modify the
        # caller's `shape` in place, breaking any later call that reuses it.
        new_shape = [sz, *shape]
        X = np.reshape(X, new_shape)

    y = data['target'].astype(np.int32)

    # split to train/test data
    split = sz - num_test
    train_labels, test_labels = y[:split], y[split:]
    train_images, test_images = X[:split], X[split:]

    return train_labels, train_images, test_labels, test_images

def encode(data, out):
    """
    Binarize `data` around its mean and store the result in `out`.

    @param data - raw (image) data
    @param out  - SDR receiving the encoded data through its `dense` attribute
    """
    threshold = np.mean(data)
    # Values at or above the mean become active bits (greyscale -> binary B/W).
    is_active = data >= threshold
    out.dense = is_active
    #TODO improve. have a look in htm.vision etc. For MNIST this is ok, for fashionMNIST in already loses too much information


# Default spatial pooler settings used by the experiments in this notebook.
# Better values can be found with parameter optimization,
# see py/htm/optimization/ae.py
# How the parameters relate to each other is explained in
# src/examples/mnist/MNIST_CPP.cpp
default_parameters = dict(
    potentialRadius=7,
    boostStrength=7.0,
    columnDimensions=(79, 79),
    dutyCyclePeriod=1402,
    localAreaDensity=0.1,
    minPctOverlapDutyCycle=0.2,
    potentialPct=0.1,
    stimulusThreshold=6,
    synPermActiveInc=0.14,
    synPermConnected=0.5,
    synPermInactiveDec=0.02,
)


def main(parameters=default_parameters, argv=None, verbose=True):
    """
    Load MNIST and build the encoder SDR, spatial pooler, metrics and classifier.

    Only the setup is performed; the function has no return statement.

    @param parameters - dict with the spatial pooler settings (see default_parameters)
    @param argv - not used
    @param verbose - not used
    """
    # Load data.
    dataset = load_ds('mnist_784', 10000, shape=[28,28]) # HTM: ~95.6%
    #dataset = load_ds('Fashion-MNIST', 10000, shape=[28,28]) # HTM baseline: ~83%
    train_labels, train_images, test_labels, test_images = dataset

    training_data = list(zip(train_images, train_labels))
    test_data     = list(zip(test_images, test_labels))
    random.shuffle(training_data)

    # Setup the AI.
    enc = SDR(train_images[0].shape)
    sp_settings = dict(
        inputDimensions        = enc.dimensions,
        columnDimensions       = parameters['columnDimensions'],
        potentialRadius        = parameters['potentialRadius'],
        potentialPct           = parameters['potentialPct'],
        globalInhibition       = True,
        localAreaDensity       = parameters['localAreaDensity'],
        # these two may arrive as floats, so they are rounded to ints
        stimulusThreshold      = int(round(parameters['stimulusThreshold'])),
        synPermInactiveDec     = parameters['synPermInactiveDec'],
        synPermActiveInc       = parameters['synPermActiveInc'],
        synPermConnected       = parameters['synPermConnected'],
        minPctOverlapDutyCycle = parameters['minPctOverlapDutyCycle'],
        dutyCyclePeriod        = int(round(parameters['dutyCyclePeriod'])),
        boostStrength          = parameters['boostStrength'],
        seed                   = 0, # this is important, 0="random" seed which changes on each invocation
        spVerbosity            = 99,
        wrapAround             = False,
    )
    sp = SpatialPooler(**sp_settings)

    columns = SDR( sp.getColumnDimensions() )
    columns_stats = Metrics( columns, 99999999 )
    sdrc = Classifier()

#     # Training Loop
#     for i in range(len(train_images)):
#         img, lbl = training_data[i]
#         encode(img, enc)
#         sp.compute( enc, True, columns )
#         sdrc.learn( columns, lbl ) #TODO SDRClassifier could accept string as a label, currently must be int

#     print(str(sp))
#     print(str(columns_stats))

#     # Testing Loop
#     score = 0
#     for img, lbl in test_data:
#         encode(img, enc)
#         sp.compute( enc, False, columns )
#         if lbl == np.argmax( sdrc.infer( columns ) ):
#             score += 1
#     score = score / len(test_data)

#     print('Score:', 100 * score, '%')
#     return score

# Run the setup once: main() fetches 'mnist_784' through load_ds and builds
# the spatial pooler; it has no return value, so the cell shows no result.
main()

Load data

In [None]:
# Fetch MNIST through load_ds; the last 10000 samples form the test split.
train_labels, train_images, test_labels, test_images = load_ds('mnist_784', 10000, shape=[28,28]) # HTM: ~95.6%
# train_labels, train_images, test_labels, test_images = load_ds('Fashion-MNIST', 10000, shape=[28,28]) # HTM baseline: ~83%

# Pair every image with its label so the two stay aligned while shuffling.
training_data = list(zip(train_images, train_labels))
test_data     = list(zip(test_images, test_labels))

# Shuffle both splits in place; this uses the global `random` state and no
# seed is set in this cell.
random.shuffle(training_data)
random.shuffle(test_data)

# Unzip back into parallel sequences. From here on the four names hold
# tuples in shuffled order instead of the objects returned by load_ds.
train_images, train_labels = zip(*training_data)
test_images, test_labels = zip(*test_data)

In [None]:
%%time

def run_bare_classifier(parameters=default_parameters, argv=None, verbose=True):
    """
    Train and score the Classifier directly on the encoded images (no spatial pooler).

    Reads the notebook-level train_images, train_labels, test_images and
    test_labels.

    @param parameters - not used
    @param argv - not used
    @param verbose - not used
    @return fraction of test samples whose predicted label equals the true label
    """
    training_data = list(zip(train_images, train_labels))
    test_data     = list(zip(test_images, test_labels))
    random.shuffle(training_data)

    # Setup the AI.
    enc = SDR(train_images[0].shape)
    columns_stats = Metrics( enc, 99999999 )
    sdrc = Classifier()

    # Training: the classifier learns straight from the encoded image.
    for sample_idx in range(len(train_images)):
        image, label = training_data[sample_idx]
        encode(image, enc)
        sdrc.learn( enc, label )

    print(str(columns_stats))

    def is_correct(image, label):
        """Encode one test image and compare the most likely label with `label`."""
        encode(image, enc)
        return label == np.argmax( sdrc.infer( enc ) )

    # Testing: count the hits and normalise by the number of test samples.
    hits = sum(1 for image, label in test_data if is_correct(image, label))
    score = hits / len(test_data)

    print('Score:', 100 * score, '%')
    return score

# Baseline run: classifier on the encoded images only; the returned score
# (fraction of correct test predictions) is the cell output.
run_bare_classifier()

In [None]:
%%time

def run_htm_sp(parameters=default_parameters, argv=None, verbose=True):
    """
    Train a spatial pooler plus Classifier on the training images and score them on the test images.

    Reads the notebook-level train_images, train_labels, test_images and
    test_labels.

    @param parameters - dict with the spatial pooler settings (see default_parameters)
    @param argv - not used
    @param verbose - not used
    @return fraction of test samples whose predicted label equals the true label
    """
    training_data = list(zip(train_images, train_labels))
    test_data     = list(zip(test_images, test_labels))
    random.shuffle(training_data)

    # Setup the AI.
    enc = SDR(train_images[0].shape)
    sp_settings = dict(
        inputDimensions        = enc.dimensions,
        columnDimensions       = parameters['columnDimensions'],
        potentialRadius        = parameters['potentialRadius'],
        potentialPct           = parameters['potentialPct'],
        globalInhibition       = True,
        localAreaDensity       = parameters['localAreaDensity'],
        stimulusThreshold      = int(round(parameters['stimulusThreshold'])),
        synPermInactiveDec     = parameters['synPermInactiveDec'],
        synPermActiveInc       = parameters['synPermActiveInc'],
        synPermConnected       = parameters['synPermConnected'],
        minPctOverlapDutyCycle = parameters['minPctOverlapDutyCycle'],
        dutyCyclePeriod        = int(round(parameters['dutyCyclePeriod'])),
        boostStrength          = parameters['boostStrength'],
        seed                   = 0, # this is important, 0="random" seed which changes on each invocation
        spVerbosity            = 99,
        wrapAround             = False,
    )
    sp = SpatialPooler(**sp_settings)
    columns = SDR( sp.getColumnDimensions() )
    columns_stats = Metrics( columns, 99999999 )
    sdrc = Classifier()

    # Training: the pooler learns on every image (second argument True) and
    # the classifier learns from the resulting columns.
    for sample_idx in range(len(train_images)):
        image, label = training_data[sample_idx]
        encode(image, enc)
        sp.compute( enc, True, columns )
        sdrc.learn( columns, label )

    print(str(sp))
    print(str(columns_stats))

    def is_correct(image, label):
        """Run one test image through the pooler with learning off and check the prediction."""
        encode(image, enc)
        sp.compute( enc, False, columns )
        return label == np.argmax( sdrc.infer( columns ) )

    # Testing: count the hits and normalise by the number of test samples.
    hits = sum(1 for image, label in test_data if is_correct(image, label))
    score = hits / len(test_data)

    print('Score:', 100 * score, '%')
    return score

# Spatial pooler + classifier run; the returned score (fraction of correct
# test predictions) is the cell output.
run_htm_sp()

In [None]:
%%time

from sklearn.linear_model import LogisticRegression
from scipy.sparse import csr_matrix

def encode_to_csr(x, enc: SDR):
    """
    Encode every image in `x` and stack the active bits into a sparse matrix.

    @param x   - iterable of images accepted by encode()
    @param enc - SDR used as a scratch buffer; it is overwritten for each image
    @return scipy.sparse.csr_matrix with one row per image in `x` and one
            column per bit of `enc`; active bits are stored as 1.0
    """
    active_bits = []
    indptr = [0]
    # Iterate the argument, not the notebook-level `x_tr`; otherwise the
    # "test" matrix would silently be built from the training images.
    for img in x:
        encode(img, enc)
        active_bits.extend(enc.sparse)
        indptr.append(len(active_bits))

    data = np.ones(len(active_bits))
    # Pin the column count to the encoder size. Without an explicit shape it is
    # inferred from the largest index seen, so two image sets could end up
    # with a different number of features.
    n_features = int(np.prod(enc.dimensions))
    csr = csr_matrix((data, active_bits, indptr), shape=(len(indptr) - 1, n_features))
    return csr

def run_bare_sklearn_classifier(x_tr,  y_tr, x_tst, y_tst, parameters=default_parameters):
    """
    Fit logistic regression on the encoded training images and score it on the test images.

    @param x_tr  - training images
    @param y_tr  - training labels, aligned with x_tr
    @param x_tst - test images
    @param y_tst - test labels, aligned with x_tst
    @param parameters - not used
    @return fraction of test images whose predicted label equals the true label
    """
    # Size the encoder from the data passed in rather than from a notebook global.
    enc = SDR(x_tr[0].shape)

    train_csr = encode_to_csr(x_tr, enc)
    clf = LogisticRegression(tol=.001, max_iter=100, multi_class='multinomial', penalty='l2', solver='lbfgs', n_jobs=3)
    clf.fit(train_csr, y_tr)

    # Score on the held-out images against the held-out labels (previously
    # the predictions were compared with the training labels).
    test_csr = encode_to_csr(x_tst, enc)
    score = (clf.predict(test_csr) == np.asarray(y_tst)).mean()
    print('Score:', 100 * score, '%')
    return score

# Upper bound on the number of samples taken from each split; slicing keeps
# the whole split whenever it holds fewer than n samples.
n = 100000
x_tr, y_tr, x_tst, y_tst = (
    samples[:n]
    for samples in (train_images, train_labels, test_images, test_labels)
)

run_bare_sklearn_classifier(x_tr, y_tr, x_tst, y_tst)

In [None]:
%%time

from sklearn.linear_model import LogisticRegression
from scipy.sparse import csr_matrix

def encode_to_csr_sp(x, enc: SDR, sp, columns):
    """
    Encode every image in `x`, run it through the spatial pooler with learning
    off, and stack the active columns into a sparse matrix.

    @param x       - iterable of images accepted by encode()
    @param enc     - SDR used as a scratch buffer for the encoded image
    @param sp      - spatial pooler; called with learn=False
    @param columns - SDR receiving the pooler output; overwritten for each image
    @return scipy.sparse.csr_matrix with one row per image in `x` and one
            column per bit of `columns`; active columns are stored as 1.0
    """
    active_columns = []
    indptr = [0]
    # Iterate the argument, not the notebook-level `x_tr`; otherwise the
    # "test" matrix would silently be built from the training images.
    for img in x:
        encode(img, enc)
        sp.compute( enc, False, columns )
        active_columns.extend(columns.sparse)
        indptr.append(len(active_columns))

    data = np.ones(len(active_columns))
    # Pin the column count to the pooler output size. Without an explicit shape
    # it is inferred from the largest index seen, so two image sets could
    # end up with a different number of features.
    n_features = int(np.prod(columns.dimensions))
    csr = csr_matrix((data, active_columns, indptr), shape=(len(indptr) - 1, n_features))
    return csr


def run_htm_sp_sklearn(x_tr,  y_tr, x_tst, y_tst, parameters=default_parameters):
    """
    Train a spatial pooler on the training images, fit logistic regression on
    its output and score the pair on the test images.

    @param x_tr  - training images
    @param y_tr  - training labels, aligned with x_tr
    @param x_tst - test images
    @param y_tst - test labels, aligned with x_tst
    @param parameters - dict with the spatial pooler settings (see default_parameters)
    @return fraction of test images whose predicted label equals the true label
    """
    # Size the encoder from the data passed in rather than from a notebook global.
    enc = SDR(x_tr[0].shape)
    sp = SpatialPooler(
        inputDimensions            = enc.dimensions,
        columnDimensions           = parameters['columnDimensions'],
        potentialRadius            = parameters['potentialRadius'],
        potentialPct               = parameters['potentialPct'],
        globalInhibition           = True,
        localAreaDensity           = parameters['localAreaDensity'],
        stimulusThreshold          = int(round(parameters['stimulusThreshold'])),
        synPermInactiveDec         = parameters['synPermInactiveDec'],
        synPermActiveInc           = parameters['synPermActiveInc'],
        synPermConnected           = parameters['synPermConnected'],
        minPctOverlapDutyCycle     = parameters['minPctOverlapDutyCycle'],
        dutyCyclePeriod            = int(round(parameters['dutyCyclePeriod'])),
        boostStrength              = parameters['boostStrength'],
        seed                       = 0, # this is important, 0="random" seed which changes on each invocation
        spVerbosity                = 0,
        wrapAround                 = False)
    columns = SDR( sp.getColumnDimensions() )

    # train SP (learning enabled) on the training images only
    for img in x_tr:
        encode(img, enc)
        sp.compute( enc, True, columns )

    # train the logistic regression on the pooled training images
    train_csr = encode_to_csr_sp(x_tr, enc, sp, columns)
    clf = LogisticRegression(tol=.001, max_iter=100, multi_class='multinomial', penalty='l2', solver='lbfgs', n_jobs=3)
    clf.fit(train_csr, y_tr)

    # Score on the held-out images against the held-out labels (previously
    # the predictions were compared with the training labels).
    test_csr = encode_to_csr_sp(x_tst, enc, sp, columns)
    score = (clf.predict(test_csr) == np.asarray(y_tst)).mean()
    print('Score:', 100 * score, '% for n =', len(x_tr))
    return score

# Upper bound on the number of samples taken from each split; slicing keeps
# the whole split whenever it holds fewer than n samples.
n = 100000
x_tr, y_tr, x_tst, y_tst = (
    samples[:n]
    for samples in (train_images, train_labels, test_images, test_labels)
)

run_htm_sp_sklearn(x_tr, y_tr, x_tst, y_tst)