# 🧠 HW 3: Interleaving

In [None]:
import sys
import otter
# try:
#   import otter
# except ImportError:
#     %pip install otter-grader
#     import otter

grader = otter.Notebook("HW3.ipynb")

## Question 1: Interleaving Program Graphs

### 🔍 Objective

In this assignment, you will extend the `ProgramGraph` class to support **interleaving** two concurrent program graphs. Interleaving allows you to model concurrent execution, where each program graph represents an independent thread or component.

---

### 🔧 Conditions in ProgramGraph

We have extended the `ProgramGraph` class constructor by adding a new parameter:

```python
Cond: Set[Condition]
```

This is the set of all conditions that may appear as guards on the graph's transitions.

---


### 🔧 Task: Implement `interleave(pg: ProgramGraph) -> ProgramGraph`

Add a method `interleave(pg: ProgramGraph)` that returns a **new program graph** representing the interleaving of `self` with another program graph `pg`.

---

### 🤖 Interleaving Semantics

- **Locations (`Loc`)**: Cartesian product of both graphs' locations
  `(loc1, loc2)`

- **Initial Locations (`Loc0`)**: Cartesian product of initial locations
  `(loc1, loc2) ∈ pg1.Loc0 × pg2.Loc0`

- **Actions (`Act`)**: Union of actions from both graphs

- **Conditions (`Cond`)**: Union of conditions from both graphs

- **Transitions**: For each transition in either graph (`pg1` is `self`, `pg2` is `pg`), paired with every location of the other graph:
  - If `(l1, cond, act, l1') ∈ pg1`, then
    `((l1, l2), cond, act, (l1', l2)) ∈ interleaved_pg`
  - If `(l2, cond, act, l2') ∈ pg2`, then
    `((l1, l2), cond, act, (l1, l2')) ∈ interleaved_pg`

  ✅ This means only one component moves at a time (classic interleaving)

- **Effect Function**: Use a shared environment and apply the effect of the corresponding program

- **Eval Function**: Use the original `eval_fn` from either program graph
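
The transition rule above can be sketched on plain Python sets, independently of the `ProgramGraph` class. The helper name `lift_transitions` is hypothetical and is not the required `interleave` method; it only illustrates how each component transition is paired with every location of the other component.

```python
from itertools import product
from typing import Set, Tuple

# (loc_from, cond, action, loc_to)
Transition = Tuple[object, str, str, object]


def lift_transitions(
    loc1: Set[str], trans1: Set[Transition],
    loc2: Set[str], trans2: Set[Transition],
) -> Set[Transition]:
    """Hypothetical helper: lift component transitions to pairs of locations."""
    lifted = set()
    # pg1 moves, pg2 stays at its current location
    for (l1, cond, act, l1_next), l2 in product(trans1, loc2):
        lifted.add(((l1, l2), cond, act, (l1_next, l2)))
    # pg2 moves, pg1 stays at its current location
    for (l2, cond, act, l2_next), l1 in product(trans2, loc1):
        lifted.add(((l1, l2), cond, act, (l1, l2_next)))
    return lifted


# Two small graphs with one transition each
result = lift_transitions(
    {"A", "B"}, {("A", "x > 0", "x -= 1", "B")},
    {"L0", "L1"}, {("L0", "y == 0", "y += 1", "L1")},
)
```

Each component transition appears once per location of the other component, so `result` holds four transitions here. The sketch handles exactly two components and does not flatten nested tuples.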

---

### ⚠️ Note on Interleaving Multiple Systems

When interleaving more than two `TransitionSystem` or `ProgramGraph` instances, the resulting states or locations **must be represented as flat tuples**, not nested ones.

✅ For example, interleaving 3 systems should produce states like:

```python
("s0", "t1", "u2")
```

❌ And not nested tuples like:

```python
(("s0", "t1"), "u2")
```

This flattening ensures consistency in state representation, simplifies analysis and labeling, and avoids errors in reachability, labeling, or visualization.
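
One possible way to keep composite states flat is sketched below. The name `flatten_state` is hypothetical, and the sketch assumes that atomic component states are never tuples themselves; it flattens only one level.

```python
from typing import Tuple


def flatten_state(*parts) -> Tuple:
    """Hypothetical helper: merge component states into one flat tuple (one level deep)."""
    flat = []
    for part in parts:
        if isinstance(part, tuple):
            flat.extend(part)   # composite state: splice its components in
        else:
            flat.append(part)   # atomic state: keep it as a single component
    return tuple(flat)


pair = flatten_state("s0", "t1")      # result of interleaving two systems
triple = flatten_state(pair, "u2")    # interleaving that result with a third system
```

Here `triple` is `("s0", "t1", "u2")` rather than `(("s0", "t1"), "u2")`, regardless of whether the composite state is on the left or the right.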


### 🧪 Example

If `pg1` has:
```python
Loc = {'A', 'B'}
Transitions = {('A', 'x > 0', 'x -= 1', 'B')}
```

And `pg2` has:
```python
Loc = {'L0', 'L1'}
Transitions = {('L0', 'y == 0', 'y += 1', 'L1')}
```

Then the interleaved program graph has locations:
```
{('A', 'L0'), ('B', 'L0'), ('A', 'L1'), ('B', 'L1')}
```

And transitions like:
```
(('A', 'L0'), 'x > 0', 'x -= 1', ('B', 'L0'))
(('B', 'L0'), 'y == 0', 'y += 1', ('B', 'L1'))
(('A', 'L0'), 'y == 0', 'y += 1', ('A', 'L1'))
(('A', 'L1'), 'x > 0', 'x -= 1', ('B', 'L1'))
```
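
For string conditions and actions like the ones in this example, `eval_fn` and `effect_fn` over a shared environment could look roughly as follows. This is only a sketch: using Python's built-in `eval` and `exec` is an assumption made for illustration (reasonable only for trusted strings such as these), and the grader may supply its own functions.

```python
from typing import Dict, Union

Environment = Dict[str, Union[str, bool, int, float]]


def eval_fn(condition: str, env: Environment) -> bool:
    """Evaluate a condition string with the environment's variables in scope."""
    return bool(eval(condition, {"__builtins__": {}}, dict(env)))


def effect_fn(action: str, env: Environment) -> Environment:
    """Apply an action string to a copy of the environment and return the copy."""
    new_env = dict(env)                             # never mutate the caller's environment
    exec(action, {"__builtins__": {}}, new_env)     # assignments land in new_env
    return new_env


env = {"x": 2, "y": 0}                 # one environment shared by both components
after_pg1 = effect_fn("x -= 1", env)   # pg1 acts: only x changes
after_pg2 = effect_fn("y += 1", after_pg1)  # pg2 acts on the same environment
```

Because both components read and write the same dictionary shape, the effect of either component's action is visible to the other in the next step.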

---

In [None]:
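from typing import Callable, Set, Tuple, Dict, List, Union
from collections import deque
import copy

import networkx as nx            # used by ProgramGraph.plot
import matplotlib.pyplot as plt  # used by ProgramGraph.plot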

# Add your imports here
...

Location = Union[str, Tuple]  # A location is a string, or a tuple of component locations after interleaving
Action = str  # Actions are represented as strings
Condition = str # Conditions are represented as strings
Transition = Tuple[Location, Condition, Action, Location]  # (loc_from, condition, action, loc_to)
Environment = Dict[str, Union[str, bool, int, float]]      # Variable assignments

class ProgramGraph:
    def __init__(
        self,
        locations: Set[Location],
        initial_locations: Set[Location],
        actions: Set[Action],
        conditions: Set[Condition],
        transitions: Set[Transition],
        eval_fn: Callable[[Condition, Environment], bool],
        effect_fn: Callable[[Action, Environment], Environment],
        g0: Condition
    ):
        """
        A representation of a Program Graph.

        :param locations: Set of all program locations (Loc).
        :param initial_locations: Set of initial locations (Loc0).
        :param actions: Set of possible actions (Act).
        :param conditions: Set of possible conditions (Cond).
        :param transitions: Set of transitions in the form (loc_from, condition, action, loc_to).
        :param eval_fn: Function to evaluate a condition string in an environment.
        :param effect_fn: Function to compute the new environment after applying an action.
        :param g0: Initial condition string for filtering valid starting environments.
        """
        self.Loc = set(locations)
        self.Loc0 = set(initial_locations)
        self.Act = set(actions)
        self.Cond = set(conditions)
        self.Transitions = set(transitions)
        self.eval_fn = eval_fn
        self.effect_fn = effect_fn
        self.g0 = g0

    def add_location(self, *locations: Location) -> "ProgramGraph":
        """Add one or more locations to the program graph."""
        self.Loc.update(locations)
        return self

    def add_action(self, *actions: Action) -> "ProgramGraph":
        """Add one or more actions to the program graph."""
        self.Act.update(actions)
        return self

    def add_condition(self, *conditions: Condition) -> "ProgramGraph":
        """Add one or more conditions to the program graph."""
        self.Cond.update(conditions)
        return self

    def add_transition(self, *transitions: Transition) -> "ProgramGraph":
        """
        Add one or more transitions to the program graph.

        Each transition must be a tuple: (loc_from, condition, action, loc_to).
        """
        for transition in transitions:
            if not isinstance(transition, tuple) or len(transition) != 4:
                raise ValueError(f"Invalid transition format: {transition}. Expected (loc_from, cond, action, loc_to).")
            loc_from, cond, action, loc_to = transition
            if loc_from not in self.Loc:
                raise ValueError(f"Location {loc_from} is not in the program graph.")
            if loc_to not in self.Loc:
                raise ValueError(f"Location {loc_to} is not in the program graph.")
            if action not in self.Act:
                raise ValueError(f"Action {action} is not in the program graph.")
            if cond not in self.Cond:
                raise ValueError(f"Condition {cond} is not in the program graph.")
            self.Transitions.add(transition)
        return self

    def add_initial_location(self, *locations: Location) -> "ProgramGraph":
        """Add one or more initial locations to the program graph."""
        for loc in locations:
            if loc not in self.Loc:
                raise ValueError(f"Cannot set initial location {loc}. Location is not in the set of locations.")
            self.Loc0.add(loc)
        return self

    def set_eval_fn(self, eval_fn: Callable[[Condition, Environment], bool]) -> "ProgramGraph":
        """Set the function used to evaluate conditions."""
        self.eval_fn = eval_fn
        return self

    def set_effect_fn(self, effect_fn: Callable[[Action, Environment], Environment]) -> "ProgramGraph":
        """Set the function used to apply actions to environments."""
        self.effect_fn = effect_fn
        return self

    def eval(self, condition: Condition, env: Environment) -> bool:
        """Evaluate a condition string in the given environment."""
        return self.eval_fn(condition, env)

    def effect(self, action: Action, env: Environment) -> Environment:
        """Apply an action to the environment and return the new environment."""
        return self.effect_fn(action, env)

    def valid_transitions(self, loc: Location, env: Environment, action: Action) -> List[Tuple[Location, Action, Location]]:
        """
        Return a list of valid transitions from a given location using the provided environment and action.
        """
        # Paste your solution from previous exercise here
        ...

    def to_transition_system(self, vars: Dict[str, Set[Union[str, bool, int, float]]], labels: Set[Condition]) -> "TransitionSystem":
        """
        Construct and return a Transition System from the program graph.

        :param vars: A dictionary mapping variable names to their finite sets of possible values.
        :param labels: A set of atomic proposition strings to be used for labeling states.
        :return: A TransitionSystem instance corresponding to the program graph.
        """
        # Paste your solution from previous exercise here
        ...

    def interleave(self, pg: "ProgramGraph") -> "ProgramGraph":
        ...

    def __repr__(self):
        return (
            f"ProgramGraph(\n"
            f"  Loc: {self.Loc}\n"
            f"  Act: {self.Act}\n"
            f"  Cond: {self.Cond}\n"
            f"  Transitions: {self.Transitions}\n"
            f"  Loc0: {self.Loc0}\n"
            f"  g0: {self.g0}\n"
            f")"
        )


    def plot(self):
        """
        Visualize the program graph as a directed graph using networkx and matplotlib.

        Nodes are locations. Edges are labeled with (condition, action).
        """
        G = nx.MultiDiGraph()

        # Add nodes
        for loc in self.Loc:
            G.add_node(loc, color='lightblue' if loc in self.Loc0 else 'white')

        # Add edges with (condition, action) labels
        for (loc_from, cond, action, loc_to) in self.Transitions:
            label = f"{cond} / {action}"
            G.add_edge(loc_from, loc_to, label=label)

        pos = nx.spring_layout(G, seed=42)  # consistent layout

        # Draw nodes with color
        node_colors = [G.nodes[n].get('color', 'white') for n in G.nodes]
        nx.draw_networkx_nodes(G, pos, node_color=node_colors, edgecolors='black')
        nx.draw_networkx_labels(G, pos)

        # Draw edges
        nx.draw_networkx_edges(G, pos, arrows=True)

        # Draw edge labels
        edge_labels = {(u, v): d['label'] for u, v, d in G.edges(data=True)}
        nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels, font_size=8)

        # Display
        plt.title("Program Graph")
        plt.axis('off')
        plt.tight_layout()
        plt.show()

In [None]:
grader.check("q1")

## 🔄 Question 2: Interleaving Transition Systems with Handshaking

### 🔍 Objective

You will now extend the `TransitionSystem` class to support **interleaving** two transition systems, optionally synchronizing on a set of **handshaking actions**.

This is useful for modeling **parallel systems** that may:
- Evolve independently (interleaving)
- Or synchronize on specific shared actions (handshake)

---

### 🧩 Task

Add a method to the `TransitionSystem` class:

```python
def interleave(self, ts: TransitionSystem, h: Set[Action]) -> TransitionSystem:
    ...
```

Where:
- `self` is the first transition system (`TS1`)
- `ts` is the second transition system (`TS2`)
- `h` is a set of **handshaking actions** ⊆ `Act1 ∩ Act2`

---

### 🤖 Interleaving Semantics

Let `s1`, `s1'` be states in `TS1`, and `s2`, `s2'` be states in `TS2`.

You should add the following transitions to the interleaved system:

#### 🔹 Independent Transitions

If `a ∈ Act1 \ h`:
```
((s1, s2), a, (s1', s2)) if (s1, a, s1') ∈ TS1
```

If `a ∈ Act2 \ h`:
```
((s1, s2), a, (s1, s2')) if (s2, a, s2') ∈ TS2
```

#### 🔸 Handshaking Transitions

If `a ∈ h`:
```
((s1, s2), a, (s1', s2')) if (s1, a, s1') ∈ TS1 and (s2, a, s2') ∈ TS2
```
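
The three rules can be sketched on plain sets of states and transitions. The function name `product_transitions` is hypothetical and is not the required `TransitionSystem.interleave` method; the sketch handles two components only and does not flatten states.

```python
from typing import Set, Tuple

# (state_from, action, state_to)
Transition = Tuple[object, str, object]


def product_transitions(S1, T1, S2, T2, h=frozenset()) -> Set[Transition]:
    """Hypothetical helper: transitions of TS1 and TS2 composed with handshake set h."""
    result = set()
    # Independent moves of TS1: a in Act1 \ h, TS2 keeps its state
    for (s1, a, s1_next) in T1:
        if a not in h:
            for s2 in S2:
                result.add(((s1, s2), a, (s1_next, s2)))
    # Independent moves of TS2: a in Act2 \ h, TS1 keeps its state
    for (s2, a, s2_next) in T2:
        if a not in h:
            for s1 in S1:
                result.add(((s1, s2), a, (s1, s2_next)))
    # Handshaking moves: both systems take the same action a in h together
    for (s1, a, s1_next) in T1:
        if a in h:
            for (s2, b, s2_next) in T2:
                if b == a:
                    result.add(((s1, s2), a, (s1_next, s2_next)))
    return result


S1, T1 = {"s0", "s1"}, {("s0", "a", "s1")}
S2, T2 = {"t0", "t1"}, {("t0", "a", "t1")}
synced = product_transitions(S1, T1, S2, T2, h={"a"})
free = product_transitions(S1, T1, S2, T2)
```

With `h = {"a"}` the only transition is the joint step from `("s0", "t0")` to `("s1", "t1")`; with an empty `h` each system moves on its own, which gives four transitions.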

---

### 🧱 Transition System Components

Your interleaved TS should include:

- `S = S1 × S2`
- `Act = Act1 ∪ Act2`
- `Transitions =` as defined above
- `I = I1 × I2`
- `AP = AP1 ∪ AP2`
- `L((s1, s2)) = L1(s1) ∪ L2(s2)`
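
The initial states and the labeling of the composed system can be sketched with `itertools.product`. The dictionaries `L1` and `L2` below are made-up labelings used only for illustration, and the sketch assumes every state appears as a key (in the class, `L(state)` returns an empty set for unlabeled states).

```python
from itertools import product

# Made-up components for illustration
I1, I2 = {"s0"}, {"t0"}
L1 = {"s0": {"p"}, "s1": set()}
L2 = {"t0": set(), "t1": {"q"}}

# I = I1 x I2
initial = set(product(I1, I2))

# L((s1, s2)) = L1(s1) | L2(s2) for every pair of states
labels = {(s1, s2): L1[s1] | L2[s2] for s1, s2 in product(L1, L2)}
```

A composed state inherits the propositions of both of its components, so `("s0", "t1")` is labeled with both `p` and `q`.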

---

### 🧪 Example

Suppose:

TS1:
```
S = {s0, s1}, Act = {a}, Transitions = {(s0, a, s1)}
```

TS2:
```
S = {t0, t1}, Act = {a}, Transitions = {(t0, a, t1)}
```

With `h = {'a'}`, the interleaved TS will have:
```
((s0, t0), a, (s1, t1))
```

If `h = ∅`, you'll get:
```
((s0, t0), a, (s1, t0))
((s0, t0), a, (s0, t1))
((s1, t0), a, (s1, t1))
((s0, t1), a, (s1, t1))
```

---

In [None]:
import itertools as it
from typing import Set, Dict, Tuple, Union, Optional

import networkx as nx
import matplotlib.pyplot as plt

from staff_solution.utils import flatten

State = Union[str, Tuple]  # A state can be a string or a tuple (location, environment)
Action = str  # Actions are represented as strings
Transition = Tuple[State, Action, State]  # (source_state, action, target_state)
LabelingMap = Dict[State, Set[str]]  # Maps states to atomic propositions


class TransitionSystem:
    """
    A Transition System (TS) representation.

    Attributes:
        S (Set[State]): The set of all states (strings or tuples).
        Act (Set[Action]): The set of all possible actions.
        Transitions (Set[Transition]): The set of transitions, each represented as (state_origin, action, state_target).
        I (Set[State]): The set of initial states.
        AP (Set[str]): The set of atomic propositions.
        _L (LabelingMap): A dictionary mapping states to their respective atomic propositions.
    """

    def __init__(
        self,
        states: Optional[Set[State]] = None,
        actions: Optional[Set[Action]] = None,
        transitions: Optional[Set[Transition]] = None,
        initial_states: Optional[Set[State]] = None,
        atomic_props: Optional[Set[str]] = None,
        labeling_map: Optional[LabelingMap] = None,
    ) -> None:
        """
        Initializes the Transition System.

        :param states: A set of states (each a string or a tuple). Defaults to an empty set.
        :param actions: A set of actions. Defaults to an empty set.
        :param transitions: A set of transitions, each as (state_origin, action, state_target). Defaults to an empty set.
        :param initial_states: A set of initial states. Defaults to an empty set.
        :param atomic_props: A set of atomic propositions. Defaults to an empty set.
        :param labeling_map: A dictionary mapping states to sets of atomic propositions. Defaults to an empty dictionary.
        """
        self.S: Set[State] = set(states) if states is not None else set()
        self.Act: Set[Action] = set(actions) if actions is not None else set()
        self.Transitions: Set[Transition] = set(transitions) if transitions is not None else set()
        self.I: Set[State] = set(initial_states) if initial_states is not None else set()
        self.AP: Set[str] = set(atomic_props) if atomic_props is not None else set()
        self._L: LabelingMap = dict(labeling_map) if labeling_map is not None else {}

    def add_state(self, *states: State) -> "TransitionSystem":
        """
        Adds one or more states to the transition system.

        :param states: One or more states (strings or tuples) to be added.
        :return: The TransitionSystem instance (for method chaining).
        """
        self.S.update(states)
        return self

    def add_action(self, *actions: Action) -> "TransitionSystem":
        """
        Adds one or more actions to the transition system.

        :param actions: One or more actions (strings) to be added.
        :return: The TransitionSystem instance (for method chaining).
        """
        self.Act.update(actions)
        return self

    def add_transition(self, *transitions: Transition) -> "TransitionSystem":
        """
        Adds one or more transitions to the transition system.
        Ensures that all involved states and actions exist before adding the transitions.

        Each transition must be provided as a tuple of the form `(state_from, action, state_to)`, where:
        - `state_from` is the source state.
        - `action` is the action performed.
        - `state_to` is the resulting state.

        :param transitions: One or more transitions, each as a tuple `(state_from, action, state_to)`.
        :raises ValueError:
            - If a transition is not a tuple of length 3.
            - If `state_from` or `state_to` does not exist in `self.S`.
            - If `action` is not in `self.Act`.
        :return: The `TransitionSystem` instance (for method chaining).
        """
        for transition in transitions:
            if not isinstance(transition, tuple) or len(transition) != 3:
                raise ValueError(f"Invalid transition format: {transition}. Expected (state_from, action, state_to).")

            state_from, action, state_to = transition

            if state_from not in self.S:
                raise ValueError(f"State {state_from} is not in the transition system.")
            if state_to not in self.S:
                raise ValueError(f"State {state_to} is not in the transition system.")
            if action not in self.Act:
                raise ValueError(f"Action {action} is not in the transition system.")

            self.Transitions.add(transition)
        return self

    def add_initial_state(self, *states: State) -> "TransitionSystem":
        """
        Adds one or more states to the set of initial states.

        :param states: One or more states to be marked as initial.
        :raises ValueError: If any state does not exist in the system.
        :return: The TransitionSystem instance (for method chaining).
        """
        for state in states:
            if state not in self.S:
                raise ValueError(f"Initial state {state} must be in the transition system.")
            self.I.add(state)
        return self

    def add_atomic_proposition(self, *props: str) -> "TransitionSystem":
        """
        Adds one or more atomic propositions to the transition system.

        :param props: One or more atomic propositions (strings) to be added.
        :return: The TransitionSystem instance (for method chaining).
        """
        self.AP.update(props)
        return self

    def add_label(self, state: State, *labels: str) -> "TransitionSystem":
        """
        Adds one or more atomic propositions to a given state.

        :param state: The state to label.
        :param labels: One or more atomic propositions to be assigned to the state.
        :raises ValueError: If the state is not in the system or if any label is not a valid atomic proposition.
        :return: The TransitionSystem instance (for method chaining).
        """
        if state not in self.S:
            raise ValueError(f"Cannot set labels for {state}. State is not in the transition system.")

        invalid_labels = {label for label in labels if label not in self.AP}
        if invalid_labels:
            raise ValueError(f"Cannot assign labels {invalid_labels}. They are not in the set of atomic propositions (AP).")

        self._L.setdefault(state, set()).update(labels)
        return self

    def L(self, state: State) -> Set[str]:
        """
        Retrieves the set of atomic propositions that hold in a given state.

        :param state: The state whose atomic propositions are being retrieved.
        :raises ValueError: If the state is not in the transition system.
        :return: A set of atomic propositions associated with the given state.
        """
        if state not in self.S:
            raise ValueError(f"State {state} is not in the transition system.")
        return self._L.get(state, set())

    def pre(self, S: Union[State, Set[State]], action: Optional[Action] = None) -> Set[State]:
        """
        Computes the set of predecessor states from which a given state or set of states can be reached.

        :param S: A single state (string/tuple) or a collection of states.
        :param action: (Optional) If provided, filters only the transitions that use this action.
        :return: A set of predecessor states.
        """
        # Paste your solution from the previous exercise here
        ...

    def post(self, S: Union[State, Set[State]], action: Optional[Action] = None) -> Set[State]:
        """
        Computes the set of successor states reachable from a given state or a collection of states.

        :param S: A single state or a collection of states.
        :param action: (Optional) Filters transitions by this action.
        :return: A set of successor states.
        """
        # Paste your solution from the previous exercise here
        ...

    def reach(self) -> Set[State]:
        """
        Computes the set of all reachable states from the initial states.

        :return: A set of reachable states.
        """
        # Paste your solution from the previous exercise here
        ...

    def is_action_deterministic(self) -> bool:
        """
        Checks whether the transition system is action-deterministic.

        A transition system is action-deterministic if:
        - It has at most one initial state.
        - For each state and action, there is at most one successor state.

        :return: True if the transition system is action-deterministic, False otherwise.
        """
        # Paste your solution from the previous exercise here
        ...

    def is_label_deterministic(self) -> bool:
        """
        Checks whether the transition system is label-deterministic.

        A transition system is label-deterministic if:
        - It has at most one initial state.
        - For each state, the number of reachable successor states is equal to the number of unique label sets
          of these successor states.

        :return: True if the transition system is label-deterministic, False otherwise.
        """
        # Paste your solution from the previous exercise here
        ...

    def interleave(self, ts, h=frozenset()):
        ...

    def __repr__(self) -> str:
        """
        Returns a string representation of the Transition System.

        :return: A formatted string representation of the TS.
        """
        return (
            f"TransitionSystem(\n"
            f"  States: {self.S}\n"
            f"  Actions: {self.Act}\n"
            f"  Transitions: {self.Transitions}\n"
            f"  Initial States: {self.I}\n"
            f"  Atomic Propositions: {self.AP}\n"
            f"  Labels: {self._L}\n"
            f")"
        )


    def plot(self, title: str = "Transition System", figsize: Tuple[int, int] = (10, 6)) -> None:
        """
        Plots the Transition System as a directed graph.

        :param title: Title of the plot.
        :param figsize: Figure size for the plot.
        """
        G = nx.DiGraph()

        # Add nodes (states)
        for state in self.S:
            label = f"{state}\n{' '.join(self.L(state))}" if self.L(state) else str(state)
            print(label)
            G.add_node(state, label=label, color="blue" if state in self.I else "yellow")

        # Add edges (transitions)
        for state_from, action, state_to in self.Transitions:
            G.add_edge(state_from, state_to, label=action)

        plt.figure(figsize=figsize)
        pos = nx.spring_layout(G)  # Positioning algorithm for layout

        # Draw nodes
        node_colors = [G.nodes[n]["color"] for n in G.nodes]
        nx.draw(G, pos, with_labels=True, labels=nx.get_node_attributes(G, "label"), node_color=node_colors, edgecolors="black", node_size=2000, font_size=10)

        # Draw edge labels (actions)
        edge_labels = {(u, v): d["label"] for u, v, d in G.edges(data=True)}
        nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels, font_size=9)

        plt.title(title)
        plt.show()


In [None]:
grader.check("q2")

## ♟️ Question 3: Solving N-Queens with Transition Systems


### 🧩 Task 1: Build the System

Your goal in this task is to model the **N-Queens problem** using `TransitionSystem` objects and interleaving.

Each queen must be placed in a unique row and must not threaten any other queen. You will model each queen as a separate transition system component and interleave them to explore all possible board configurations.

---

#### 🔧 Function 1: `queen_ts(row: int, n: int) -> TransitionSystem`

This function builds a transition system representing a **single queen in row `row`**. The queen can choose to be placed in any of the `n` columns.

- **States**: `"start"`, and one state per column: `"q{row}{c}"`
- **Actions**: One per column, named `"{row}->{c}"` (e.g., `"2->3"`)
- **Transitions**: From `"start"` to `"q{row}{c}"` via `"{row}->{c}"`
- **Atomic Propositions**: `"q{row}{c}"` for each column
- **Labeling**: `"q{row}{c}"` holds in state `"q{row}{c}"`
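
The naming scheme above can be illustrated with plain sets and dictionaries. The helper `queen_components` is hypothetical and returns raw data rather than a `TransitionSystem`; it assumes columns are numbered from `0` to `n - 1`.

```python
def queen_components(row: int, n: int):
    """Hypothetical helper: plain-data components for the queen in the given row."""
    states = {"start"} | {f"q{row}{c}" for c in range(n)}
    actions = {f"{row}->{c}" for c in range(n)}
    # One transition per column: the queen leaves "start" and commits to column c
    transitions = {("start", f"{row}->{c}", f"q{row}{c}") for c in range(n)}
    # Each placed state is labeled with the proposition of the same name
    labeling = {f"q{row}{c}": {f"q{row}{c}"} for c in range(n)}
    return states, actions, transitions, labeling


states, actions, transitions, labeling = queen_components(2, 4)
```

For `row = 2` and `n = 4` this yields five states (`"start"` plus `"q20"` to `"q23"`) and four actions (`"2->0"` to `"2->3"`).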

---

#### 🔧 Function 2: `interleave_n_queens(n: int) -> TransitionSystem`

This function should:

- Construct `n` independent TSs (one per queen/row) using `queen_ts(...)`
- Interleave them into a single transition system using `.interleave(...)`
- No handshaking is required

The resulting transition system will explore all ways to place `n` queens on an `n×n` board, one per row.

---

#### 🔧 Function 3: `extract_valid_n_queen_states(ts: TransitionSystem) -> List[Tuple[str, ...]]`

After building the interleaved TS:

- Use `.reach()` to find all reachable states
- Filter only those states where all queens are placed (i.e., no `"start"` state appears)
- For each state, extract column indices and check for diagonal or column conflicts using `is_safe(...)`

Return all valid board configurations.
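
Extracting column indices from a composed state could be sketched as below. The helper names `all_placed` and `columns_of` are hypothetical, and the sketch assumes that component `i` of the flat state tuple belongs to row `i`, which depends on the order in which the queens were interleaved.

```python
from typing import List, Tuple


def all_placed(state: Tuple[str, ...]) -> bool:
    """True when no queen is still in its "start" state."""
    return "start" not in state


def columns_of(state: Tuple[str, ...]) -> List[int]:
    """Hypothetical helper: column index of each queen, assuming component i is row i."""
    cols = []
    for row, name in enumerate(state):
        prefix = f"q{row}"                    # strip "q{row}" to leave the column digits
        cols.append(int(name[len(prefix):]))
    return cols


cols = columns_of(("q01", "q13", "q20", "q32"))
```

Stripping the known `q{row}` prefix, rather than reading a single character, keeps the parsing correct when row or column indices have more than one digit.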

---

#### 🧠 Helper: `is_safe(positions: List[int]) -> bool`

This utility function receives a list of queen column positions and checks:

- No queens are in the same column
- No queens are on the same diagonal

Use this to validate candidate states.
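
One way to express the two checks is a pairwise comparison of all queens. This is a sketch under the assumption that `positions[r]` is the column of the queen in row `r`; it is named `is_safe_sketch` to keep it distinct from the function you are asked to write.

```python
from itertools import combinations
from typing import List


def is_safe_sketch(positions: List[int]) -> bool:
    """Pairwise check, assuming positions[r] is the column of the queen in row r."""
    for (r1, c1), (r2, c2) in combinations(enumerate(positions), 2):
        if c1 == c2:
            return False                      # same column
        if abs(c1 - c2) == abs(r1 - r2):
            return False                      # same diagonal
    return True


ok = is_safe_sketch([1, 3, 0, 2])
clash = is_safe_sketch([0, 1, 2, 3])
```

Two queens share a diagonal exactly when their row distance equals their column distance, so `[0, 1, 2, 3]` (the main diagonal) is rejected while `[1, 3, 0, 2]` is accepted.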

---


### 💡 Example

For `n = 4`, your solution should yield exactly 2 valid solutions:

```python
[
    ('q01', 'q13', 'q20', 'q32'),
    ('q02', 'q10', 'q23', 'q31')
]
```

In [None]:
def queen_ts(row: int, n: int) -> TransitionSystem:
    """
    Build a TS for a queen in row `row`, choosing a column from 0 to n-1.
    """
    ...


def interleave_n_queens(n: int) -> TransitionSystem:
    """
    Interleave N TSs for N queens placed on an NxN board.
    Each TS represents a queen choosing one column.
    """
    ...


def is_safe(positions: List[int]) -> bool:
    """
    Check if a list of queen column positions is a valid N-Queens solution.
    """
    ...


def extract_valid_n_queen_states(ts: TransitionSystem) -> List[Tuple[str, ...]]:
    """
    Given the interleaved TS, return all reachable states that represent valid N-Queens solutions.
    """
    ...

In [None]:
grader.check("q3")

## Submission

Make sure you have run all cells in your notebook in order before running the cell below, so that all images/graphs appear in the output. The cell below will generate a zip file for you to submit. **Please save before exporting!**

In [None]:
# Save your notebook first, then run this cell to export your submission.
grader.export(pdf=False)