# Purpose: Develop algorithm to find redundancy and synergy by characterizing group invariant enputs

In [1]:
import pandas as pd
import numpy as np
from cana.boolean_node import BooleanNode
import matplotlib.pyplot as plt
%matplotlib inline

## A Test Logic Gate

In [2]:
# 000 001 010 011 100 101 110 111 ECA rule 151
#  1   1   1   0   1   0   0   1
# make cana BooleanNode for calulations later
outputs = ['1', '1', '1', '0', '1', '0', '0', '1']
gate = BooleanNode.from_output_list(outputs)
from wand.image import Image
Image(filename='../plots/example_luts_schemata/high_synergy/parity_and_off_lut.pdf')

# Here is another test gate that doesn't nicely reduce into a three-way permutation
# rule 190
outputs = ['0', '1', '1', '1', '1', '1', '0', '1'] 
gate = BooleanNode.from_output_list(outputs)

Something I said in my little writeup is that the schemata $f''_3$ shows a redundancy between synergies of all possible pairs. The algorithm I am developing here should be able to find that.

We need to go through each subset in the powerset and see whether or not the inputs in the subset make up a group invarient enput in the two symbol schemata. I think we want to be very inclusive here and count subsets with cardinality 1 as "group invarient enputs" although I'm not there yet. We also want to count all inputs sharing the same literal as in the 000 -> 1 or 111 -> 1 transitions shown in the look up table above 

In [3]:
# calculate the two symbol schemata for the logic gate
gate.input_symmetry()
ts0s, ts1s = gate._two_symbols
pi0s, pi1s = gate._prime_implicants
print(ts0s)
print(ts1s)

[('000', [], [[0, 1, 2]]), ('110', [], [[0, 1]])]
[('012', [[0, 1]], []), ('221', [], [[0, 1]])]


