<a href="https://colab.research.google.com/github/amolk/AGI-experiments/blob/master/Pattern%20Machine/03_Pattern_Machine.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Pattern machine
- continuous, i.e. activation changes over time as decaying history of instantaneous activation
- modular, i.e. connect up more flexibly,
- use multi-patterns, i.e. pattern contains more than 1 weights tensor. This is to represent input and output jointly.

Architectural decisions
- Signal, SignalGrid, CompositeSignalGrid
- input patches may overlap when utility factor > 1, fixed number (a grid) of patterns per patch
- output neighborhood is the patterns corresponding to a neighborhood of patches

In [None]:
!pip install ipytest

Collecting ipytest
  Downloading https://files.pythonhosted.org/packages/a9/d6/1797c114d57ec1c93b8078d81bd09b9f82d5f3a989c11fd1c575ff2846e7/ipytest-0.9.1-py3-none-any.whl
Collecting pytest>=5.4
[?25l  Downloading https://files.pythonhosted.org/packages/b1/ee/53945d50284906adb1e613fabf2e1b8b25926e8676854bb25b93564c0ce7/pytest-6.1.2-py3-none-any.whl (272kB)
[K     |████████████████████████████████| 276kB 4.8MB/s 
Collecting pluggy<1.0,>=0.12
  Downloading https://files.pythonhosted.org/packages/a0/28/85c7aa31b80d150b772fbe4a229487bc6644da9ccb7e427dd8cc60cb8a62/pluggy-0.13.1-py2.py3-none-any.whl
[31mERROR: datascience 0.10.6 has requirement folium==0.2.1, but you'll have folium 0.8.3 which is incompatible.[0m
Installing collected packages: pluggy, pytest, ipytest
  Found existing installation: pluggy 0.7.1
    Uninstalling pluggy-0.7.1:
      Successfully uninstalled pluggy-0.7.1
  Found existing installation: pytest 3.6.4
    Uninstalling pytest-3.6.4:
      Successfully uninstalled 

In [None]:
%load_ext autoreload
%autoreload 1
import ipytest
ipytest.autoconfig()

In [310]:
%%writefile utils.py
import pdb
import matplotlib.pyplot as plt
import numpy as np
from scipy.spatial import Voronoi, voronoi_plot_2d

def pretty_s(name, clas, indent=0):
  if type(clas).__name__ == 'Tensor':
    return ' ' * indent + name + ":" + type(clas).__name__ + " size" + str(tuple(clas.shape))

  strs = []
  strs.append(' ' * indent + name + ":" + type(clas).__name__)

  indent += 2
  for k,v in clas.__dict__.items():
    if '__dict__' in dir(v):
      strs.append(pretty_s(k, v,indent))
    elif '__iter__' in dir(v):
      if type(v) is tuple:
        strs.append(' ' * indent + k + ' = ' + str(v))
      else:
        strs.append(' ' * indent +  k + ' = [')
        for index, item in enumerate(v):
          if '__dict__' in dir(item):
            strs.append(pretty_s(str(index), item, indent+2))
          else:
            strs.append(' ' * (indent+2) + str(item))
        strs.append(' ' * indent +  ']')
    else:
      strs.append(' ' * indent +  k + ' = ' + str(v))

  return "\n".join(strs)
          
def pretty_print(name, clas, indent=0):
  print(pretty_s(name, clas, indent))

def soft_add(a, b, tau):
  return a * (1 - tau) + b * tau

def add_gaussian_noise(tensor, mean=0., std=1.):
    t = tensor + torch.randn(tensor.size()).to(device) * std + mean
    t.to(device)
    return t

def plot_patterns(patterns, pattern_lr, dataset, voronoi=False, annotate=False, figsize=(7,7), dpi=100):
  patterns = patterns.cpu()
  dataset = dataset.cpu()
  assert len(patterns.shape) == 2 # (pattern count, 2)
  assert patterns.shape[1] == 2 # 2D

  rgba_colors = torch.zeros((patterns.shape[0], 4))

  # for blue the last column needs to be one
  rgba_colors[:,2] = 1.0
  # the fourth column needs to be your alphas
  if pattern_lr is not None:
    alpha = (1.1 - pattern_lr.cpu()).clamp(0, 1) * 0.9
    rgba_colors[:, 3] = alpha
  else:
    rgba_colors[:, 3] = 1.0

  plt.figure(figsize=figsize, dpi=dpi)
  ax = plt.gca()
  ax.cla() # clear things for fresh plot

  if annotate:
    for i in range(patterns.shape[0]):
      ax.annotate(str(i), (patterns[i][0], patterns[i][1]), xytext=(5,-3), textcoords='offset points')

  ax.scatter(patterns[:, 0], patterns[:, 1], marker='.', c=rgba_colors, s=50)
  ax.scatter(dataset[:, 0], dataset[:, 1], marker='.', c='r', s=10)

  if voronoi:
    vor = Voronoi(patterns)
    vor_fig = voronoi_plot_2d(vor, ax=ax, show_vertices=False, line_colors='gray',
                              line_width=1, line_alpha=0.2, point_size=0)

  ax.set_xlim(0, 1)
  ax.set_ylim(0, 1)
  plt.show()

"""
Create a numpy array of given shape, each element initialized using supplied function
Example: make_ndarray((2,3), lambda multi_index:multi_index)
"""
def make_ndarray(shape, fn):
  a = np.empty(shape, dtype=object)
  with np.nditer(a, flags=['refs_ok', 'multi_index'], op_flags=['readwrite']) as it:
    for x in it:
      a[it.multi_index] = fn(it.multi_index)

  return a

Overwriting utils.py


In [311]:
%aimport utils

In [338]:
%%writefile pattern.py

import torch
import numpy
import pdb
from typing import List, Tuple
from utils import pretty_s
import numpy as np

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

class GridShapeMismatchError(Exception): pass
class NoComponentsError(Exception): pass

class SignalUtils:
  @staticmethod
  def compute_precision(variance):
    return torch.exp(-variance)

class SignalGridHP:
  def __init__(self, grid_shape:Tuple, signal_shape:Tuple, init_pixel_scale:float=0.1, init_variance:float=10):
    self.grid_shape = grid_shape
    self.grid_size = np.prod(self.grid_shape)
    if self.grid_size <= 0:
      raise ValueError("Invalid grid size")

    self.signal_shape = tuple(signal_shape)
    self.signal_size = np.prod(self.signal_shape)
    if self.signal_size <= 0:
      raise ValueError("Invalid signal size")

    self.init_pixel_scale = init_pixel_scale
    self.init_variance = init_variance

class SignalGrid:
  def __init__(self, hp:SignalGridHP, alloc_pixels=True, pixels=None):
    self.hp = hp
    if pixels is not None:
      self.pixels = pixels
    elif alloc_pixels:
      self.pixels = torch.rand((hp.grid_size, hp.signal_size)).to(device) * hp.init_pixel_scale
    else:
      self.pixels = None

    self.variance = torch.ones((hp.grid_size, hp.signal_size)).to(device) * hp.init_variance
    self.refresh_precision()

  def refresh_precision(self):
    self.precision = SignalUtils.compute_precision(self.variance)

  @property
  def signal_shape(self):
    return self.hp.signal_shape

  @property
  def __dict__(self):
    return {
        'grid_shape': self.hp.grid_shape,
        'signal_shape': self.hp.signal_shape,
        'pixels': self.pixels,
        'precision': self.precision
    }

  def __repr__(self):
    return pretty_s("", self)

class CompositeSignalGridHP:
  def __init__(self, hps:List[SignalGridHP]):
    if len(hps) == 0:
      raise NoComponentsError("Must specify at least one component")

    self.components = hps
    self.grid_shape = hps[0].grid_shape

    # all components must have same grid size
    for component_hp in hps:
      if component_hp.grid_shape != hps[0].grid_shape:
        raise GridShapeMismatchError

class CompositeSignalGrid:
  @staticmethod
  def from_tensors(signals:List[torch.Tensor], variance:float=0.0):
    signal_hps = [SignalGridHP(grid_shape=(1,1), signal_shape=signal.shape, init_variance=variance) for signal in signals]
    hp = CompositeSignalGridHP(hps=signal_hps)
    result = CompositeSignalGrid(hp=hp, alloc=False)
    for index, component in enumerate(result.components):
      component.pixels = signals[index]
    return result

  def from_signal_grids(signal_grids:List[SignalGrid]):
    result = CompositeSignalGrid(hp=None)
    hp = CompositeSignalGridHP(hps=[sg.hp for sg in signal_grids])
    result.hp = hp
    result.components = signal_grids
    return result

  def __init__(self, hp:CompositeSignalGridHP=None, alloc=True):
    self.hp = hp
    if hp:
      self.components = [SignalGrid(component_hp, alloc_pixels=alloc) for component_hp in hp.components]

  # add a SignalGrid as a component  
  def add_component(self, o:SignalGrid):
    assert self.hp.grid_shape == o.hp.grid_shape, f"{self.hp.grid_shape} != {o.hp.grid_shape}"

    self.hp.components.append(o.hp)
    self.components.append(o)

  @property
  def signal_shape(self):
    return [c.hp.signal_shape for c in self.components]

  @property
  def grid_shape(self):
    return self.hp.grid_shape

  @property
  def component_count(self):
    return len(self.components)

  def __repr__(self):
    return pretty_s("", self)

class PatternGridHP:
  def __init__(self, grid_shape, pattern_composite_signal_shape:Tuple):
    self.grid_shape = grid_shape
    self.grid_size = np.prod(self.grid_shape)

    self.composite_signal_grid_hp =  CompositeSignalGridHP(hps=[SignalGridHP(grid_shape=grid_shape, signal_shape=signal_shape) for signal_shape in pattern_composite_signal_shape])

class PatternGrid:
  def __init__(self, hp:PatternGridHP):
    self.hp = hp
    self.composite_signal_grid = CompositeSignalGrid(hp.composite_signal_grid_hp)
    self.alpha = torch.ones((hp.grid_size,)).to(device)

  def __repr__(self):
    return pretty_s("", self)

class PatternSimilarityHP:
  def __init__(self, enable_precision_weighted_distance=True):
    self.enable_precision_weighted_distance = enable_precision_weighted_distance

class PatternSimilarity:
  def __init__(self, signal:CompositeSignalGrid, patterns:CompositeSignalGrid, hp:PatternSimilarityHP=None):
    """
    signal must be        grid_shape=(grid_shape)                               signal_shape=(composite_signal_shapes)
    patterns must be      grid_shape=(grid_shape, per_item_pattern_grid_shape)  signal_shape=(composite_signal_shapes)

    Each signal is compared with corresponding per_x_pattern_grid_shape patterns.
    Each comparison is done across composite signal components
    """
    if hp:
      self.hp = hp
    else:
      self.hp = PatternSimilarityHP()

    assert signal.component_count == patterns.component_count, f"signal.component_count {signal.component_count} != patterns.component_count {patterns.component_count}"
    assert signal.signal_shape == patterns.signal_shape, f"signal.signal_shape {signal.signal_shape} != patterns.signal_shape {patterns.signal_shape}"
    assert patterns.grid_shape[0:len(signal.grid_shape)] == signal.grid_shape, f"patterns.grid_shape {patterns.grid_shape} must match 0-n dimensions with signal.grid_shape {signal.grid_shape}"
    per_item_pattern_grid_shape = patterns.grid_shape[len(signal.grid_shape):]
    per_item_pattern_grid_size = np.prod(per_item_pattern_grid_shape)

    self.dist_1 = []
    self.dist_d = []
    self.dist = []
    self.sim_components = []

    # find similarity based on each signal component
    for component_index in range(signal.component_count):
      x_component = signal.components[component_index]
      y_component = patterns.components[component_index]

      xs = x_component.pixels.shape
      x_component_pixels_expanded = x_component.pixels \
                                    .unsqueeze(dim=1) \
                                    .expand((xs[0], per_item_pattern_grid_size, xs[1])) \
                                    .reshape(-1, xs[1]) # expensive to reshape. How to vectorize better?
      assert x_component_pixels_expanded.shape == y_component.pixels.shape
      x_component_precision_expanded = x_component.precision \
                                    .unsqueeze(dim=1) \
                                    .expand((xs[0], per_item_pattern_grid_size, xs[1])) \
                                    .reshape(-1, xs[1]) # expensive to reshape. How to vectorize better?
      assert x_component_precision_expanded.shape == y_component.precision.shape

      dist_1_component, dist_d_component, dist_component = self.l2_distance(
          x=x_component_pixels_expanded,
          x_precision=x_component_precision_expanded,
          y=y_component.pixels,
          y_precision=y_component.precision)
      
      sim_component = torch.exp(-dist_component)

      self.dist_1.append(dist_1_component)
      self.dist_d.append(dist_d_component)
      self.dist.append(dist_component)
      self.sim_components.append(sim_component)

    # final similarity is mean of signal component similarities
    # this equalizes class weights for all components (e.g. modalities)
    self.sim = torch.stack(self.sim_components).mean(dim=0)

  def l2_cross_distance(self, x, x_precision, y, y_precision):
    xs = x.shape
    assert len(xs) == 2
    assert (x_precision is None) or (x_precision.shape == xs), "Precision, if specified, must be same shape as patterns"

    ys = y.shape
    assert len(ys) == 2
    assert (y_precision is None) or (y_precision.shape == ys), "Precision, if specified, must be same shape as patterns"

    assert xs[1] == ys[1], "Patch size, i.e. dim 1, must match"

    n = xs[0]
    m = ys[0]
    d = xs[1]

    x = x.unsqueeze(1).expand(n, m, d)
    x_precision = x_precision.unsqueeze(1).expand(n, m, d)

    y = y.unsqueeze(0).expand(n, m, d)
    y_precision = y_precision.unsqueeze(0).expand(n, m, d)

    return self.l2_distance(x, x_precision, y, y_precision)

  def l2_distance(self, x, x_precision, y, y_precision):
    dist_1 = (x - y).abs()
    dist_d = torch.pow(dist_1, 2)

    if self.hp.enable_precision_weighted_distance:
      if x_precision is not None:
        dist_d = dist_d * x_precision

      if y_precision is not None:
        dist_d = dist_d * y_precision

    dist = dist_d.sum(-1).sqrt()
    return dist_1, dist_d, dist


Overwriting pattern.py


In [339]:
%aimport pattern

In [340]:
%%writefile convoluation_utils.py
import math
from pattern import *
from typing import List, Tuple
import numpy as np
import pdb

class ConvolutionUtils:
  @staticmethod
  def make_afferent_patches(signal:CompositeSignalGrid, grid_shape:Tuple, coverage_factor:float=1.0):
    # print("make_afferent_patches")
    # print("  signal", signal.signal_shape)
    # print("  grid_shape", grid_shape)
    # print("  coverage_factor", coverage_factor)

    assert signal.grid_shape == (1,1)

    if grid_shape == (1,1):
      # no convoluation
      return signal

    for grid_shape_i in grid_shape:
      assert grid_shape_i > 1

    patch_signal_grids = []

    for component_index in range(len(signal.components)):
      # print("  component", component_index)
      component = signal.components[component_index]

      patch_shape = tuple([int(coverage_factor *  component.signal_shape[i] / grid_shape[i]) for i in range(len(grid_shape))])
      # print("    patch_shape", patch_shape)
      stride = tuple([math.floor((component.signal_shape[i]-patch_shape[i])/(grid_shape[i]-1)) for i in range(len(grid_shape))])
      # print("    stride", stride)

      patches = ConvolutionUtils.conv_slice(component.pixels.view((1,) + component.hp.signal_shape), patch_shape, stride=stride).squeeze(dim=0)
      sghp = SignalGridHP(grid_shape=grid_shape, signal_shape=patch_shape)
      patch_signal_grid = SignalGrid(hp=sghp, alloc_pixels=False, pixels = patches)
               
      # print("  patch_signal_grid", patch_signal_grid)
      patch_signal_grids.append(patch_signal_grid)

    patches = CompositeSignalGrid.from_signal_grids(patch_signal_grids)
    return patches

  @staticmethod
  def conv_slice(images, kernel_shape, stride, padding=0):
    assert len(images.shape) == 3, "Must be (image count, image height, image width)"
    images = images.unsqueeze(1)

    fold_params = dict(kernel_size=kernel_shape, dilation=1, padding=padding, stride=stride)
    unfold = torch.nn.Unfold(**fold_params)
    # print("images", images.shape)
    unfolded = unfold(images)
    unfolded = unfolded.view(images.shape[0], -1, unfolded.shape[-1])
    unfolded = unfolded.transpose(1, 2)
    return unfolded

Overwriting convoluation_utils.py


In [341]:
%aimport convoluation_utils

In [342]:
%%run_pytest[clean]
"""
Test SignalGrid
"""

import pytest
import pdb
from pattern import *

def test_signal_grid_invalid_grid_size1():
  with pytest.raises(ValueError):
    SignalGridHP(
      grid_shape=(0,4),     # <-- zero
      signal_shape=(5,6,2)
    )

def test_signal_grid_invalid_grid_size2():
  with pytest.raises(ValueError):
    SignalGridHP(
      grid_shape=(1,4),
      signal_shape=(5,-1,2) # <-- negative
    )

@pytest.fixture
def signal_grid_hp1():
  return SignalGridHP(
      grid_shape=(3,4),
      signal_shape=(5,6,2))

@pytest.fixture
def signal_grid1(signal_grid_hp1):
  return SignalGrid(hp=signal_grid_hp1)

def test_create_signal_grid(signal_grid_hp1):
  signal_grid = SignalGrid(hp=signal_grid_hp1)
  assert signal_grid.pixels.shape == (3*4, 5*6*2)
  assert signal_grid.variance.shape == signal_grid.pixels.shape
  assert signal_grid.precision.shape == signal_grid.pixels.shape

@pytest.fixture
def signal_grid_hp_degenerate():
  return SignalGridHP(
      grid_shape=(1,1),
      signal_shape=(1,1))

def test_create_signal_grid_degenerate(signal_grid_hp_degenerate):
  signal_grid = SignalGrid(hp=signal_grid_hp_degenerate)
  assert signal_grid.pixels.shape == (1, 1)
  assert signal_grid.variance.shape == signal_grid.pixels.shape
  assert signal_grid.precision.shape == signal_grid.pixels.shape

def test_composite_signal_grid_from_tensors():
  csg = CompositeSignalGrid.from_tensors([torch.ones((10,10)), torch.ones((5,5))])
  assert csg.grid_shape == (1,1)
  assert len(csg.components) == 2

  c0 = csg.components[0]
  assert c0.hp.grid_shape == (1,1)
  assert c0.signal_shape == (10,10)

  c1 = csg.components[1]
  assert c1.hp.grid_shape == (1,1)
  assert c1.signal_shape == (5,5)

def test_composite_signal_grid_from_signal_grids_error1():
  grid_shape0 = (3,4)
  signal_shape0 = (5,3,2)
  grid_shape1 = (1,2)
  signal_shape1 = (12,)

  sgs = [
         SignalGrid(hp=SignalGridHP(grid_shape=grid_shape0, signal_shape=signal_shape0)), 
         SignalGrid(hp=SignalGridHP(grid_shape=grid_shape1, signal_shape=signal_shape1))
  ]
  with pytest.raises(GridShapeMismatchError):
    CompositeSignalGrid.from_signal_grids(sgs)

def test_composite_signal_grid_from_signal_grids():
  grid_shape = (3,4)
  signal_shape0 = (5,3,2)
  signal_shape1 = (12,)

  sgs = [
         SignalGrid(hp=SignalGridHP(grid_shape=grid_shape, signal_shape=signal_shape0)), 
         SignalGrid(hp=SignalGridHP(grid_shape=grid_shape, signal_shape=signal_shape1))
  ]
  csg = CompositeSignalGrid.from_signal_grids(sgs)
  
  assert csg.hp.grid_shape == grid_shape
  assert len(csg.components) == 2

  c0 = csg.components[0]
  assert c0.hp.grid_shape == grid_shape
  assert c0.signal_shape == signal_shape0

  c1 = csg.components[1]
  assert c1.hp.grid_shape == grid_shape
  assert c1.signal_shape == signal_shape1


.......                                                                  [100%]
7 passed in 0.05s


In [343]:
%%run_pytest[clean]
"""
Test CompositeSignalGrid
"""

import pytest
from pattern import *

def test_csg_zero_components():
  """
  Must have at least 1 component
  """
  with pytest.raises(NoComponentsError):
    CompositeSignalGridHP([])

def test_csg_different_grid_shapes():
  """
  Components may not have different grid shapes
  """
  with pytest.raises(GridShapeMismatchError):
    CompositeSignalGridHP([
      SignalGridHP(grid_shape=(1,2), signal_shape=(3,4)),
      SignalGridHP(grid_shape=(2,2), signal_shape=(3,4))
    ])

@pytest.fixture
def composite_signal_grid1():
  return CompositeSignalGrid(CompositeSignalGridHP([
    SignalGridHP(grid_shape=(1,2), signal_shape=(3,4)),
    SignalGridHP(grid_shape=(1,2), signal_shape=(3,2,1))
  ]))

def test_csg_different_signal_shapes(composite_signal_grid1):
  """
  Components may have different signal shapes
  """
  assert composite_signal_grid1.component_count == 2
  assert composite_signal_grid1.components[0].signal_shape == (3,4)
  assert composite_signal_grid1.components[1].signal_shape == (3,2,1)
  assert composite_signal_grid1.signal_shape == [(3,4), (3,2,1)]

...                                                                      [100%]
3 passed in 0.02s


In [344]:
%%run_pytest[clean]
"""
Test PatternGrid
"""

import pytest
from pattern import *

@pytest.fixture
def pg1():
  pg_hp = PatternGridHP(grid_shape=(1,2), pattern_composite_signal_shape=[(3,4),(3,2,1)])

  return PatternGrid(hp=pg_hp)

def test_pg_create(pg1):
  pg1.alpha.shape == (1,2)

@pytest.fixture
def pg2():
  pg_hp = PatternGridHP(grid_shape=(1,), pattern_composite_signal_shape=[(1,),(1,)])

  return PatternGrid(hp=pg_hp)

def test_pg_create_2(pg2):
  pg2.alpha.shape == (1,)


..                                                                       [100%]
2 passed in 0.02s


In [345]:
%%run_pytest[clean] -s
"""
Test Similarity
"""

import pytest
from pattern import *
from utils import *

def make_signal():
  pg_hp = PatternGridHP(grid_shape=(2,4), pattern_composite_signal_shape=[(3,4),(3,2,1)])
  return PatternGrid(hp=pg_hp)

def make_patterns():
  pg_hp = PatternGridHP(grid_shape=(2,4,3,2), pattern_composite_signal_shape=[(3,4),(3,2,1)])
  return PatternGrid(hp=pg_hp)

def test_sim_low_precision():
  # By default, very low precision, so equal even when pixels are random
  pgs = [make_signal(), make_patterns()]
  sim = PatternSimilarity(pgs[0].composite_signal_grid, pgs[1].composite_signal_grid)
  assert sim.sim.shape == pgs[1].hp.grid_size
  assert torch.allclose(sim.sim, torch.ones_like(sim.sim))

def test_sim_disable_precision_weighting():
  # If disabled precision weighting, then unequal because pixels are random
  pgs = [make_signal(), make_patterns()]
  hp = PatternSimilarityHP(enable_precision_weighted_distance=False)
  sim = PatternSimilarity(pgs[0].composite_signal_grid, pgs[1].composite_signal_grid, hp=hp)
  assert sim.sim.shape == pgs[1].hp.grid_size
  assert not torch.allclose(sim.sim, torch.ones_like(sim.sim))

def test_sim_high_precision():
  # If high precision, then unequal given pixels are random
  pgs = [make_signal(), make_patterns()]

  # force high precisions
  for pg in pgs:
    for component in pg.composite_signal_grid.components:
      component.precision = torch.ones_like(component.precision)

  sim = PatternSimilarity(pgs[0].composite_signal_grid, pgs[1].composite_signal_grid)
  assert sim.sim.shape == pgs[1].hp.grid_size
  assert not torch.allclose(sim.sim, torch.ones_like(sim.sim))

...
3 passed in 0.03s


In [360]:
%%writefile layer.py

import torch
import numpy
import pdb
from typing import List, Tuple
from pattern import *
import math
from utils import *
from convoluation_utils import ConvolutionUtils

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

class CompositeSignalPatchGrid:
  def __init__(self, signal:CompositeSignalGrid, grid_shape:Tuple, coverage_factor=1.0):
    # signal, i.e. composite image, must be a single composite signal, i.e. its grid shape must be (1,)
    assert signal.hp.grid_shape == (1,1), f"{signal.hp.grid_shape} != (1,1)"
    self.patches:CompositeSignalGrid = ConvolutionUtils.make_afferent_patches(signal=signal, grid_shape=grid_shape, coverage_factor=coverage_factor)

class NeighborhoodPatchGrid:
  def __init__(self, signal:SignalGrid, patch_grid_shape:Tuple, per_patch_grid_shape:Tuple, patch_neighborhood_shape:Tuple):
    # signal must be a single signal, i.e. its grid shape must be (1,)
    assert signal.hp.grid_shape == (1,1), f"{signal.hp.grid_shape} != (1,1)"

    # signal shape must be flattend combination of patch_grid_shape and per_patch_grid_shape
    assert tuple(signal.hp.signal_shape) == tuple(np.multiply(patch_grid_shape, per_patch_grid_shape))

    stride = per_patch_grid_shape
    # print("  stride", stride)

    padding_patches = tuple([int((i-1)/2) for i in patch_neighborhood_shape])
    padding = tuple(np.multiply(padding_patches, per_patch_grid_shape))
    # print("  padding", padding)

    patch_shape = tuple(np.multiply(patch_neighborhood_shape, per_patch_grid_shape))
    patches = ConvolutionUtils.conv_slice(signal.pixels.view(signal.signal_shape).unsqueeze(dim=0), patch_shape, stride=stride, padding=padding).squeeze(dim=0)
    sghp = SignalGridHP(grid_shape=patch_grid_shape, signal_shape=patch_shape)
    self.patches:SignalGrid = SignalGrid(hp=sghp, alloc_pixels=False, pixels = patches)

class InputOutputPatchGrid:
  def __init__(self, patch_grid_shape:Tuple, input:CompositeSignalGrid, output:SignalGrid, output_patch_neighborhood_shape:Tuple, per_patch_pattern_grid_shape:Tuple, input_coverage_factor=1.0):
    input_grid_shape = output.signal_shape

    self.input_patches = CompositeSignalPatchGrid(signal=input, grid_shape=patch_grid_shape, coverage_factor=input_coverage_factor)
    self.output_patches = NeighborhoodPatchGrid(signal=output, patch_grid_shape=patch_grid_shape, per_patch_grid_shape=per_patch_pattern_grid_shape, patch_neighborhood_shape=output_patch_neighborhood_shape)
    
    self.patches = self.input_patches.patches
    self.patches.add_component(self.output_patches.patches)

class LayerHP:
  def __init__(self, input_signal_shapes:List[Tuple], input_coverage_factor:float, patch_grid_shape:Tuple, per_patch_pattern_grid_shape:Tuple, output_patch_neighborhood_shape:Tuple, output_tau=0.5):
    assert len(patch_grid_shape) == len(per_patch_pattern_grid_shape), "This is so that output can be flattened"
    assert len(patch_grid_shape) == len(output_patch_neighborhood_shape), "Output patch neighborhood is a subset of patch grid, so number of dimensions must match"
    # assert len(patch_grid_shape) == 2, "Currently support for 2D"

    self.input_signal_shapes:List[Tuple] = input_signal_shapes
    self.input_coverage_factor:float = input_coverage_factor
    self.patch_grid_shape:Tuple = patch_grid_shape
    self.per_patch_pattern_grid_shape:Tuple = per_patch_pattern_grid_shape
    self.output_patch_neighborhood_shape = output_patch_neighborhood_shape
    self.output_neighborhood_shape:Tuple = np.multiply(output_patch_neighborhood_shape, per_patch_pattern_grid_shape)
    self.output_tau:float = output_tau

    for size in self.output_patch_neighborhood_shape:
      assert size % 2 == 1, "Output patch neighborhood shape must be odd, so can be centered around specific output activation"

    # Get input patch shapes
    sample_input_signal = [torch.ones(shape) for shape in input_signal_shapes]
    sample_input = CompositeSignalGrid.from_tensors(sample_input_signal)
    patches = ConvolutionUtils.make_afferent_patches(signal=sample_input, grid_shape=patch_grid_shape, coverage_factor=input_coverage_factor)

    # Patterns -
    # each patch in the patch grid will get per_patch_pattern_grid_shape patterns, so patterns will be of shape -
    # (patch_grid_shape,) + (per_patch_pattern_grid_shape shape,) + (pattern_composite_signal_shape,)
    pattern_composite_signal_shape = patches.signal_shape + [self.output_neighborhood_shape] # input signals + output signal
    patch_grid_shape = patch_grid_shape
    pattern_grid_shape = patch_grid_shape + per_patch_pattern_grid_shape # 2D, 2D = 4D
    self.pattern_grid_hp:PatternGridHP = PatternGridHP(grid_shape=pattern_grid_shape, pattern_composite_signal_shape=pattern_composite_signal_shape)

    # Output is flattened 2D version of the 4D pattern_grid_shape
    output_shape = np.multiply(patch_grid_shape, per_patch_pattern_grid_shape) # height1*height2, width1*width2
    self.output_hp:SignalGridHP = SignalGridHP(grid_shape=(1,1), signal_shape=output_shape, init_pixel_scale=0.0)

    # print("Output shape", output_shape)

    # # input grid HP
    # hps = [
    #   SignalGridHP(
    #       grid_shape=output_shape, # Each grid element in input grid produces 1 pixel of output
    #       signal_shape=input_component.signal_shape)
    #   for input_component in patches.components # each component of input
    # ]
    # self.input_grid_hp = CompositeSignalGridHP(hps=hps.copy())

    # # pattern HP
    # hps.append(SignalGridHP(
    #     grid_shape=output_shape,
    #     signal_shape=output_neighborhood_shape))

    # self.pattern_grid_hp = PatternGridHP(
    #     grid_shape=pattern_grid_shape,
    #     composite_signal_grid_hp=CompositeSignalGridHP(hps=[
    #       SignalGridHP(grid_shape=pattern_grid_shape, signal_shape=component.signal_shape)
    #      for component in hps]))

class Layer:
  def __init__(self, hp:LayerHP):
    self.hp = hp
    self.patterns = PatternGrid(hp=self.hp.pattern_grid_hp)
    self.output = SignalGrid(hp.output_hp)

  def forward(self, input:CompositeSignalGrid):
    assert input.grid_shape == (1,1) # single signal input

    input_output_patch_grid = InputOutputPatchGrid(patch_grid_shape=self.hp.patch_grid_shape,
                                                   input=input,
                                                   output=self.output,
                                                   output_patch_neighborhood_shape=self.hp.output_patch_neighborhood_shape,
                                                   per_patch_pattern_grid_shape=self.hp.per_patch_pattern_grid_shape,
                                                   input_coverage_factor=1.0)
    # patches = LayerUtils.make_input_output_composite_patches(input=input, output=self.output, output_shape=self.hp.output_shape, output_neighborhood_shape=self.hp.output_neighborhood_shape, convolve_input=self.hp.convolve_input)
    # print("input_output_patch_grid.patches", input_output_patch_grid.patches)
    pattern_similarity = PatternSimilarity(signal=input_output_patch_grid.patches, patterns=self.patterns.composite_signal_grid)
    
    activation = pattern_similarity.sim.unsqueeze(dim=0)
    assert self.output.pixels.shape == activation.shape 

    self.output.pixels = soft_add(self.output.pixels, activation, tau=self.hp.output_tau)



Overwriting layer.py


In [361]:
%aimport layer

In [362]:

%%run_pytest[clean] -s
"""
Test Layer
"""

import pytest
from utils import *
from pattern import *
from layer import *
import pdb

def test_layer_create():
  hp = LayerHP(
      input_signal_shapes=[(10,15),(20,10)],
      input_coverage_factor=1.0, 
      patch_grid_shape=(5,5),
      per_patch_pattern_grid_shape=(4,4),
      output_patch_neighborhood_shape=(3,3),
      output_tau=0.5)
  layer = Layer(hp=hp)
  # pretty_print("Layer", layer)
  assert(True)

def test_layer_forward():


  input_signal_shapes = [(10,10), (20,20)]
  input_signals = [torch.ones(shape) for shape in input_signal_shapes]
  input = CompositeSignalGrid.from_tensors(input_signals)

  hp = LayerHP(
      input_signal_shapes=input_signal_shapes,
      input_coverage_factor=1.0, 
      patch_grid_shape=(5,5),
      per_patch_pattern_grid_shape=(2,3),
      output_patch_neighborhood_shape=(3,3),
      output_tau=0.5)

  layer = Layer(hp=hp)
  #pretty_print("Layer", layer)

  for _ in range(10):
    layer.forward(input)

  assert torch.allclose(layer.output.pixels, torch.ones_like(layer.output.pixels), atol=0.01)

..
2 passed in 0.03s


In [363]:
import pandas as pd
import numpy as np
import torch
from sklearn.datasets import load_boston
from utils import *
from layer import *

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

def normalize(df):
  df1 = (df - df.mean())/df.std()
  return df1

def scale(df):
  min = df.min()
  max = df.max()

  df1 = (df - min) / (max - min)
  return df1

dataset = load_boston()
dataset = pd.DataFrame(dataset.data, columns=dataset.feature_names)
dataset = pd.DataFrame(np.c_[scale(normalize(dataset['LSTAT'])), scale(normalize(dataset['RM']))], columns = ['LSTAT','RM'])
dataset = torch.tensor(dataset.to_numpy()).float().to(device)
dataset
X = dataset[:,0]
Y = dataset[:,1]

input_signal_shapes = [(1,1)]
hp = LayerHP(
      input_signal_shapes=input_signal_shapes,
      input_coverage_factor=1.0, 
      patch_grid_shape=(1,1),
      per_patch_pattern_grid_shape=(3,3),
      output_patch_neighborhood_shape=(1,1),
      output_tau=1.0) # set output_tau=1.0 for IID data

layer = Layer(hp=hp)

print("Patterns - input", layer.patterns.composite_signal_grid.components[0].pixels)
print("Patterns - output", layer.patterns.composite_signal_grid.components[1].pixels)

for i in range(4):
  input = CompositeSignalGrid.from_tensors([X[i].view(1,1)])
  input.components[0].variance *= 0
  input.components[0].refresh_precision()

  print("input", input.components[0].pixels)
  print("input precision", input.components[0].precision)
  layer.forward(input)
  print("layer output", layer.output.pixels.view(layer.hp.per_patch_pattern_grid_shape))


Patterns - input tensor([[0.0568],
        [0.0924],
        [0.0050],
        [0.0421],
        [0.0626],
        [0.0544],
        [0.0305],
        [0.0190],
        [0.0041]])
Patterns - output tensor([[0.0551, 0.0436, 0.0352, 0.0922, 0.0324, 0.0941, 0.0655, 0.0490, 0.0027],
        [0.0535, 0.0140, 0.0247, 0.0504, 0.0445, 0.0132, 0.0997, 0.0914, 0.0151],
        [0.0665, 0.0057, 0.0794, 0.0664, 0.0135, 0.0837, 0.0761, 0.0268, 0.0602],
        [0.0207, 0.0104, 0.0917, 0.0187, 0.0495, 0.0341, 0.0995, 0.0287, 0.0372],
        [0.0425, 0.0891, 0.0590, 0.0542, 0.0410, 0.0714, 0.0523, 0.0034, 0.0196],
        [0.0617, 0.0197, 0.0477, 0.0752, 0.0346, 0.0600, 0.0478, 0.0420, 0.0594],
        [0.0103, 0.0025, 0.0477, 0.0020, 0.0194, 0.0233, 0.0774, 0.0024, 0.0316],
        [0.0258, 0.0003, 0.0343, 0.0699, 0.0032, 0.0932, 0.0136, 0.0096, 0.0352],
        [0.0802, 0.0898, 0.0496, 0.0359, 0.0917, 0.0596, 0.0006, 0.0602, 0.0656]])
input tensor([[0.0897]])
input precision tensor([[1.]])
layer o

In [None]:
CompositeSignalGrid.from_tensors([X[0].unsqueeze(dim=0)])

We have patch grid of composite signal. Each composite signal gets its own grid of patterns.