# **Learning Moore machines from Input-Output traces: A Proof of Concept (PoC) project**

> This project critically examines the **MooreMI** algorithm, as introduced by Giantamidis et al. (2021), to validate its reproducibility and effectiveness in identifying deterministic and complete Moore machines from input-output traces. We aim to confirm the key theoretical result of the article: given a suitable set of input-output data that meets the **Characteristic Sample Requirement (CSR)**, MooreMI can learn a minimal Moore machine that is equivalent (isomorphic) to the original, unknown target machine.

**Reference of the paper:**  
Giantamidis, G., Tripakis, S., & Basagiannis, S. (2021). *Learning Moore machines from input–output traces.* International Journal on Software Tools for Technology Transfer, 23(1), 1–29.  
[https://doi.org/10.1007/s10009-019-00544-0](https://doi.org/10.1007/s10009-019-00544-0)

---

## **1. Introduction to the problem and its modeling**

### **1.1 Key Challenge**

The core challenge addressed in the referenced article is to demonstrate that the MooreMI algorithm can, under the CSR, reliably reconstruct the exact minimal Moore machine that generated a given set of input-output traces. This aligns with well-established theoretical guarantees in automata learning:  
- **Reproducibility and Correctness:** The MooreMI algorithm’s correctness proof relies on the assumption that the training set of traces is “characteristic.” Such a characteristic sample provides the algorithm with all the necessary information to distinguish between states and ensure convergence to the correct, minimal solution.  
- **Practical Evaluation:** Beyond theory, the article presents empirical results showing that MooreMI can handle non-trivial case studies, demonstrating scalability, performance, and reliability in realistic scenarios.

### **1.2 Problem Statement**

Given a set of input-output traces $R$ that satisfies the CSR, we want to learn a deterministic, complete Moore machine that behaves identically to the unknown target machine from which these traces were derived. Formally:

$$
R = \{(\rho_{\text{in}}, \rho_{\text{out}})\}
$$

We seek to construct:

$$
M = (I, O, Q, q_{0}, \delta, \lambda)
$$

Where:

- $I$: input alphabet  
- $O$: output alphabet  
- $Q$: set of states  
- $q_{0}$: initial state  
- $\delta: Q \times I \rightarrow Q$: transition function  
- $\lambda: Q \rightarrow O$: output function

The article demonstrates that if $R$ is chosen to be characteristic, then the MooreMI algorithm can identify $M$ uniquely (up to isomorphism) and guarantee that this $M$ is minimal and consistent with all given data.
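
To make "behaves identically" concrete, the output that a Moore machine produces for an input word can be written with an extended transition function. The symbol $\delta^{*}$ and the word notation $x_1 \cdots x_n$ are notational assumptions introduced here for illustration; they do not appear in the text above.

$$
\delta^{*}(q, \varepsilon) = q, \qquad \delta^{*}(q, w x) = \delta(\delta^{*}(q, w), x) \quad (w \in I^{*},\ x \in I)
$$

$$
\rho_{\text{out}} = \lambda(q_{0})\,\lambda(\delta^{*}(q_{0}, x_1))\,\cdots\,\lambda(\delta^{*}(q_{0}, x_1 \cdots x_n)) \quad \text{for } \rho_{\text{in}} = x_1 \cdots x_n
$$

Under this reading the output sequence has one more symbol than the input sequence, because the output of the initial state comes first. This is the convention the code in this notebook follows.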

### **1.3 Scope**

Implementing and testing MooreMI involves addressing several interrelated challenges:

- **Completeness vs. Minimality**:  
  Striking a balance between capturing complete behavior and ensuring the resulting machine is state-minimal.

- **Consistency Management**:  
  Guaranteeing that no merges introduce contradictions, preserving the output function’s correctness at every step.

- **Generalization**:  
  Ensuring the learned machine does not overfit to the training traces but correctly generalizes to unseen inputs. The CSR plays a crucial role in guaranteeing generalization properties, as it ensures sufficient coverage of the machine’s behavior.
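
One precondition behind consistency can be checked before any learning starts: since the target machine is deterministic, two traces that share an input prefix must agree on the outputs for that prefix. The following is a minimal sketch of such a check. The function name `find_conflict` and the sample traces are hypothetical, and the sketch assumes the convention that each output sequence is one symbol longer than its input sequence.

```python
from typing import Dict, List, Optional, Tuple

def find_conflict(traces: List[Tuple[str, str]]) -> Optional[Tuple[str, str, str]]:
    """Return (input_prefix, output_a, output_b) for the first contradiction, else None."""
    seen: Dict[str, str] = {}  # input prefix -> output symbol observed after reading it
    for input_seq, output_seq in traces:
        # Assumed Moore convention: len(output_seq) == len(input_seq) + 1.
        if len(output_seq) != len(input_seq) + 1:
            raise ValueError(f"malformed trace: {input_seq!r} -> {output_seq!r}")
        for k in range(len(input_seq) + 1):
            prefix, symbol = input_seq[:k], output_seq[k]
            # setdefault stores the symbol on first sight and returns the stored one afterwards.
            if seen.setdefault(prefix, symbol) != symbol:
                return (prefix, seen[prefix], symbol)
    return None

good = [("ab", "010"), ("aa", "011")]
bad = [("ab", "010"), ("a", "00")]
print(find_conflict(good))  # None
print(find_conflict(bad))   # ('a', '1', '0')
```

A training set that fails this check cannot come from a single deterministic Moore machine, so no sequence of merges could make a learned machine consistent with it.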


## **2. Generate and manipulate a Moore machine**
This section covers:
- **Definition of the Moore Machine Structure**.
- **Functions to generate a Moore Machine** with specified properties.
- **Trace generation** to simulate the machine’s behavior.
- **Visualization** of the Moore machine using Graphviz.
- **Example Usage** to illustrate the functionality.

In [None]:
# Importing required libraries
from dataclasses import dataclass
from typing import Dict, List, Set, Tuple
import random
import graphviz

In [None]:
# Defining the MooreMachine class
@dataclass
class MooreMachine:
    states: Set[str]
    input_alphabet: Set[str]
    output_alphabet: Set[str]
    transition_function: Dict[Tuple[str, str], str]
    output_function: Dict[str, str]
    initial_state: str

The `generate_moore_machine` function creates a random Moore machine based on specified parameters. It generates a set of states $(q_0, q_1, ...)$, an input alphabet $(a, b, ...)$, and an output alphabet $(0, 1, ...)$.

The function defines transitions by mapping $(state, input)$ pairs to randomly chosen next states. The `completeness_ratio` parameter sets the fraction of $(state, input)$ pairs that receive a transition ($1.0$ yields a complete machine, in which every pair is defined).

Each state is assigned a random output symbol from the output alphabet. Finally, an initial state is chosen at random. The function returns a MooreMachine object containing all these components.

In [9]:
def generate_moore_machine(
    num_states: int,
    input_alphabet_size: int,
    output_alphabet_size: int,
    completeness_ratio: float = 1.0
) -> MooreMachine:
    """
    Generate a Moore machine with specified parameters.
    
    Args:
        num_states: Number of states in the machine
        input_alphabet_size: Size of input alphabet
        output_alphabet_size: Size of output alphabet
        completeness_ratio: Ratio of transitions to define (1.0 = complete machine)
    """
    # Generate states, alphabets
    states = {f"q{i}" for i in range(num_states)}
    input_alphabet = {chr(97 + i) for i in range(input_alphabet_size)}  # a, b, c...
    output_alphabet = {str(i) for i in range(output_alphabet_size)}     # 0, 1, 2...
    
    # Generate transition function
    all_transitions = [(s, i) for s in states for i in input_alphabet]
    num_transitions = int(len(all_transitions) * completeness_ratio)
    selected_transitions = random.sample(all_transitions, num_transitions)
    
    transition_function = {
        (s, i): random.choice(list(states))
        for s, i in selected_transitions
    }
    
    # Generate output function
    output_function = {
        state: random.choice(list(output_alphabet))
        for state in states
    }
    
    initial_state = random.choice(list(states))
    
    return MooreMachine(
        states=states,
        input_alphabet=input_alphabet,
        output_alphabet=output_alphabet,
        transition_function=transition_function,
        output_function=output_function,
        initial_state=initial_state
    )
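
Because `num_transitions` is computed with `int(...)`, any `completeness_ratio` below $1.0$ leaves some $(state, input)$ pairs undefined, while the learning guarantee discussed above concerns complete machines. A small helper can list the undefined pairs. This is only a sketch: the name `missing_transitions` is hypothetical, and it takes the three relevant fields as plain arguments so that it runs on its own.

```python
from typing import Dict, List, Set, Tuple

def missing_transitions(
    states: Set[str],
    input_alphabet: Set[str],
    transition_function: Dict[Tuple[str, str], str],
) -> List[Tuple[str, str]]:
    """List the (state, input) pairs that have no successor, in sorted order."""
    return sorted(
        (s, i)
        for s in states
        for i in input_alphabet
        if (s, i) not in transition_function
    )

# Hand-written two-state example with one undefined pair.
states = {"q0", "q1"}
inputs = {"a", "b"}
delta = {("q0", "a"): "q1", ("q0", "b"): "q0", ("q1", "a"): "q1"}
print(missing_transitions(states, inputs, delta))  # [('q1', 'b')]
```

An empty result means the machine is complete.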

The `generate_traces` function simulates the behavior of a Moore machine to generate a specified number of input-output traces. Each trace consists of an input sequence and the corresponding output sequence produced by the machine. Starting from the initial state, the function constructs traces up to a maximum length by randomly selecting valid input symbols and following the machine's transition function to the next state. At each step, the output function determines the output for the current state. If a state has no valid transitions, the trace terminates early. The function returns a list of pairs, where each pair represents an input sequence and its corresponding output sequence, capturing the machine's response to simulated inputs.


In [None]:
def generate_traces(
    machine: MooreMachine,
    num_traces: int,
    max_length: int
) -> List[Tuple[str, str]]:
    """
    Generate input-output traces from a Moore machine.
    
    Args:
        machine: The Moore machine to generate traces from
        num_traces: Number of traces to generate
        max_length: Maximum length of each trace
        
    Returns:
        List of (input_sequence, output_sequence) pairs
    """
    traces = []
    
    for _ in range(num_traces):
        length = random.randint(1, max_length)
        current_state = machine.initial_state
        input_sequence = ""
        output_sequence = machine.output_function[current_state]
        
        for _ in range(length):
            # Get possible inputs from current state
            possible_inputs = [
                i for i in machine.input_alphabet
                if (current_state, i) in machine.transition_function
            ]
            
            if not possible_inputs:
                break
                
            input_symbol = random.choice(possible_inputs)
            input_sequence += input_symbol
            
            # Transition to next state
            current_state = machine.transition_function[(current_state, input_symbol)]
            output_sequence += machine.output_function[current_state]
            
        traces.append((input_sequence, output_sequence))
        
    return traces

The `visualize_moore_machine` function uses the Graphviz library to create a graphical representation of a Moore machine. It initializes a directed graph laid out from left to right and adds every state as a node labeled with the state's name and its output symbol. The initial state is drawn with a double circle, and an arrow from an unlabeled point node marks it as the entry point. Transitions are drawn as directed edges labeled with their input symbols. Finally, the graph is rendered and saved as a PNG file.


In [None]:
def visualize_moore_machine(machine: MooreMachine, filename: str = "moore_machine"):
    """
    Visualize a Moore machine using graphviz.
    
    Args:
        machine: The Moore machine to visualize
        filename: Name of the output file (without extension)
    """
    dot = graphviz.Digraph(comment='Moore Machine')
    dot.attr(rankdir='LR')
    
    # Add initial state marker
    dot.node('start', '', shape='point')
    
    # Add states
    for state in machine.states:
        label = f"{state}\n{machine.output_function[state]}"
        shape = 'doublecircle' if state == machine.initial_state else 'circle'
        dot.node(state, label, shape=shape)
    
    # Add initial transition
    dot.edge('start', machine.initial_state)
    
    # Add transitions
    for (state, input_symbol), next_state in machine.transition_function.items():
        dot.edge(state, next_state, label=input_symbol)
    
    # Save the visualization
    dot.render(filename, format='png', cleanup=True)

The example usage demonstrates the complete functionality of the Moore machine generation, simulation, and visualization pipeline.

In [None]:
# Example usage:
if __name__ == "__main__":
    # Generate a simple Moore machine
    machine = generate_moore_machine(
        num_states=3,
        input_alphabet_size=2,
        output_alphabet_size=2,
        completeness_ratio=1
    )
    
    # Generate some traces
    traces = generate_traces(machine, num_traces=5, max_length=4)
    
    # Visualize the machine
    visualize_moore_machine(machine)
    
    # Print machine details and traces
    print("States:", machine.states)
    print("Input alphabet:", machine.input_alphabet)
    print("Output alphabet:", machine.output_alphabet)
    print("Initial state:", machine.initial_state)
    print("\nTransition function:")
    for (state, input_symbol), next_state in machine.transition_function.items():
        print(f"δ({state}, {input_symbol}) = {next_state}")
    print("\nOutput function:")
    for state, output in machine.output_function.items():
        print(f"λ({state}) = {output}")
    
    print("\nGenerated traces (input -> output):")
    for input_seq, output_seq in traces:
        print(f"{input_seq} -> {output_seq}")

States: {'q1', 'q2', 'q0'}
Input alphabet: {'a', 'b'}
Output alphabet: {'1', '0'}
Initial state: q2

Transition function:
δ(q0, b) = q0
δ(q2, a) = q2
δ(q2, b) = q2
δ(q1, a) = q1
δ(q0, a) = q1
δ(q1, b) = q0

Output function:
λ(q1) = 1
λ(q2) = 0
λ(q0) = 0

Generated traces (input -> output):
b -> 00
ab -> 000
b -> 00
a -> 00
bbba -> 00000
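
The printed traces can be checked by hand against the printed machine. The sketch below copies the transition and output functions from the listing above into plain dictionaries and replays the four distinct input words. The helper names `replay` and `reachable` are hypothetical.

```python
from typing import Dict, Set, Tuple

# Transition and output functions copied from the printed example run.
delta: Dict[Tuple[str, str], str] = {
    ("q0", "b"): "q0", ("q2", "a"): "q2", ("q2", "b"): "q2",
    ("q1", "a"): "q1", ("q0", "a"): "q1", ("q1", "b"): "q0",
}
lam: Dict[str, str] = {"q1": "1", "q2": "0", "q0": "0"}
initial = "q2"

def replay(input_seq: str) -> str:
    """Output sequence for input_seq, starting with the initial state's output."""
    state = initial
    out = lam[state]
    for symbol in input_seq:
        state = delta[(state, symbol)]
        out += lam[state]
    return out

def reachable(start: str) -> Set[str]:
    """States reachable from start by following any transitions."""
    seen, stack = {start}, [start]
    while stack:
        s = stack.pop()
        for (src, _), dst in delta.items():
            if src == s and dst not in seen:
                seen.add(dst)
                stack.append(dst)
    return seen

for word in ["b", "ab", "a", "bbba"]:
    print(word, "->", replay(word))
print("reachable:", reachable(initial))  # reachable: {'q2'}
```

In this particular run the initial state `q2` loops to itself on both inputs, so `q0` and `q1` are unreachable and every trace consists only of `0` symbols. A randomly generated machine can therefore behave like a machine with fewer states than `num_states`.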


## **3. Implementation of the MooreMI Algorithm**

In [3]:
import random
from dataclasses import dataclass
from typing import Dict, List, Set, Tuple, Optional
import graphviz
import time


In [4]:
@dataclass
class PTAState:
    state_id: int
    # For each input symbol, we store transitions
    transitions: Dict[str, int]
    # The output symbol associated with reaching this state in the trace
    output_symbol: str

class PTA:
    def __init__(self):
        self.states: Dict[int, PTAState] = {}
        self.initial_state = self._new_state(output_symbol="")  # initial with empty output for root
        
    def _new_state(self, output_symbol: str) -> int:
        sid = len(self.states)
        self.states[sid] = PTAState(state_id=sid, transitions={}, output_symbol=output_symbol)
        return sid
    
    def add_trace(self, input_seq: str, output_seq: str):
        current = self.initial_state
        # output_seq has length = input_seq length + 1 for Moore machine
        # The output of the initial state is output_seq[0]
        self.states[current].output_symbol = output_seq[0]
        for i, inp in enumerate(input_seq):
            outp = output_seq[i+1]
            if inp not in self.states[current].transitions:
                new_st = self._new_state(outp)
                self.states[current].transitions[inp] = new_st
                current = new_st
            else:
                current = self.states[current].transitions[inp]
                # Update output symbol if needed
                self.states[current].output_symbol = outp

    def visualize(self, filename="pta"):
        dot = graphviz.Digraph(comment='PTA')
        dot.attr(rankdir='LR')
        dot.node('start', '', shape='point')
        for sid, st in self.states.items():
            shape = 'doublecircle' if sid == self.initial_state else 'circle'
            dot.node(str(sid), f"{sid}\nout={st.output_symbol}", shape=shape)
        dot.edge('start', str(self.initial_state))
        for sid, st in self.states.items():
            for i, nxt in st.transitions.items():
                dot.edge(str(sid), str(nxt), label=i)
        dot.render(filename, format='png', cleanup=True)
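
To see what the prefix tree looks like on a tiny input, the construction can be sketched without the `PTA` class. The list-of-dictionaries representation and the name `build_pta` are assumptions made for this illustration; the logic mirrors `add_trace` above (one state per distinct input prefix, numbered in creation order).

```python
from typing import Dict, List, Tuple

def build_pta(traces: List[Tuple[str, str]]) -> Tuple[List[Dict[str, int]], List[str]]:
    """Build a prefix tree: one state per distinct input prefix."""
    transitions: List[Dict[str, int]] = [{}]  # transitions[s][symbol] -> child state
    outputs: List[str] = [""]                 # outputs[s] -> output symbol of state s
    for input_seq, output_seq in traces:
        state = 0
        outputs[0] = output_seq[0]  # first output symbol belongs to the root
        for symbol, out in zip(input_seq, output_seq[1:]):
            if symbol not in transitions[state]:
                # The new state's id is the current number of states.
                transitions[state][symbol] = len(transitions)
                transitions.append({})
                outputs.append(out)
            state = transitions[state][symbol]
    return transitions, outputs

transitions, outputs = build_pta([("ab", "010"), ("aa", "011"), ("b", "00")])
print(len(transitions))  # 5
print(transitions[0])    # {'a': 1, 'b': 4}
print(outputs)           # ['0', '1', '0', '1', '0']
```

The three traces share the prefix `a`, so they produce five states rather than six: the root plus the prefixes `a`, `ab`, `aa`, and `b`.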



In [5]:
class MooreMI:
    def __init__(self, pta: PTA):
        self.pta = pta
        # Initially:
        # Red will start with the initial state
        # Blue will be the immediate successors of the initial state
        self.red = set()
        self.blue = set()
        self.parent = {}  # For merging: track parent links
        self._init_sets()

    def _init_sets(self):
        # initial state in red
        self.red.add(self.pta.initial_state)
        # successors in blue
        for i, nxt in self.pta.states[self.pta.initial_state].transitions.items():
            self.blue.add(nxt)
            self.parent[nxt] = self.pta.initial_state

    def run(self):
        # Iteratively merge until no blue states remain
        changed = True
        while self.blue and changed:
            changed = False
            # pick next blue state
            q_blue = self._pick_next_blue()
            if q_blue is None:
                break
            # try merging q_blue into some red state
            merged = False
            for q_red in self.red:
                if self._is_consistent_merge(q_red, q_blue):
                    self._merge_states(q_red, q_blue)
                    merged = True
                    changed = True
                    break
            if not merged:
                # No merge possible, promote q_blue to red
                self.red.add(q_blue)
                self.blue.remove(q_blue)
                # Add successors of q_blue to blue if not in red
                for i, nxt in self.pta.states[q_blue].transitions.items():
                    if nxt not in self.red:
                        self.blue.add(nxt)
                        self.parent[nxt] = q_blue
                changed = True

    def _pick_next_blue(self):
        # Just pick one arbitrarily (smallest id)
        if self.blue:
            return min(self.blue)
        return None

    def _is_consistent_merge(self, q_red: int, q_blue: int) -> bool:
        # Check if merging q_blue into q_red is consistent
        # Compare output symbols and recursively check children
        # We ensure that q_red and q_blue have the same output symbol
        if self.pta.states[q_red].output_symbol != self.pta.states[q_blue].output_symbol:
            return False
        
        # Simplification for this PoC: a transition defined on only one side
        # imposes no constraint (it is simply adopted when the states merge).
        # Only inputs defined in both states are checked recursively.
        red_transitions = self.pta.states[q_red].transitions
        blue_transitions = self.pta.states[q_blue].transitions
        for i in red_transitions.keys() & blue_transitions.keys():
            if not self._is_consistent_merge(red_transitions[i], blue_transitions[i]):
                return False
        return True

    def _merge_states(self, q_red: int, q_blue: int):
        # Merge q_blue into q_red.
        # Simplification: children of q_blue on inputs that q_red already
        # defines are not folded recursively; they become unreachable.
        self.blue.discard(q_blue)
        self.red.discard(q_blue)

        # Adopt q_blue's transitions that q_red lacks. The adopted targets are
        # now successors of a red state, so they must be queued as blue;
        # otherwise they would never be processed.
        for i, nxt in self.pta.states[q_blue].transitions.items():
            if i not in self.pta.states[q_red].transitions:
                self.pta.states[q_red].transitions[i] = nxt
                if nxt not in self.red:
                    self.blue.add(nxt)
                    self.parent[nxt] = q_red

        # Redirect every transition that pointed to q_blue so it points to q_red.
        for st in self.pta.states.values():
            for inp, ns in st.transitions.items():
                if ns == q_blue:
                    st.transitions[inp] = q_red

        # q_blue is not deleted from the PTA; it is simply no longer referenced
        # and is logically part of q_red from here on.
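
The merge above keeps things simple and does not fold the subtree of the merged state into the red part. Red-blue state-merging algorithms in the RPNI family usually perform such a recursive fold; the sketch below shows one way to do it on a list-of-dictionaries prefix tree. It is an illustration under stated assumptions, not the paper's exact procedure: the representation and the names `merge` and `_fold` are hypothetical, and the blue state is assumed to be the root of a tree.

```python
import copy
from typing import Dict, List, Optional

Transitions = List[Dict[str, int]]

def _fold(transitions: Transitions, outputs: List[str], red: int, blue: int) -> bool:
    """Fold the subtree rooted at blue into red; False signals an output clash."""
    if outputs[red] != outputs[blue]:
        return False
    for symbol, child in list(transitions[blue].items()):
        if symbol in transitions[red]:
            # Both sides define this input: their targets must be merged too.
            if not _fold(transitions, outputs, transitions[red][symbol], child):
                return False
        else:
            transitions[red][symbol] = child  # adopt the whole subtree
    return True

def merge(transitions: Transitions, outputs: List[str], red: int, blue: int) -> Optional[Transitions]:
    """Return the transitions after merging blue into red, or None if inconsistent."""
    trial = copy.deepcopy(transitions)  # a failed merge must leave the input untouched
    for row in trial:
        for symbol, target in row.items():
            if target == blue:
                row[symbol] = red
    return trial if _fold(trial, outputs, red, blue) else None

# Chain 0 -a-> 1 -a-> 2 -b-> 3 with outputs 0, 0, 0, 1.
transitions = [{"a": 1}, {"a": 2}, {"b": 3}, {}]
outputs = ["0", "0", "0", "1"]
merged = merge(transitions, outputs, red=0, blue=1)
print(merged[0])                                   # {'a': 0, 'b': 3}
print(merge(transitions, outputs, red=0, blue=3))  # None
```

In the example, merging state 1 into state 0 turns the `a` edge into a self-loop and pulls the `b` edge of state 2 up to state 0, so the information in the subtree is kept instead of becoming unreachable.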



In [6]:
# Generate a target Moore machine
target_machine = generate_moore_machine(
    num_states=5,
    input_alphabet_size=2,
    output_alphabet_size=2,
    completeness_ratio=1.0
)

# Generate a characteristic-like training set
# We'll generate multiple traces and hope to cover all transitions.
train_traces = generate_traces(target_machine, num_traces=200, max_length=10)

# Build PTA
pta = PTA()
for inp_seq, out_seq in train_traces:
    pta.add_trace(inp_seq, out_seq)

# Visualize PTA (optional, can be large)
# pta.visualize("pta_example")

# Run MooreMI
mooremi = MooreMI(pta)
start_time = time.time()
mooremi.run()
end_time = time.time()

print(f"MooreMI run completed in {end_time - start_time:.4f} seconds.")
print("Red states:", mooremi.red)
print("Blue states:", mooremi.blue)


MooreMI run completed in 0.0020 seconds.
Red states: {0, 1, 2, 3, 9}
Blue states: set()


In [7]:
def extract_learned_machine(pta: PTA, red_states: Set[int]) -> MooreMachine:
    # The learned states are the red states, renamed r<id>.
    name_map = {rs: f"r{rs}" for rs in red_states}
    states = set(name_map.values())
    # Input alphabet: every symbol that labels a PTA transition.
    input_alphabet = set()
    for st in pta.states.values():
        input_alphabet.update(st.transitions.keys())
    # Output alphabet: every output symbol observed in the PTA.
    output_alphabet = set(st.output_symbol for st in pta.states.values())

    # The PTA root is placed in red first and is never merged away, so it is
    # normally the initial state; min(red_states) is only a defensive fallback.
    initial_state = name_map[pta.initial_state] if pta.initial_state in red_states else name_map[min(red_states)]

    transition_function = {}
    output_function = {}
    for r in red_states:
        rname = name_map[r]
        output_function[rname] = pta.states[r].output_symbol
        for i, nxt in pta.states[r].transitions.items():
            # Merged states are not tracked to a representative in this PoC,
            # so a transition whose target is not red is dropped.
            if nxt in red_states:
                transition_function[(rname, i)] = name_map[nxt]
    
    return MooreMachine(
        states=states,
        input_alphabet=input_alphabet,
        output_alphabet=output_alphabet,
        transition_function=transition_function,
        output_function=output_function,
        initial_state=initial_state
    )

learned_machine = extract_learned_machine(pta, mooremi.red)

print("Learned Machine:")
print("States:", learned_machine.states)
print("Initial State:", learned_machine.initial_state)
print("Transition Function:")
for (s,i),n in learned_machine.transition_function.items():
    print(f"δ({s}, {i}) = {n}")
print("Output Function:")
for s,o in learned_machine.output_function.items():
    print(f"λ({s}) = {o}")


Learned Machine:
States: {'r0', 'r1', 'r2', 'r3', 'r9'}
Initial State: r0
Transition Function:
δ(r0, b) = r1
δ(r0, a) = r9
δ(r1, a) = r2
δ(r1, b) = r0
δ(r2, a) = r3
δ(r2, b) = r9
δ(r3, b) = r2
δ(r3, a) = r2
δ(r9, a) = r1
δ(r9, b) = r0
Output Function:
λ(r0) = 1
λ(r1) = 1
λ(r2) = 0
λ(r3) = 1
λ(r9) = 1
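
The extraction step drops any transition whose target is not red, because the PoC does not record which state a merged state ended up in. One common way to keep that information is a small union-find style map from merged states to their surviving representative. The class below is a hypothetical sketch of that idea (the names `MergeMap`, `find`, and `record_merge` are not part of the code above).

```python
from typing import Dict

class MergeMap:
    """Tracks which state each merged state was folded into."""

    def __init__(self) -> None:
        self._parent: Dict[int, int] = {}  # merged state -> state it was merged into

    def find(self, state: int) -> int:
        """Follow merge links to the surviving representative, compressing the path."""
        root = state
        while root in self._parent:
            root = self._parent[root]
        while state in self._parent:
            # Point state directly at root, then continue with its old parent.
            self._parent[state], state = root, self._parent[state]
        return root

    def record_merge(self, survivor: int, merged: int) -> None:
        """Record that `merged` was folded into `survivor`."""
        survivor, merged = self.find(survivor), self.find(merged)
        if survivor != merged:
            self._parent[merged] = survivor

merges = MergeMap()
merges.record_merge(0, 5)  # state 5 merged into state 0
merges.record_merge(5, 7)  # state 7 merged into what is now state 0
print(merges.find(7))  # 0
print(merges.find(3))  # 3 (never merged, so it represents itself)
```

If `_merge_states` called `record_merge(q_red, q_blue)`, the extraction could look up `merges.find(nxt)` instead of skipping transitions to non-red targets.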


In [8]:
def produce_output(machine: MooreMachine, input_seq: str) -> str:
    s = machine.initial_state
    output_seq = machine.output_function[s]
    for inp in input_seq:
        if (s, inp) in machine.transition_function:
            s = machine.transition_function[(s, inp)]
            output_seq += machine.output_function[s]
        else:
            # No defined transition: assume machine halts or returns empty?
            # For completeness ratio = 1.0, this should not happen. If it does, break.
            break
    return output_seq

def evaluate_accuracy(machine: MooreMachine, test_traces: List[Tuple[str,str]]):
    strong_count = 0
    medium_ratio_sum = 0.0
    weak_ratio_sum = 0.0
    
    for (inp, out) in test_traces:
        pred = produce_output(machine, inp)
        # Strong
        if pred == out:
            strong_count += 1
        
        # Medium: largest prefix that matches
        prefix_len = 0
        for a,b in zip(pred, out):
            if a == b:
                prefix_len += 1
            else:
                break
        medium_ratio = prefix_len / len(out) if out else 1.0
        medium_ratio_sum += medium_ratio
        
        # Weak: proportion of matching symbols
        matches = sum(a==b for a,b in zip(pred, out))
        max_len = max(len(pred), len(out))
        weak_ratio = matches / max_len if max_len > 0 else 1.0
        weak_ratio_sum += weak_ratio
    
    n = len(test_traces)
    return {
        "strong": strong_count / n,
        "medium": medium_ratio_sum / n,
        "weak": weak_ratio_sum / n
    }

# Evaluate on training set
train_acc = evaluate_accuracy(learned_machine, train_traces)
print("Training Accuracy:", train_acc)

# Generate test set
test_traces = generate_traces(target_machine, num_traces=50, max_length=10)
test_acc = evaluate_accuracy(learned_machine, test_traces)
print("Test Accuracy:", test_acc)


Training Accuracy: {'strong': 1.0, 'medium': 1.0, 'weak': 1.0}
Test Accuracy: {'strong': 1.0, 'medium': 1.0, 'weak': 1.0}
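
Accuracy on random test traces is evidence of equivalence, not proof. For two complete deterministic Moore machines over the same input alphabet, equivalence can be decided exactly by a breadth-first search over pairs of states. The sketch below does this on plain dictionaries; the function name `find_counterexample` and the example machines are hypothetical, and both machines are assumed to be complete.

```python
from collections import deque
from typing import Dict, Iterable, Optional, Tuple

Delta = Dict[Tuple[str, str], str]

def find_counterexample(
    inputs: Iterable[str],
    delta_a: Delta, lam_a: Dict[str, str], init_a: str,
    delta_b: Delta, lam_b: Dict[str, str], init_b: str,
) -> Optional[str]:
    """Return a shortest input word on which the outputs differ, or None if equivalent."""
    symbols = sorted(inputs)
    queue = deque([(init_a, init_b, "")])
    visited = {(init_a, init_b)}
    while queue:
        a, b, word = queue.popleft()
        if lam_a[a] != lam_b[b]:
            return word
        for symbol in symbols:
            pair = (delta_a[(a, symbol)], delta_b[(b, symbol)])
            if pair not in visited:
                visited.add(pair)
                queue.append((pair[0], pair[1], word + symbol))
    return None

# Machine A: 'a' toggles between p0 (output 0) and p1 (output 1); 'b' stays put.
delta_a = {("p0", "a"): "p1", ("p0", "b"): "p0", ("p1", "a"): "p0", ("p1", "b"): "p1"}
lam_a = {"p0": "0", "p1": "1"}
# Machine B: the same machine with renamed states.
delta_b = {("r0", "a"): "r1", ("r0", "b"): "r0", ("r1", "a"): "r0", ("r1", "b"): "r1"}
lam_b = {"r0": "0", "r1": "1"}
# Machine C: a single state that always outputs 0.
delta_c = {("s", "a"): "s", ("s", "b"): "s"}
lam_c = {"s": "0"}

print(find_counterexample("ab", delta_a, lam_a, "p0", delta_b, lam_b, "r0"))  # None
print(find_counterexample("ab", delta_a, lam_a, "p0", delta_c, lam_c, "s"))   # a
```

A `None` result means the two machines produce the same output sequence for every input word. When both machines are also minimal and all their states are reachable, this amounts to the two machines being isomorphic.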
