## GROUND TRUTH EXPRESSION SIMULATION


In [4]:
import numpy as np
import re

# ──────────────────────────────────────────────────────────────────────────────
# 1) Generic update function built from a text specification
# ──────────────────────────────────────────────────────────────────────────────

def update_from_text(state, network_str):
    """
    state: 1D np.array of length n_vars (0/1)
    network_str: multiline text with lines like "v0 = v1 AND NOT v2"
    returns: new_state, same shape as state
    """
    # parse lines
    lines = [L.strip() for L in network_str.strip().split('\n') if L.strip()]
    var_names = []
    exprs = []
    for line in lines:
        var, expr = [part.strip() for part in line.split('=', 1)]
        var_names.append(var)
        # convert operators to Python syntax
        expr_py = (expr.replace('AND', ' and ')
                      .replace('OR', ' or ')
                      .replace('NOT', ' not '))
        exprs.append(expr_py)
    # map var name to its index in the state vector
    var2idx = {v: i for i, v in enumerate(var_names)}

    # build evaluation context
    ctx = {var: bool(state[var2idx[var]]) for var in var_names}

    # evaluate each expression
    new = np.zeros_like(state)
    for i, expr_py in enumerate(exprs):
        # eval returns a Python bool, convert to int
        new[i] = int(eval(expr_py, {}, ctx))
    return new


# ──────────────────────────────────────────────────────────────────────────────
# 2) Simulator
# ──────────────────────────────────────────────────────────────────────────────

def simulate_from_text(network_str, initial_state, steps=50):
    """
    network_str: multiline network definition
    initial_state: list or 1D array of 0/1 of length n_vars
    steps: number of synchronous update steps
    returns: history, a (steps × n_vars) np.array
    """
    state = np.array(initial_state, dtype=int)
    history = np.zeros((steps, state.size), dtype=int)
    for t in range(steps):
        history[t] = state
        state = update_from_text(state, network_str)
    return history


# ──────────────────────────────────────────────────────────────────────────────
# 3) Example usage
# ──────────────────────────────────────────────────────────────────────────────

'''Emter input network in this format:'''

if __name__ == "__main__":
    net_text = '''
    v0 = v1 AND NOT v2
    v1 = v0 OR v3
    v2 = NOT v4
    v3 = v2 OR NOT v5
    v4 = v3 AND v6
    v5 = NOT v1 OR v4
    v6 = v5 AND NOT v0
    '''
    
    # determine number of variables from network definition
    lines = [L.strip() for L in net_text.strip().split('\n') if L.strip()]
    n_vars = len(lines)

    # pick a random initial state of length n_vars
    init = np.random.randint(0, 2, size=n_vars)
    traj = simulate_from_text(net_text, init, steps=50)

    print("Trajectory shape:", traj.shape)
    print(traj[:10])  # first 10 states


Trajectory shape: (50, 7)
[[0 0 0 1 1 0 0]
 [0 1 0 1 0 1 0]
 [1 1 1 0 0 0 1]
 [0 1 1 1 0 0 0]
 [0 1 1 1 0 0 0]
 [0 1 1 1 0 0 0]
 [0 1 1 1 0 0 0]
 [0 1 1 1 0 0 0]
 [0 1 1 1 0 0 0]
 [0 1 1 1 0 0 0]]


## SCORING FUNCTIONS

In [5]:
import sys
import numpy as np
import networkx as nx
import itertools
import re

# Adjust this to point at your copy of the toolbox
sys.path.append('/Users/hiteshkandarpa/Desktop/Acads/AlgoBio/Project/Code/DesignPrinciplesGeneNetworks')
from lib import canalizing_function_toolbox_v16 as cft


def input_redundancy(f):
    n = int(np.log2(len(f)))
    if n == 0:
        return 1.0
    relevant = 0
    for i in range(n):
        for x in itertools.product([0,1], repeat=n):
            idx = sum(x[n-1-j] << j for j in range(n))
            y = list(x); y[i] ^= 1
            jdx = sum(y[n-1-j] << j for j in range(n))
            if f[idx] != f[jdx]:
                relevant += 1
                break
    return 1 - (relevant / n)


def average_sensitivity(f):
    n = int(np.log2(len(f)))
    if n == 0:
        return 0.0
    total = 0
    for x in itertools.product([0,1], repeat=n):
        idx = sum(x[n-1-i] << i for i in range(n))
        for i in range(n):
            y = list(x); y[i] ^= 1
            jdx = sum(y[n-1-k] << k for k in range(n))
            total += abs(f[idx] - f[jdx])
    return total / (n * 2**n)


