In [49]:
import pitct
import os

# --- Phase 0: Project Setup ---
# Create a working directory to store files generated by pitct
if not os.path.exists('llm_controller_project'):
    os.makedirs('llm_controller_project')
pitct.init('llm_controller_project', overwrite=True)

print("--- Project: Formal Control of LLM Interaction Behavior Based on DES ---")
print("--- Scenario: Technical Support Bot ---")

# --- Phase 1: Scenario Definition and Plant Modeling (G) ---

# State definitions (States)
# 0: S0_Idle (Idle)
# 1: S1_ProblemReceived (Problem received)
# 2: S2_InfoGathering (Gathering information)
# 3: S3_SolutionProposed (Solution proposed)
# 4: S4_Escalated (Escalated)
# 5: S5_Resolved (Resolved)
states_g = {
    0: "S0_Idle", 1: "S1_ProblemReceived", 2: "S2_InfoGathering",
    3: "S3_SolutionProposed", 4: "S4_Escalated", 5: "S5_Resolved"
}
statenum_g = len(states_g)

# Event definitions (Events)
# Controllable events (Controllable, 'c'): Prompts
# Uncontrollable events (Uncontrollable, 'u'): LLM/User Responses
# Format: (source_state, event_name, dest_state, controllability_flag)
trans_g = [
    # Process start
    (0, 'p_ask_problem', 1, 'c'),
    # From problem received to information gathering
    (1, 'p_ask_details', 2, 'c'),
    # In problem reception phase, user/LLM may also be confused
    (1, 'r_is_confused', 1, 'u'),   # User/LLM is confused, stays in current phase
    # In information gathering phase, LLM/user possible responses
    (2, 'r_provides_info', 3, 'u'), # User provides information, enters solution proposal phase
    (2, 'r_is_confused', 2, 'u'),   # User/LLM is confused, stays in current phase
    # After proposing solution
    (3, 'r_confirms_solved', 5, 'u'), # User confirms resolution
    (3, 'r_requests_escalation', 4, 'u'), # User requests escalation
    (3, 'r_is_confused', 3, 'u'),   # User may also be confused in solution proposal phase
    # Allow direct escalation after problem reception (this is the behavior we will prohibit later)
    (1, 'p_escalate', 4, 'c'),
    # Allow bot to actively escalate after proposing solution
    (3, 'p_escalate', 4, 'c'),
    # After escalation, user may also be confused
    (4, 'r_is_confused', 4, 'u'),   # User may also be confused in escalation phase
    # After resolution or escalation, can return to idle state
    (4, 'p_end_session', 0, 'c'),
    (5, 'r_is_confused', 5, 'u'),   # User may also be confused in resolution phase
    (5, 'p_end_session', 0, 'c'),
]

# Marker states (Marker States): Represent task success or stable end states
marker_g = [0, 5] 

# Use pitct to create Plant automaton G
pitct.create('G', statenum_g, trans_g, marker_g)
print("\n[1] Plant 'G' created successfully. Represents all possible interaction flows.")
# Visualize Plant
model_G = pitct.display_automaton('G', color=True)
model_G.save('G_plant.png')
print("    -> Plant model diagram saved as 'G_plant.png'")



--- Project: Formal Control of LLM Interaction Behavior Based on DES ---
--- Scenario: Technical Support Bot ---

[1] Plant 'G' created successfully. Represents all possible interaction flows.
    -> Plant model diagram saved as 'G_plant.png'


In [50]:

# --- Phase 2: Specification Design and Controller Synthesis (E & C) ---

# Specification: "Event p_escalate must never occur before event r_provides_info"

# States:
# 0: E0_BeforeSolution (Before solution proposed)
# 1: E1_AfterSolution (After solution proposed)
# 2: E2_Trap (Violation trap)
states_e = {0: "E0_BeforeSolution", 1: "E1_AfterSolution", 2: "E2_Trap"}
statenum_e = len(states_e)

event_controllability = {name: flag for _, name, _, flag in trans_g}
all_events = event_controllability.keys()