The organization here is a little weird and we're going to need to clean it up a bit. The entries here are in the following format:

    (reduced input entry (2 means #), list of permutable groups with different input states, list of permutable groups who share input states)

We don't care about the difference between permutable groups with the same symbols or different symbols. They are all symbols. We do want to preserve information about the transition, that is crucial to localizing the information.

We will populate a new list of permutable inputs using only the supersets of the permutable inputs.

In [4]:
ts_transitions = []
# iterate over the schemata and the output labels so we can easuly preserve that information
for output, ts in zip(['0', '1'], [ts0s, ts1s]):
    for inputs, permutables, same_symbols in ts:
        # big groups will be the ones we keep, all groups is all possibilities that cana finds
        big_groups = []
        all_groups = permutables + same_symbols
        all_groups = [set(group) for group in all_groups]
        # we need to eliminate any group that is a subset of any other
        # this is a probably pretty bad way to do that
        if len(all_groups) <= 1:
            big_groups = all_groups
        else:
            for group in all_groups:
                for check_group in all_groups:
                    if group > check_group and group not in big_groups:
                        big_groups.append(group)
        # make a new entry for the table thing
        new_entry = (output, inputs, big_groups)
        ts_transitions.append(new_entry)
ts_transitions

[('0', '000', [{0, 1, 2}]),
 ('0', '110', [{0, 1}]),
 ('1', '012', [{0, 1}]),
 ('1', '221', [{0, 1}])]

Something else we're going to need is a function to get the powerset. Heres one from itertools documentation

In [5]:
# modified from itertools documentation
from itertools import chain, combinations
def powerset(iterable):
    "powerset([1,2,3]) --> (1,) (2,) (3,) (1,2) (1,3) (2,3) (1,2,3)"
    s = list(iterable)
    return chain.from_iterable(combinations(s, r) for r in range(1, len(s)+1))
list(powerset(range(3)))

[(0,), (1,), (2,), (0, 1), (0, 2), (1, 2), (0, 1, 2)]

ts_coverage doesn't seem to work but we can unpack the coverage from the two symbol schemata and calculate their coverage directly.

In [9]:
from itertools import permutations
def expand_wildcards(state_list):
    """ recursively replaces wildcards in a set of input states """
    expanded = set([])
    for i, states in enumerate(state_list):
        states = tuple(states)
        if '2' not in states:
            expanded.add(states)
        else:
            for j, char in enumerate(states):
                if char == '2':
                    gp_ones = list(states)
                    gp_ones[j] = '1'

                    gp_zeros = list(states)
                    gp_zeros[j] = '0'

                    expanded.add(tuple(gp_zeros))
                    expanded.add(tuple(gp_ones))
                    
                    expanded = expand_wildcards(expanded)
    return expanded
            

# we will add a "column" for coverage on each of the transitions
ts_coverage = []
for transition, inputs, permutables in ts_transitions:
    input_array = np.array([char for char in inputs])
    # entries can have multiple group-invariant enputs
    for subset in permutables:
        # need numpy arrays for indexing
        subset = np.array(list(subset))
        gi = input_array[subset]
        
        # we want only permutations that are meaningfully different
        gi_perm = np.unique(np.array(list(permutations(gi))), axis=0)
        expansions = list(gi_perm)

        # need to expand wildcards into zeros and ones
        expansions = expand_wildcards(expansions)

    # all that we really care about is how much of the LUT is covered by the schemata
    lut_coverage = len(expansions)
    ts_coverage.append((transition, inputs, permutables, lut_coverage))
ts_coverage

[('0', '000', [{0, 1, 2}], 1),
 ('0', '110', [{0, 1}], 1),
 ('1', '012', [{0, 1}], 2),
 ('1', '221', [{0, 1}], 4)]

We can decompose the entries with wildcards into different permutable groups. In this case the entry '002' can be thought of as all permutable pairs and a wildcard. We need to preserve the literals but we can think of the wildcard as just extra shit (redundancy even)

In [40]:
def literal_distribution(coverage_list):
    """ Function that assigns need-to-know literals to subsets of inputs pretty sure its really poorly written """
    # set up a dictionary object that will store these literals 
    # we get the keys by using the length of the input string
    # if for some godforsaken reason these are different lengths
    # you have bigger problems
    distributed = [{str(k): [] for k in powerset(range(len(coverage_list[0][1])))} for _ in range(2)]
    for transition, inputs, pos_free, coverage in coverage_list:
        # we have a choice to make here: if you have a permutable group of literals
        # we have been calling that synergy. If the remaining inputs are also
        # literal than does that mean its all synergy between them even though
        # they are not all permutable? There is no reason to think that it is
        # going to be redundancy because neither one are sufficient to determine
        # the transision on their own. I think we should call it synergy.
        input_set = set(range(len(inputs)))
        if '2' in inputs:
            # we need to figure out which inputs are NOT part of any permutable groups
            for group in pos_free:
                twos_idx = []
                outgroup_vals = []
                # find twos and figure out what is in the outgroup
                outgroup = input_set - group
                for i, char in enumerate(inputs):
                    if i in group and char =='2':
                        twos_idx.append(i)
                    elif i in outgroup:
                        outgroup_vals.append(char)
                # collect all of the literals, if there are some in the outgroup we need to
                # deal with them
                outgroup_literals = []
                if '0' in outgroup_vals or '1' in outgroup_vals:
                    for o in outgroup:
                        if inputs[o] != '2':
                            outgroup_literals.append(o)
                    
                # distribute the literals in the dictionary
                subgroup_size = len(group) - len(twos_idx)
                if subgroup_size > 0:
                    subgroups = combinations(group, len(group) - len(twos_idx))
                    for sub in subgroups:
                        if len(outgroup_literals) > 0:
                            expanded_sub = tuple(list(sub).extend(outgroup_literals))
                        else:
                            expanded_sub = sub
                        distributed[int(transition)][str(sub)].append((output, inputs, coverage))
                elif len(outgroup_literals) > 0:
                    sub = tuple(outgroup_literals)
                    distributed[int(transition)][str(sub)].append((output, inputs, coverage))
        # no wildcards
        else:
            sub = tuple(range(len(inputs)))
            distributed[int(transition)][str(sub)].append((output, inputs, coverage))

    
    return distributed
dist = literal_distribution(ts_coverage)
dist

[{'(0,)': [],
  '(1,)': [],
  '(2,)': [],
  '(0, 1)': [],
  '(0, 2)': [],
  '(1, 2)': [],
  '(0, 1, 2)': [('1', '000', 1), ('1', '110', 1)]},
 {'(0,)': [],
  '(1,)': [],
  '(2,)': [('1', '221', 4)],
  '(0, 1)': [('1', '012', 2)],
  '(0, 2)': [],
  '(1, 2)': [],
  '(0, 1, 2)': []}]

We can relate these literals to the partial information lattice, or at least the sets contained therein.

In [50]:
# this gets the sets as described by Williams and Beer
def PID_sets(k):
    """ This function returns a list of the sets in the redundancy lattice """
    double_powerset = list(chain.from_iterable([combinations(powerset(range(k)), r) for r in range(1, gate.k)]))
    keep_sets = []
    for subset in double_powerset:
        contains_subset = False
        for i in subset:
            for j in subset:
                if i != j and set(i).issubset(j):
                    contains_subset = True
                    break
            if contains_subset:
                break
        if not contains_subset:     
            keep_sets.append(subset)
    return keep_sets

We need to find transitions that are shared by more than one GIE. These indicate redundancy between synergies. and will populate our information terms

In [81]:
from dit.shannon.shannon import entropy
from dit import Distribution 
# info_sets = {}
# decomposition = {}
# for atom in info_sets:
#     # if there are distributed literals we can calculate this thing
#     if len(info_sets[atom]) > 0:
#         output_dist = Distribution(gate.outputs, [1/2**gate.k]*2**gate.k)
#         coverage = sum([t[-1] for t in info_sets[atom]])
#         norm_coverage = coverage / 2**gate.k * entropy(output_dist)
#     else:
#         norm_coverage = 0
    
#     decomposition[atom] = norm_coverage

# decomposition

for subset in keep_sets:
    info_sets[str(subset)] = []

def assign_information(distributed, gate):
    """ this function takes the distributed literals group and assigns them to the PID sets """
    # set up dictionary to store these values
    keep_sets = PID_sets(gate.k)
    info_sets = {str(k): 0 for k in keep_sets}
    print(info_sets)

    # set up dictionary for intermediate values
    grouped_coverage = [{str(k): 0 for k in powerset(range(gate.k))} for _ in range(2)]
    print(grouped_coverage)

    # calculate entropy of the output distribution for normalization
    output_dist = Distribution(gate.outputs, [1/2**gate.k]*2**gate.k)
    output_entropy = entropy(output_dist)

    # we can now search for redundancy between these informative groups
    for t, transition in enumerate(distributed):
        redundant = []
        for group in transition:
            if len(transition[group]) > 0:
                for e, entry in enumerate(transition[group]):
                    grouped_coverage[t][group] += entry[-1]

    # with the coverage calculated we can find the groups, at least in this simple case
    for t, transition in enumerate(grouped_coverage):
        redundant = []
        coverage = []
        for group in transition:
            if transition[group] > 0:
                redundant.append(group)
                coverage.append(transition[group])
        
        # finally we can add the normalized coverage to the information dict
        info_key = '(' + ', '.join(redundant) + ')'
        normed_info = np.sum(coverage) / 2**gate.k * output_entropy
        info_sets[info_key] = normed_info



            
    return info_sets
assign_information(dist, gate)

{'((0,),)': 0, '((1,),)': 0, '((2,),)': 0, '((0, 1),)': 0, '((0, 2),)': 0, '((1, 2),)': 0, '((0, 1, 2),)': 0, '((0,), (1,))': 0, '((0,), (2,))': 0, '((0,), (1, 2))': 0, '((1,), (2,))': 0, '((1,), (0, 2))': 0, '((2,), (0, 1))': 0, '((0, 1), (0, 2))': 0, '((0, 1), (1, 2))': 0, '((0, 2), (1, 2))': 0}
[{'(0,)': 0, '(1,)': 0, '(2,)': 0, '(0, 1)': 0, '(0, 2)': 0, '(1, 2)': 0, '(0, 1, 2)': 0}, {'(0,)': 0, '(1,)': 0, '(2,)': 0, '(0, 1)': 0, '(0, 2)': 0, '(1, 2)': 0, '(0, 1, 2)': 0}]


{'((0,),)': 0,
 '((1,),)': 0,
 '((2,),)': 0,
 '((0, 1),)': 0,
 '((0, 2),)': 0,
 '((1, 2),)': 0,
 '((0, 1, 2),)': 0,
 '((0,), (1,))': 0,
 '((0,), (2,))': 0,
 '((0,), (1, 2))': 0,
 '((1,), (2,))': 0,
 '((1,), (0, 2))': 0,
 '((2,), (0, 1))': 0.6084585933443496,
 '((0, 1), (0, 2))': 0,
 '((0, 1), (1, 2))': 0,
 '((0, 2), (1, 2))': 0,
 '((0, 1, 2))': 0.2028195311147832}

In [76]:
ccs = pd.read_csv('../data/eca_decompositions/ccs_df.csv')
print('I_ccs:')
rule = ccs[ccs['rule'] == 190]
for col in range(len(ccs.columns)):
    print(ccs.columns[col], rule.iloc[0, col])

I_ccs:
Unnamed: 0 190
rule 190
((0,), (1,), (2,)) 3.597633285699193e-09
((0,), (1,)) -3.5976320642641233e-09
((0,), (2,)) -3.597633285699193e-09
((0,), (1, 2)) 3.5976320642641233e-09
((0,),) 0.0
((1,), (2,)) -3.597633285699193e-09
((1,), (0, 2)) 3.5976320642641233e-09
((2,), (0, 1)) 0.10375937841734424
((0, 1), (0, 2), (1, 2)) -3.59763206725372e-09
((0, 1), (0, 2)) 0.0
((1,),) 0.0
((0, 1), (1, 2)) 0.0
((0, 1),) 0.2075187496394219
((2,),) 0.2075187496394219
((0, 2), (1, 2)) -5.0387030325893534e-08
((0, 2),) 5.0387030325893534e-08
((1, 2),) 5.0387030325893534e-08
((0, 1, 2),) 0.29248119997354793


In [77]:
imin = pd.read_csv('../data/eca_decompositions/imin_df.csv')
print('I_min:')
rule = imin[imin['rule'] == 190]
for col in range(len(imin.columns)):
    print(imin.columns[col], rule.iloc[0, col])

I_min:
Unnamed: 0 190
rule 190
((0,), (1,), (2,)) 0.0
((0,), (1,)) 0.0
((0,), (2,)) 0.0
((0,), (1, 2)) 0.0
((0,),) 0.0
((1,), (2,)) 0.0
((1,), (0, 2)) 0.0
((2,), (0, 1)) 0.3112781244591328
((0, 1), (0, 2), (1, 2)) 0.0
((0, 1), (0, 2)) 0.0
((1,),) 0.0
((0, 1), (1, 2)) 0.0
((0, 1),) 0.0
((2,),) 0.0
((0, 2), (1, 2)) 0.0
((0, 2),) 0.0
((1, 2),) 0.0
((0, 1, 2),) 0.5


In [78]:
ipm = pd.read_csv('../data/eca_decompositions/pm_df.csv')
print('I_pm:')
rule = ipm[ipm['rule'] == 190]
for col in range(len(ipm.columns)):
    print(ipm.columns[col], rule.iloc[0, col])

I_pm:
Unnamed: 0 190
rule 190
((0,), (1,), (2,)) 0.4575187496394218
((0,), (1,)) -0.4575187496394218
((0,), (2,)) 0.0
((0,), (1, 2)) 0.0
((0,),) 0.0
((1,), (2,)) 0.0
((1,), (0, 2)) 0.0
((2,), (0, 1)) -0.14624062518028913
((0, 1), (0, 2), (1, 2)) 0.7075187496394221
((0, 1), (0, 2)) 0.0
((1,),) 0.0
((0, 1), (1, 2)) 0.0
((0, 1),) -0.25
((2,),) 0.0
((0, 2), (1, 2)) -0.25
((0, 2),) 0.0
((1, 2),) 0.0
((0, 1, 2),) 0.75


In [79]:
iwedge = pd.read_csv('../data/eca_decompositions/wedge_df.csv')
print('I_wedge:')
rule = iwedge[iwedge['rule'] == 151]
for col in range(len(iwedge.columns)):
    print(iwedge.columns[col], rule.iloc[0, col])

I_wedge:
Unnamed: 0 151
rule 151
((0,), (1,), (2,)) 0.0
((0,), (1,)) 0.0
((0,), (2,)) 0.0
((0,), (1, 2)) 0.0
((0,),) 0.048794940695398685
((1,), (2,)) 0.0
((1,), (0, 2)) 0.0
((2,), (0, 1)) 0.0
((0, 1), (0, 2), (1, 2)) 0.0
((0, 1), (0, 2)) 0.0
((1,),) 0.048794940695398685
((0, 1), (1, 2)) 0.0
((0, 1),) 0.10684412153416778
((2,),) 0.048794940695398685
((0, 2), (1, 2)) 0.0
((0, 2),) 0.10684412153416778
((1, 2),) 0.10684412153416778
((0, 1, 2),) 0.4875168162362658


In [80]:
gr = {0, 1}
set(range(3)) - gr

{2}

In [69]:
expand_wildcards([np.array(['1', '0', '2']), np.array(['2', '1']), np.array(['0', '0'])])

{('0', '0'), ('0', '1'), ('1', '0', '0'), ('1', '0', '1'), ('1', '1')}

AttributeError: 'list' object has no attribute 'flatten'