class BooleanNetworkAnalyzer:
    def __init__(self, network_str):
        self.nodes = {}
        self.constants = []
        self.truth_tables = {}
        self.variables = []
        self._parse_network(network_str)
        self._build_truth_tables()
        self._build_adjacency_matrix()

    def _parse_network(self, network_str):
        lines = [L.strip() for L in network_str.strip().split('\n') if L.strip()]
        for line in lines:
            if '=' not in line: continue
            var, expr = line.split('=', 1)
            self.nodes[var.strip()] = expr.strip()

        defined = set(self.nodes.keys())
        used = set()
        for expr in self.nodes.values():
            used |= set(re.findall(r'\b[A-Za-z_][A-Za-z0-9_]*\b', expr))
        # external symbols
        self.constants = sorted(used - defined)
        self.variables = list(self.nodes.keys())

    def _evaluate_expression(self, expr, ctx):
        e = (expr.replace('AND',' and ')
                 .replace('OR',' or ')
                 .replace('NOT',' not '))
        try:
            return int(eval(e, {}, ctx))
        except Exception:
            return 0

    def _build_truth_tables(self):
        for node, expr in self.nodes.items():
            deps = sorted(set(re.findall(r'\b[A-Za-z_][A-Za-z0-9_]*\b', expr))
                          - set(self.constants))
            table = []
            for vals in itertools.product([0,1], repeat=len(deps)):
                ctx = dict(zip(deps, vals))
                for c in self.constants:
                    ctx[c] = 0
                table.append(self._evaluate_expression(expr, ctx))
            self.truth_tables[node] = table

    def _build_adjacency_matrix(self):
        n = len(self.variables)
        A = np.zeros((n, n), dtype=int)
        for i, targ in enumerate(self.variables):
            for j, src in enumerate(self.variables):
                if src in re.findall(r'\b[A-Za-z_][A-Za-z0-9_]*\b',
                                     self.nodes[targ]):
                    A[i, j] = 1
        self.adjacency_matrix = A

    def compute_scores(self):
        F = [self.truth_tables[v] for v in self.variables]
        I = [[j for j,val in enumerate(row) if val]
             for row in self.adjacency_matrix]
        return self._calculate_network_metrics(F, I)

    def _calculate_network_metrics(self, F, I):
        scores = {}
        N = len(F)
        if N < 3:
            print("Warning: too few nodes for FFL/FBL.")
            return None

        # canalization
        depths = [cft.find_layers(f)[0] for f in F]
        scores['average_canalization_depth'] = np.mean(depths)
        scores['average_redundancy']       = np.mean([input_redundancy(f) for f in F])
        scores['average_sensitivity']      = np.mean([average_sensitivity(f) for f in F])

        # graph and motifs
        A = self.adjacency_matrix
        G = nx.from_numpy_array(A, create_using=nx.DiGraph)

        # FFLs
        try:
            ffls, ffl_types = cft.get_ffls(A, F, I)
            scores['number_of_ffls']     = len(ffls)
            total_triplets = (N*(N-1)*(N-2)) / 6
            scores['normalized_ffls']    = len(ffls) / total_triplets
        except Exception as e:
            print("FFL computation failed:", e)
            scores['number_of_ffls']  = 0
            scores['normalized_ffls'] = 0

        # FBLs
        try:
            cycles = [c for c in nx.simple_cycles(G) if len(c)>1]
            scores['number_of_fbls']     = len(cycles)
            total_edges = N*(N-1)
            scores['normalized_fbls']    = len(cycles) / total_edges
        except Exception as e:
            print("FBL computation failed:", e)
            scores['number_of_fbls']  = 0
            scores['normalized_fbls'] = 0

        # criticality
        try:
            scores['criticality'] = cft.derrida_value(F, I, N, 1)
        except Exception as e:
            print("Criticality failed:", e)
            scores['criticality'] = 0

        return scores


if __name__ == '__main__':
    '''enter network here in this format:, this is some example'''

    network_input = '''
v0 = v1 AND NOT v2 
v1 = v0 OR v3 
v2 = NOT v4 
v3 = v2 OR NOT v5 
v4 = v3 AND v6 
v5 = NOT v1 OR v4 
v6 = v5 AND NOT v0
'''
    analyzer = BooleanNetworkAnalyzer(network_input)
    scores = analyzer.compute_scores()
    for keys,values in scores.items():
        print(keys, values, '\n')