trans_e = []
for event in all_events:
    flag = event_controllability[event]
    
    # --- Logic for state E0 (Before solution proposed) ---
    if event == 'p_escalate':
        # Violating behavior: escalating before solution proposed -> enter trap
        trans_e.append((0, event, 2, flag))
    elif event == 'r_provides_info':
        # Key progress: solution proposed -> enter E1
        trans_e.append((0, event, 1, flag))
    else:
        # Other events: stay in E0
        trans_e.append((0, event, 0, flag))
        
    # --- Logic for state E1 (After solution proposed) ---
    # After this, all behaviors are allowed, state remains unchanged
    trans_e.append((1, event, 1, flag))
    
    # --- Logic for state E2 (Trap) ---
    # Once in trap, stay in trap forever
    trans_e.append((2, event, 2, flag))

# Only non-trap states are marker states
marker_e = [0, 1]


# Create Specification automaton E
pitct.create('E', statenum_e, trans_e, marker_e)
print("\n[2] Specification 'E' created successfully. Defines desired interaction rules.")
model_E = pitct.display_automaton('E', color=True)
model_E.save('E_spec.png')
print("    -> Specification model diagram saved as 'E_spec.png'")

# Synthesize Supervisor
print("\n[3] Synthesizing Supervisor 'C'...")
# First compute synchronous product S = G || E
pitct.sync('S', 'G', 'E')
pitct.display_automaton('S', color=True).save('S_sync.png')
# Then compute optimal controller C = supcon(G, S)
pitct.supcon('C', 'G', 'S')
print("    -> Supervisor 'C' synthesis completed!")
model_C = pitct.display_automaton('C', color=True)
model_C.save('C_supervisor.png')
print("    -> Supervisor model diagram saved as 'C_supervisor.png'")




[2] Specification 'E' created successfully. Defines desired interaction rules.
    -> Specification model diagram saved as 'E_spec.png'

[3] Synthesizing Supervisor 'C'...
    -> Supervisor 'C' synthesis completed!
    -> Supervisor model diagram saved as 'C_supervisor.png'


In [51]:
# --- Phase 3: Prototype Implementation and Closed-loop Testing (Fully Corrected Version) ---

print("\n" + "="*50)
print("--- Phase 3: Startup and Closed-loop Controlled Interaction with Mock LLM ---")
print("="*50)

# Get all transitions of controller C for repeated use in the loop
all_c_transitions = pitct.trans('C')

# Build mapping from C states to G states
# Strategy: Infer corresponding G states by tracking transition events in C
# Since C is a subset of the synchronous product of G and E, we need to map through event sequences
c_state_to_g_state = {}

# Use BFS starting from initial state to track corresponding G state for each C state
from collections import deque
queue = deque([(0, 0)])  # (c_state, g_state)
visited = {0}
c_state_to_g_state[0] = 0

while queue:
    c_state, g_state = queue.popleft()
    
    # Get all transitions from current C state
    for c_trans in all_c_transitions:
        if c_trans[0] == c_state:
            event = c_trans[1]
            next_c_state = c_trans[2]
            
            # Find transition with same event in G
            next_g_state = None
            for g_trans in trans_g:
                if g_trans[0] == g_state and g_trans[1] == event:
                    next_g_state = g_trans[2]
                    break
            
            if next_g_state is not None and next_c_state not in visited:
                c_state_to_g_state[next_c_state] = next_g_state
                visited.add(next_c_state)
                queue.append((next_c_state, next_g_state))

print("[DEBUG] State mapping from controller C to Plant G:")
print(c_state_to_g_state)

prompt_texts = {
    'p_ask_problem': "Hello! I'm a support bot. Please describe your problem.",
    'p_ask_details': "Thank you. Can you provide more details?",
    'p_escalate': "I will escalate this issue to a human agent.",
    'p_end_session': "Session is ending. Goodbye."
}

def mock_llm(prompt, current_state_name):
    print(f"[LLM Input] Prompt: '{prompt}' (from state: {current_state_name})")
    if "describe your problem" in prompt: return "My computer won't turn on.", "r_is_confused"
    if "more details" in prompt: return "I've already checked the power cable.", "r_provides_info"
    if "ending" in prompt or "Goodbye" in prompt: return "Thank you!", "r_confirms_solved"
    return "I am confused.", "r_is_confused"

# ============================ Core Corrected Logic Starts ============================

# 1. Initial state is always 0
current_state_c = 0

