

```
# Place your Boolean rules in a text file named rules.txt in the working directory (one rule per line).

Example format:

A: C
B: C
C: A or B

```



Tools for enumerating the different asynchronous (block-sequential) update schemes

In [3]:
import itertools
import re
import csv

def parse_rules_from_file(filename="rules.txt"):
    """
    Parses a Boolean-network rule file into expressions and dependencies.

    Every non-blank line is expected to read ``node: Boolean expression`` and
    is split on its first colon only. A line without a colon is reported on
    stdout and skipped.

    Returns:
      - functions: dict {node: expression string}
      - dependencies: dict {node: set of identifiers found in its expression,
        excluding the Boolean keywords and the node itself}
    """
    word_pattern = r'\b[\w]+\b'
    reserved = {"and", "or", "not", "True", "False"}

    functions = {}
    dependencies = {}

    with open(filename, "r") as handle:
        for raw_line in handle:
            stripped = raw_line.strip()
            if stripped == "":
                continue

            name, colon, body = stripped.partition(":")
            if not colon:
                print(f"Incorrect format in line: {stripped}")
                continue

            name = name.strip()
            body = body.strip()
            functions[name] = body

            # Every word that is neither a keyword nor the node itself
            # counts as a regulator of this node.
            regulators = set()
            for word in re.findall(word_pattern, body):
                if word in reserved or word == name:
                    continue
                regulators.add(word)
            dependencies[name] = regulators

    return functions, dependencies

def build_interaction_graph(dependencies):
    """
    Turns the dependency map into a list of directed edges.

    For every node and every identifier its expression depends on, emits the
    edge (regulator, node). Identifiers that are not themselves keys of
    `dependencies` (i.e. have no rule of their own) are dropped.

    Returns a list of (source, target) tuples.
    """
    return [
        (regulator, node)
        for node, regulators in dependencies.items()
        for regulator in regulators
        if regulator in dependencies
    ]

def compute_signature(schedule, edges):
    """
    Computes the signature of the update digraph induced by a schedule.

    `schedule` is a dict {node: block} and `edges` a list of (source, target)
    pairs. Each edge (j, i) receives the label:
      - "P" when schedule[j] >= schedule[i]
      - "N" when schedule[j] < schedule[i]

    Returns the labelled edges (j, i, label) as a sorted tuple, so two
    schedules producing the same labelling yield equal signatures.
    """
    labelled = [
        (src, dst, "P" if schedule[src] >= schedule[dst] else "N")
        for src, dst in edges
    ]
    labelled.sort()
    return tuple(labelled)

def _iter_surjections(length, block_count):
    """
    Yields, in lexicographic order, every tuple of `length` values taken from
    1..block_count in which each value occurs at least once.
    """
    if block_count < 1 or block_count > length:
        return

    assignment = [0] * length
    usage = [0] * (block_count + 1)  # usage[b]: how often block b is used so far

    def extend(position, missing):
        # `missing` = number of blocks that have not been used yet.
        if position == length:
            yield tuple(assignment)
            return
        slots_after = length - position - 1
        for block in range(1, block_count + 1):
            still_missing = missing - 1 if usage[block] == 0 else missing
            # Prune: the remaining slots could no longer cover every block,
            # so no completion of this prefix is surjective.
            if still_missing > slots_after:
                continue
            assignment[position] = block
            usage[block] += 1
            yield from extend(position + 1, still_missing)
            usage[block] -= 1

    yield from extend(0, block_count)


def generate_block_schedules(nodes):
    """
    Generates all block-sequential update schedules for the nodes.
    An update schedule is a surjective function s: nodes -> {1, ..., m} (for m from 1 to len(nodes)).

    Schedules are produced for m = 1, 2, ..., len(nodes) and, within each m,
    in lexicographic order of the block values taken in `nodes` order. Only
    surjective assignments are ever constructed (dead prefixes are pruned),
    instead of enumerating all m**n tuples and filtering them afterwards.

    Returns a list of schedules (each is a dict {node: block}); the list is
    empty when `nodes` is empty.
    """
    n = len(nodes)
    schedules = []
    for m in range(1, n + 1):
        for values in _iter_surjections(n, m):
            schedules.append(dict(zip(nodes, values)))
    return schedules

def group_update_schedules(nodes, edges):
    """
    Partitions every block-sequential update schedule of `nodes` into
    equivalence classes, two schedules being equivalent when they share the
    same update-digraph signature with respect to `edges`.

    Within a class the schedules keep their generation order, so element 0 is
    the first schedule generated for that class.

    Returns a dictionary: {signature: list of schedules}.
    """
    classes = {}
    for candidate in generate_block_schedules(nodes):
        key = compute_signature(candidate, edges)
        classes.setdefault(key, []).append(candidate)
    return classes

def schedule_to_list(schedule, nodes):
    """
    Flattens a schedule (dict {node: block}) into a plain list of block
    numbers, one entry per element of `nodes` and in that same order.
    """
    return list(map(schedule.__getitem__, nodes))

