In [1]:
import os
import numpy as np
from numba import njit, vectorize, float64, uint8, typed, typeof
import sys
from itertools import product, chain
import codecs, json 
from typing import List
import time
import signal

In [2]:
class MyTimeoutError(Exception):
    def __init__(self, value = "Timed Out"):
        self.value = value
    def __str__(self):
        return repr(self.value)

def timeout(seconds_before_timeout):
    def decorate(f):
        def handler(signum, frame):
            raise MyTimeoutError()
        def new_f(*args, **kwargs):
            old = signal.signal(signal.SIGALRM, handler)
            signal.alarm(seconds_before_timeout)
            try:
                result = f(*args, **kwargs)
            finally:
                # reinstall the old signal handler
                signal.signal(signal.SIGALRM, old)
                # cancel the alarm
                # this line should be inside the "finally" block (per Sam Kortchmar)
                signal.alarm(0)
            return result
        new_f.func_name = f.__name__
        return new_f
    return decorate

In [3]:
@njit()
def create_f_dict(rule):
    f_dict=typed.Dict()
    for i in range(1, -1, -1):
        for j in range(1, -1, -1):
            for k in range(1, -1, -1):
                num=2**(4*i+2*j+k)
                if rule>=num:
                    rule=rule-num
                    f_dict[(uint8(i), uint8(j), uint8(k))]=uint8(1)
                else:
                    f_dict[(uint8(i), uint8(j), uint8(k))]=uint8(0)
    return f_dict

@njit()
def sim_cyclic(config, time, l, f_dict, shift=0):
    for t in range(time):
        new_config=np.empty(l, dtype=uint8)
        for i in range(1, l-1):
            new_config[i+shift]=f_dict[(config[i-1],config[i],config[i+1])]
        new_config[0]=f_dict[(config[(-1-shift)%l],config[(-shift)%l],config[(1-shift)%l])]
        new_config[l-1]=f_dict[(config[(l-2-shift)%l],config[(l-1-shift)%l],config[(l-shift)%l])]
        config=new_config
    return config

def encode(config, enc, cell_size):
    return np.asarray(list(chain.from_iterable([enc[i] for i in config])), dtype=np.uint8)

def decode(config, dec, cell_size):
        return [dec.index(config[i*cell_size:(i+1)*cell_size]) for i in range(len(config)//cell_size)]

In [4]:
def get_train_score_and_dec(f1, f2, enc, enc_size, train):
    times, grids, configs=train
    score=0
    norm=0
    requests={}
    dec=[{tuple(enc[0])}, {tuple(enc[1])}]
        
    for t, grid_size, config1 in zip(times, grids, configs):
        config2=encode(config1, enc, enc_size)

        output1=sim_cyclic(config1, t, grid_size, f1)
        output2=sim_cyclic(config2, enc_size*t, enc_size*grid_size, f2)

        for i in range(grid_size):
            o1, o2 = output1[i], tuple(output2[i*enc_size:(i+1)*enc_size])
            if o2 in requests:
                requests[o2][o1]+=1
            else:
                requests[o2]={o1: 1, 1-o1:0}
        norm+=grid_size
        
    for key, val in requests.items():
        o=[val[0], val[1]].index(max([val[0], val[1]]))
        if key not in dec[1-o]:
            dec[o].add(key)
            score+=val[o]
        else:
            score+=val[1-o]

            
    score=100*score/norm
    return round(score, 2), dec


def get_test_score(f1, f2, enc, enc_size, dec, test):
    times, grids, configs=test
    score=0
    norm=0

        
    for t, grid_size, config1 in zip(times, grids, configs):
        config2=encode(config1, enc, enc_size)

        output1=sim_cyclic(config1, t, grid_size, f1)
        output2=sim_cyclic(config2, enc_size*t, enc_size*grid_size, f2)
        
        norm+=grid_size

        for i in range(grid_size):
            o1, o2 = output1[i], tuple(output2[i*enc_size:(i+1)*enc_size])
            if tuple(o2) in dec[o1]:
                score += 1
    score=100*score/norm
    return round(score, 2)
    
    
    
def gradient_descent_search(enc_size, f1, f2, train, test):
    enc0, enc1 = np.random.randint(0, 2, enc_size, dtype=np.uint8), np.random.randint(0, 2, enc_size, dtype=np.uint8)
    while list(enc0)==list(enc1):
        enc0, enc1 = np.random.randint(0, 2, cell_size, dtype=np.uint8), np.random.randint(0, 2, cell_size, dtype=np.uint8)
    enc=np.concatenate((enc0, enc1))
    score, dec=get_train_score_and_dec(f1, f2, [enc0, enc1], enc_size, train)
    
    while True:
        if score==100:
            score_test=get_test_score(f1, f2, [enc0, enc1], enc_size, dec, test)
            if score_test==100:
                return enc, dec
            else:
                raise NameError('unsuccessful, testing score not perfect')
        else:
            mutated_encs=[]
            for i in range(2*enc_size):
                enc_i=enc.copy()
                enc_i[i]=1-enc[i]
                enc_i0, enc_i1 = enc_i[:enc_size], enc[enc_size:]
                score_i, dec_i=get_train_score_and_dec(f1, f2, [enc_i0, enc_i1], enc_size, train)
                mutated_encs.append((enc_i, dec_i, score_i))
                
            mutated_encs=sorted(mutated_encs, key=lambda x: -x[2])
            enc_max, dec_max, score_max=mutated_encs[0]
            if score_max>score:
                enc=enc_max
                dec=dec_max
                score=score_max
            else:
                raise NameError('unsuccessful, local minimum reached, perfect score not obtained')
                


In [22]:
TIME_BOUND=10

@timeout(TIME_BOUND)
def gradient_descent_main(ca1, ca2, train, test, enc_size):
    f1, f2 = create_f_dict(ca1), create_f_dict(ca2)
    while True:
        try:
            enc, dec=gradient_descent_search(enc_size, f1, f2, train, test)
            print("enc0: ", enc[:enc_size], "enc1: ", enc[enc_size:])
            print("dec0: ", dec[0], "dec1: ", dec[1])
            return
        except NameError:
            pass
    

In [6]:
time_step_range_train=[4, 5, 6, 7, 20, 21, 40, 41, 100]
grid_size_range_train=[10, 11, 12, 13, 41, 42, 21, 20, 100]

time_step_range_test=[1, 20, 301]
grid_size_range_test=[10, 20, 101]

train_configs=[np.random.randint(0, 2, g, dtype=np.uint8) for g in grid_size_range_train]    
test_configs=[np.random.randint(0, 2, g, dtype=np.uint8) for g in grid_size_range_test]  

train=(time_step_range_train, grid_size_range_train, train_configs)
test=(time_step_range_test, grid_size_range_test, test_configs)

In [25]:
ca1=43
ca2=14
enc_size=3

try:
    gradient_descent_main(ca1, ca2, train, test, enc_size)
except:
    print(f"Could not find the relation {ca1} <= {ca2} with encoder size {enc_size} in {TIME_BOUND} seconds")

enc0:  [0 1 1] enc1:  [1 0 0]
dec0:  {(0, 1, 1), (0, 0, 1)} dec1:  {(1, 0, 0), (1, 1, 0), (0, 0, 0)}
