In [1]:
import subprocess
import sys

import jax
# import jax.numpy as np
# import jax.scipy as jsp

import numpy as np
import scipy as sp
import matplotlib.pyplot as plt
import typing as t

import os

In [2]:
kx = np.array([
                [1., 0., -1.],
                [2., 0., -2.],
                [1., 0., -1.]
])
ky = np.transpose(kx)
def sobel_x(A):
  """
  A : (x, y, c)
  ret : (x, y, c)
  """
  return np.dstack([sp.signal.convolve2d(A[:, :, c], kx, mode = 'same') 
                    for c in range(A.shape[-1])])
def sobel_y(A):
  return np.dstack([sp.signal.convolve2d(A[:, :, c], ky, mode = 'same') 
                    for c in range(A.shape[-1])])
  
# @jax.jit
def sobel(A):
  return np.concatenate((sobel_y(A)[:, :, None, :], sobel_x(A)[:, :, None, :]),
                         axis = 2)

In [3]:
def rollout(step_fn, c_params, A, steps, verbose = False):
  obs = np.zeros((steps+1, *A.shape))
  obs[0] = A
  rnge = range(steps)
  for t in rnge :
    A = step_fn(A, c_params)
    obs[t+1] = A
  return obs

In [4]:
class Rule_space :
  #-----------------------------------------------------------------------------
  def __init__(self, nb_k, init_shape = (40, 40)):
    self.nb_k = nb_k
    self.init_shape = init_shape
    self.kernel_keys = 'r b w a m s h'.split()
    self.spaces = {
        "r" : {'low' : .2, 'high' : 1., 'mut_std' : .2, 'shape' : None},
        "b" : {'low' : .001, 'high' : 1., 'mut_std' : .2, 'shape' : (3,)},
        "w" : {'low' : .01, 'high' : .5, 'mut_std' : .2, 'shape' : (3,)},
        "a" : {'low' : .0, 'high' : 1., 'mut_std' : .2, 'shape' : (3,)},
        "m" : {'low' : .05, 'high' : .5, 'mut_std' : .2, 'shape' : None},
        "s" : {'low' : .001, 'high' : .18, 'mut_std' : .01, 'shape' : None},
        "h" : {'low' : .01, 'high' : 1., 'mut_std' : .2, 'shape' : None},
        'T' : {'low' : 10., 'high' : 50., 'mut_std' : .1, 'shape' : None},
        'R' : {'low' : 2., 'high' : 25., 'mut_std' : .2, 'shape' : None},
        'init' : {'low' : 0., 'high' : 1., 'mut_std' : .2, 'shape' : self.init_shape}
    }
  #-----------------------------------------------------------------------------
  def sample(self):
    kernels = {}
    for k in 'rmsh':
      kernels[k] = np.random.uniform(
          self.spaces[k]['low'], self.spaces[k]['high'], self.nb_k
      )
    for k in "awb":
      kernels[k] = np.random.uniform(
          self.spaces[k]['low'], self.spaces[k]['high'], (self.nb_k, 3)
      )
    return {
        'kernels' : kernels, 
        'T' : np.random.uniform(self.spaces['T']['low'], self.spaces['T']['high']),
        'R' : np.random.uniform(self.spaces['R']['low'], self.spaces['R']['high']),
        'init' : np.random.rand(*self.init_shape) 
    }
  #-----------------------------------------------------------------------------

In [5]:
def sigmoid(x):
  return 0.5 * (np.tanh(x / 2) + 1)

ker_f = lambda x, a, w, b : (b * np.exp( - (x[..., None] - a)**2 / w)).sum(-1)

bell = lambda x, m, s: np.exp(-((x-m)/s)**2 / 2)

def growth(U, m, s):
  return bell(U, m, s)*2-1

In [6]:
def compile_kernel_computer(SX, SY, nb_k, params):
  """return a jit compiled function taking as input lenia raw params and returning computed kernels (compiled params)"""
  mid = SX // 2
  """Compute kernels and return a dic containing fft kernels, T and R"""
  kernels = params['kernels']

  Ds = [ np.linalg.norm(np.mgrid[-mid:mid, -mid:mid], axis=0) / 
        ((params['R']+15) * kernels['r'][k]) for k in range(nb_k) ]  # (x,y,k)
  K = np.dstack([sigmoid(-(D-1)*10) * ker_f(D, kernels["a"][k], kernels["w"][k], kernels["b"][k]) 
                  for k, D in zip(range(nb_k), Ds)])
  nK = K / np.sum(K, axis=(0,1), keepdims=True)
  fK = np.fft.fft2(np.fft.fftshift(nK, axes=(0,1)), axes=(0,1))

  compiled_params = {k : kernels[k] for k in kernels.keys()}
  for k in ['R', 'T']:
    compiled_params[k] = params[k]
  compiled_params['fK'] = fK
  return compiled_params