def export_to_csv(results, filename="update_schedules.csv"):
    """
    Writes the representative schedules to a CSV file.

    `results` is an iterable of (scheme_name, schedule_list) pairs. The file
    starts with the header row "Scheme,Schedule", followed by one row per
    pair; the schedule list is written as a single cell. A confirmation
    message is printed once the file has been written.
    """
    header = ["Scheme", "Schedule"]
    with open(filename, mode="w", newline="") as out:
        sheet = csv.writer(out)
        sheet.writerow(header)
        sheet.writerows([name, order] for name, order in results)
    print(f"\nResults exported to {filename}")

def main():
    """
    Enumerates the update classes of the network described in rules.txt.

    Reads the rules, prints the nodes and the dependency edges, groups all
    block-sequential update schedules by update-digraph signature, prints one
    representative schedule per class and exports those representatives to
    update_schedules.csv.
    """
    functions, dependencies = parse_rules_from_file("rules.txt")
    nodes = list(functions.keys())

    print("\nDefined nodes:", nodes)

    edges = build_interaction_graph(dependencies)
    print("Edges (dependencies):")
    for j, i in edges:
        print(f"  {j} -> {i}")

    groups = group_update_schedules(nodes, edges)
    # Every schedule belongs to exactly one class, so the class sizes add up
    # to the total; this avoids enumerating all schedules a second time.
    total_schedules = sum(len(members) for members in groups.values())
    print(f"\nTotal number of update schedules (block-sequential): {total_schedules}")
    print(f"Number of update classes (based on update digraph): {len(groups)}\n")

    print("Representative schedule of each class:\n")
    results = []
    for idx, schedules in enumerate(groups.values(), start=1):
        # The first schedule generated for a class serves as its representative.
        representative = schedules[0]
        order_list = schedule_to_list(representative, nodes)
        scheme_name = f"Scheme_{idx}"
        print(f"{scheme_name}; {order_list}")
        results.append((scheme_name, order_list))

    export_to_csv(results)

if __name__ == "__main__":
    main()

# For 9 nodes it takes approximately 16 minutes


Defined nodes: ['A', 'B', 'C']
Edges (dependencies):
  C -> A
  C -> B
  A -> C
  B -> C

Total number of update schedules (block-sequential): 13
Number of update classes (based on update digraph): 9

Representative schedule of each class:

Scheme_1; [1, 1, 1]
Scheme_2; [1, 1, 2]
Scheme_3; [1, 2, 1]
Scheme_4; [1, 2, 2]
Scheme_5; [2, 1, 1]
Scheme_6; [2, 1, 2]
Scheme_7; [2, 2, 1]
Scheme_8; [1, 3, 2]
Scheme_9; [3, 1, 2]

Results exported to update_schedules.csv


Obtaining basins of attraction for each asynchronous update scheme

In [4]:
import numpy as np
import pandas as pd
import re
import ast

def load_rules_from_file(filename="rules.txt"):
    """
    Reads Boolean rules from a text file, one ``node: expression`` per line.

    Blank lines are ignored. Each remaining line is split on its first colon;
    a line without a colon is reported on stdout and skipped.

    Returns a dictionary {node: expression}
    """
    rules = {}
    with open(filename, "r") as handle:
        for raw in handle:
            entry = raw.strip()
            if not entry:
                continue
            name, colon, expression = entry.partition(":")
            if colon:
                rules[name.strip()] = expression.strip()
            else:
                print(f"Incorrect format in line: {entry}")
    return rules

def simulate_network(rules, update_vector, initial_state):
    """
    Builds the complete state-transition table of a Boolean network under a
    block-sequential update schedule.

    Parameters:
      - rules: dict {node: Boolean expression string}. Its iteration order
        matters: update_vector[k] is the block of the k-th node of `rules`.
      - update_vector: sequence of block numbers, one per node of `rules`.
        Blocks are executed in ascending numeric order; all nodes of a block
        are updated simultaneously from the state left by the previous block.
      - initial_state: mapping whose keys name the nodes of the network. Only
        the keys are used (values are ignored): every one of the
        2 ** len(initial_state) states is simulated.

    Returns:
      - list of strings "<state> => <successor>", one per state, in ascending
        binary order. In both bit strings, bit k belongs to the k-th node in
        sorted name order.

    Raises:
      - IndexError if a non-empty update_vector has fewer entries than `rules`.
      - Whatever evaluating a rule raises (e.g. SyntaxError, NameError).
    """
    # One fixed node order for decoding the input bits AND encoding the
    # result, so both sides of "=>" always use the same bit layout.
    node_order = sorted(initial_state)
    width = len(node_order)

    # Blocks must run in ascending order; iterating a bare set does not
    # guarantee that, so sort explicitly. Membership is computed only once.
    rule_nodes = list(rules)
    blocks = [
        [node for idx, node in enumerate(rule_nodes) if update_vector[idx] == step]
        for step in sorted(set(update_vector))
    ]

    # SECURITY: rules are executed with eval(); only use rule files you trust.
    # Each expression is compiled once here instead of being re-parsed for
    # every state. strip() mirrors eval()'s tolerance of surrounding blanks.
    compiled = {
        node: compile(rules[node].strip(), f"<rule {node}>", "eval")
        for block in blocks
        for node in block
    }

    state_transition_table = []
    for i in range(2 ** width):
        state_str = format(i, f"0{width}b")
        current_state = {node: int(bit) for node, bit in zip(node_order, state_str)}
        for block in blocks:
            # All nodes of a block read the same snapshot of the state.
            local_env = {key: bool(val) for key, val in current_state.items()}
            next_state = current_state.copy()
            for node in block:
                next_state[node] = eval(compiled[node], {}, local_env)
            current_state = next_state
        result_state_str = ''.join(str(int(current_state[node])) for node in node_order)
        state_transition_table.append(f"{state_str} => {result_state_str}")
    return state_transition_table