average_canalization_depth 1.8571428571428572 

average_redundancy 0.0 

average_sensitivity 0.5714285714285714 

number_of_ffls 0 

normalized_ffls 0.0 

number_of_fbls 11 

normalized_fbls 0.2619047619047619 

criticality 1.058 



## DISTANCE SCORE

There are 2 scores here, Overlap and Hamming dist. (both normalised - verified by using same network for data generation and scoring-got 1.00 each score as expected)
# Overlap 
The fraction of your “experimental” states that actually show up in the simulated trajectory, in the same order they appear in the experiment. 
How well the model is able to visit all the key states you observed, without caring exactly when it hits them, only that it does so in the right sequence.

# Hamming
One minus the average fraction of bits that differ, when you line up the first 𝑇 steps of simulation with the 𝑇 experimental steps.
How similarly the network behaves at each time point, measuring bit-by-bit agreement.


Overlap tells you about coverage and order.
Hamming tells you about synchrony and bit-perfect matching.

# Why they can differ dramatically

1. High overlap, low Hamming:
Your model eventually hits all the important states (overlap → 1.0), but maybe it reaches them at different times or with detours. When you compare step-by-step, many time points will be “wrong,” so the Hamming score drops.

2. High Hamming, low overlap:
The simulation might stay very close to the experiment in the early steps (high Hamming for the first few), but then drift and never visit some later experimental states—so those missing states drive the overlap score down.

3. Both high:
Ideal case where the model both follows the experiment closely at each step and visits all experimental states in the right order.

4. Both low:
The model neither tracks the experiment step-by-step nor covers all of its key states.


    

In [6]:
import re

# 1) extract each node’s input variables from the original expressions
dependencies = {}
for var, expr in analyzer.nodes.items():
    deps = sorted(
        set(re.findall(r'\b[A-Za-z_][A-Za-z0-9_]*\b', expr))
        - set(analyzer.constants)
    )
    dependencies[var] = deps

# 2) simulate the Boolean network synchronously
def simulate_network(analyzer, initial_state, steps):
    """
    Returns a list of states (each a list of 0/1) of length `steps`,
    starting from `initial_state`.  States are ordered according to
    analyzer.variables.
    """
    variables = analyzer.variables
    truth_tables = analyzer.truth_tables
    sim = [initial_state.copy()]
    for _ in range(steps - 1):
        ctx = dict(zip(variables, sim[-1]))
        nxt = []
        for var in variables:
            bits = [ctx[d] for d in dependencies[var]]
            # build the index into the truth table
            idx = sum(bit << (len(bits) - 1 - i) for i, bit in enumerate(bits))
            nxt.append(truth_tables[var][idx])
        sim.append(nxt)
    return sim

# 3) compare an “expression” STG to a simulated STG
def compare_tgs(expression_states, simulated_states):
    """
    expression_states: list of length T of states, or a (T, n) array
    simulated_states: list of length ≥ T of states, or a (S, n) array
    Returns (overlap_score, hamming_score).
    """
    # — coerce to np.arrays —
    expr_arr = np.array(expression_states, dtype=int)     # shape (T, n)
    sim_arr  = np.array(simulated_states, dtype=int)      # shape (S, n)
    T, n_vars = expr_arr.shape
    S, _      = sim_arr.shape

    # — overlap score: fraction of expr states found in order in sim_arr —
    last_j = 0
    matched = 0
    for i in range(T):
        for j in range(last_j, S):
            if np.array_equal(sim_arr[j], expr_arr[i]):
                matched += 1
                last_j = j + 1
                break
    overlap_score = matched / T

    # — hamming score: compare first T steps (or S if shorter) —
    L = min(T, S)
    # compute Hamming distances row-wise
    dists = np.sum(np.abs(sim_arr[:L] - expr_arr[:L]), axis=1)
    avg_hd = dists.mean()
    hamming_score = 1.0 - (avg_hd / n_vars)

    return overlap_score, hamming_score


# replace these with your real data:
traj = np.array(traj)             # if traj was some array‐like
expression_states = traj.tolist() # or you can just keep it as array
initial_state = init              # ensure this is a list or 1D array of length n_vars

T = len(expression_states)
sim_STG = simulate_network(analyzer, initial_state, 5 * T)

ov, hd = compare_tgs(expression_states, sim_STG)
print(f"Overlap score: {ov:.3f}")
print(f"Hamming score: {hd:.3f}")

Overlap score: 1.000
Hamming score: 1.000