In [7]:
def build_system(SX : int, SY : int, nb_k : int, C : int, c0 : t.Iterable = None, 
                 c1 : t.Iterable = None, dt : float = .5, dd : int = 5, 
                 sigma : float = .65, n = 2, theta_A = None) -> t.Callable:
  """
  returns step function State x Params(compiled) --> State
  
  Params :
    SX, SY : world size
    nb_k : number of kernels
    C : number of channels
    c0 : list[int] of size nb_k where c0[i] is the source channel of kernel i
    c1 : list[list[int]] of size C where c1[i] is the liste of kernel indexes targetting channel i
    dt : float integrating timestep
    dd : max distance to look for when moving matter
    sigma : half size of the Brownian motion distribution
    n : scaling parameter for alpha
    theta_A : critical mass at which alpha = 1
  Return :
    callable : array(SX, SY, C) x params --> array(SX, SY, C)
  """

  if theta_A is None:
    theta_A = C

  if c0 is None or c1 is None :
    c0 = [0] * nb_k
    c1 = [[i for i in range(nb_k)]]
  
  xs = np.concatenate([np.arange(SX) for _ in range(SY)])
  ys = np.arange(SY).repeat(SX)

  x, y = np.arange(SX), np.arange(SY)
  X, Y = np.meshgrid(x, y)
  pos = np.dstack((Y, X)) + .5 #(SX, SY, 2)

  rollxs = []
  rollys = []
  for dx in range(-dd, dd+1):
    for dy in range(-dd, dd+1):
      rollxs.append(dx)
      rollys.append(dy)
  rollxs = np.array(rollxs)
  rollys = np.array(rollys)


#   @partial(jax.vmap, in_axes = (0, 0, None, None))
  def step_flow(rollx, rolly, A, mus):
    rollA = np.roll(A, (rollx, rolly), axis = (0, 1))
    dpmu = np.absolute(pos[..., None] - np.roll(mus, (rollx, rolly), axis = (0, 1))) # (x, y, 2, c)
    sz = .5 - dpmu + sigma #(x, y, 2, c)
    area = np.prod(np.clip(sz, 0, min(1, 2*sigma)) , axis = 2) / (4 * sigma**2) # (x, y, c)
    nA = rollA * area
    return nA


  def step(A, params):
    """
    Main step
    A : state of the system (SX, SY, C)
    params : compiled paremeters (dict) must contain m, s, h and fK (computed kernels fft)
    """
    #---------------------------Original Lenia------------------------------------
    fA = np.fft.fft2(A, axes=(0,1))  # (x,y,c)

    fAk = fA[:, :, c0]  # (x,y,k)

    U = np.real(np.fft.ifft2(params['fK'] * fAk, axes=(0,1)))  # (x,y,k)

    G = growth(U, params['m'], params['s']) * params['h']  # (x,y,k)

    H = np.dstack([ G[:, :, c1[c]].sum(axis=-1) for c in range(C) ])  # (x,y,c)

    #-------------------------------FLOW------------------------------------------

    F = sobel(H) #(x, y, 2, c)

    C_grad = sobel(A.sum(axis = -1, keepdims = True)) #(x, y, 2, 1)
  
    alpha = np.clip((A[:, :, None, :]/theta_A)**n, .0, 1.)
    
    F = F * (1 - alpha) - C_grad * alpha
    
    mus = pos[..., None] + dt * F #(x, y, 2, c) : target positions (distribution centers)

    nA = step_flow(rollxs, rollys, A, mus).sum(axis = 0)
    
    return nA

  return step

In [8]:
SX = SY = 64 # World dimensions
C = 1 # Number of channels
nb_k = 2 # Number of kernels
c0 = None ; c1 = None # kernel connections : by default all kernels are from channel 0 to channel 0 in 1-channel Lenia
rule_space = Rule_space(nb_k) 

step_fn = build_system(SX, SY, nb_k, C, c0, c1, dd = 2, dt = .3, sigma = .65)

params = rule_space.sample()
c_params = compile_kernel_computer(SX, SY, nb_k, params) # Kernel computer : raw params --> compiled params
steps = 5
A0 = np.zeros((SX, SY, C))
A0[SX//2-20:SX//2+20, SY//2-20:SY//2+20, 0] = params['init']
obs = rollout(step_fn, c_params, A0, steps, verbose = True)

ValueError: shape mismatch: objects cannot be broadcast to a single shape.  Mismatch is between arg 0 with shape (2, 25) and arg 1 with shape (2,).

In [None]:
print(obs.shape)