def normalize_cycle(cycle):
    """
    Rotates a cycle so that it starts at its smallest element (the first one
    if the minimum occurs more than once), giving every rotation of the same
    cycle one canonical form.
    """
    min_index = min(range(len(cycle)), key=lambda i: cycle[i])
    return cycle[min_index:] + cycle[:min_index]

def find_attractors(transition_table):
    """
    Finds the attractors of a state-transition table and their basins.

    `transition_table` is a list of strings "<state> => <successor>".

    Returns:
      - attractors: dict {normalized cycle (tuple of states): set of all
        states whose trajectory ends in that cycle, cycle states included}
      - basin_map: dict {state: normalized cycle it ends in}

    Raises:
      - KeyError if a successor state has no entry of its own in the table.
    """
    state_map = {}
    for entry in transition_table:
        parts = entry.split(' => ')
        state_map[parts[0]] = parts[1]

    attractors = {}
    basin_map = {}
    for state in state_map:
        if state in basin_map:
            # Already classified while following an earlier trajectory.
            continue

        path = []
        position = {}  # state -> index in path, for O(1) revisit detection
        current_state = state
        # Walk until the trajectory closes on itself or reaches a state
        # whose attractor is already known.
        while current_state not in position and current_state not in basin_map:
            position[current_state] = len(path)
            path.append(current_state)
            current_state = state_map[current_state]

        if current_state in basin_map:
            normalized_cycle = basin_map[current_state]
        else:
            cycle = tuple(path[position[current_state]:])
            normalized_cycle = normalize_cycle(cycle)

        basin = attractors.setdefault(normalized_cycle, set())
        for s in path:
            basin_map[s] = normalized_cycle
        basin.update(path)
    return attractors, basin_map

# Driver for the attractor analysis: for every update schedule listed in
# update_schedules.csv, simulate the network defined in rules.txt and record
# each attractor together with the size of its basin.

# Load the Boolean rules ({node: expression}) from the rule file
rules = load_rules_from_file("rules.txt")

# One entry per node, in sorted name order. simulate_network only uses the
# keys of this mapping (it enumerates every state itself), so the 0 values
# are placeholders.
initial_state = {node: 0 for node in sorted(rules.keys())}

# Load update vectors with comma separator (first row is the header)
file_path = './update_schedules.csv'
update_vectors_df = pd.read_csv(file_path, sep=',', header=0)

# Confirm column names
print("📋 CSV column names:", update_vectors_df.columns.tolist())

# Column holding each schedule as the text form of a list, e.g. "[1, 1, 2]"
schedule_column = "Schedule"

# One dict per (update vector, attractor) pair; becomes the output table
results = []
for index, row in update_vectors_df.iterrows():
    try:
        # Turn the textual list back into a Python list without using eval()
        update_vector = ast.literal_eval(row[schedule_column])
        # A schedule needs exactly one block number per rule
        if len(update_vector) != len(rules):
            print(f"⚠️ Skipping row {index}: expected {len(rules)} elements, got {len(update_vector)}")
            continue

        transition_table = simulate_network(rules, update_vector, initial_state)
        attractors, basin_map = find_attractors(transition_table)

        # Report the attractors of this schedule from largest to smallest basin
        for attractor, basin in sorted(attractors.items(), key=lambda item: len(item[1]), reverse=True):
            results.append({
                "Update Vector": str(update_vector),
                "Attractor": attractor,
                "Basin Size": len(basin)
            })
    except Exception as e:
        # Best effort: a failing row is reported and the remaining rows still run
        print(f"❌ Error processing row {index}: {e}")

# Save results to CSV (semicolon-separated, without the DataFrame index)
results_df = pd.DataFrame(results)
results_df.to_csv('./Attractor_Results.csv', index=False, sep=';')
print("✅ Results saved to './Attractor_Results.csv'")

# For 11000 schemes it takes approximately 11 minutes

📋 CSV column names: ['Scheme', 'Schedule']
✅ Results saved to './Attractor_Results.csv'