while True:
    # Map current C state back to G state for display to user
    g_state_id = c_state_to_g_state.get(current_state_c)
    if g_state_id is None:
        print(f"Error: Cannot map controller state {current_state_c} to G state. Session terminated.")
        break
    current_state_name = states_g[g_state_id]
    
    print(f"\n==> Current state: {current_state_name} (G:{g_state_id}, C:{current_state_c}) <==")

    # 2. Filter outgoing transitions from current state
    trans_from_current = [t for t in all_c_transitions if t[0] == current_state_c]

    if not trans_from_current:
        print("Session ended (reached terminal state).")
        break

    allowed_prompts = [t[1] for t in trans_from_current if t[1] in prompt_texts]
    
    if not allowed_prompts:
        print("No executable actions in current state, waiting for external events...")
        uncontrollable_trans = [t for t in trans_from_current if t[1] not in prompt_texts]
        if not uncontrollable_trans:
            print("Session ended (no available transitions).")
            break
        event_to_fire = uncontrollable_trans[0][1]
        next_state_c = uncontrollable_trans[0][2]
        print(f"Auto-triggering uncontrollable event: {event_to_fire}")
        current_state_c = next_state_c
        continue

    print("--- Please select an action (Prompt) ---")
    for i, p_label in enumerate(allowed_prompts):
        print(f"  {i+1}: {prompt_texts[p_label]} ({p_label})")

    try:
        choice = int(input("Enter option number: ")) - 1
        chosen_prompt_label = allowed_prompts[choice]
    except (ValueError, IndexError):
        print("Invalid input, please try again.")
        continue

    # 3. Find and execute transition
    next_state_c = -1
    for t in trans_from_current:
        if t[1] == chosen_prompt_label:
            next_state_c = t[2]
            break
    
    print(f"--- [Action] Executing '{chosen_prompt_label}'.")
    current_state_c = next_state_c
    
    # If executing p_end_session, end session directly without calling LLM
    if chosen_prompt_label == 'p_end_session':
        print("--- [Session] Session ended normally.")
        break
    
    # Call LLM
    g_state_id_for_llm = c_state_to_g_state.get(current_state_c, -1)
    llm_response_text, llm_response_event = mock_llm(
        prompt_texts[chosen_prompt_label], states_g.get(g_state_id_for_llm, "Unknown")
    )
    print(f"    [LLM Output] Response: '{llm_response_text}' (classified as: '{llm_response_event}')")

    # Update state based on LLM response
    next_state_after_llm = -1
    # Need to re-filter transitions from new state
    trans_from_new_state = [t for t in all_c_transitions if t[0] == current_state_c]
    for t in trans_from_new_state:
        if t[1] == llm_response_event:
            next_state_after_llm = t[2]
            break

    if next_state_after_llm != -1:
        print(f"--- [Event] Event '{llm_response_event}' occurred.")
        current_state_c = next_state_after_llm
    else:
        print(f"Warning: Unexpected event '{llm_response_event}' occurred in controller state {current_state_c}! Session terminated.")
        break

# ============================ Core Corrected Logic Ends ============================


--- Phase 3: Startup and Closed-loop Controlled Interaction with Mock LLM ---
[DEBUG] State mapping from controller C to Plant G:
{0: 0, 1: 1, 2: 2, 3: 3, 4: 5, 5: 4, 6: 0, 7: 1, 8: 2}

==> Current state: S0_Idle (G:0, C:0) <==
--- Please select an action (Prompt) ---
  1: Hello! I'm a support bot. Please describe your problem. (p_ask_problem)
--- [Action] Executing 'p_ask_problem'.
[LLM Input] Prompt: 'Hello! I'm a support bot. Please describe your problem.' (from state: S1_ProblemReceived)
    [LLM Output] Response: 'My computer won't turn on.' (classified as: 'r_is_confused')
--- [Event] Event 'r_is_confused' occurred.

==> Current state: S1_ProblemReceived (G:1, C:1) <==
--- Please select an action (Prompt) ---
  1: Thank you. Can you provide more details? (p_ask_details)
--- [Action] Executing 'p_ask_details'.
[LLM Input] Prompt: 'Thank you. Can you provide more details?' (from state: S2_InfoGathering)
    [LLM Output] Response: 'I've already checked the power cable.' (classified