In [2]:
# Planning prototype: track meet heat scheduling with STRIPS-style actions and A* search
# - States: sets of positive fluents
# - Actions: preconditions (pos, neg), effects (add, del)
# - Goal: all heats scheduled and run with resource and athlete constraints

from dataclasses import dataclass
from typing import List, Optional, Dict
import heapq

# ---------- Fluent helper ----------
def F(pred: str, *args: str) -> str:
    return f"{pred}({', '.join(args)})"

@dataclass(frozen=True)
class Action:
    name: str
    pre_pos: frozenset
    pre_neg: frozenset
    add: frozenset
    delete: frozenset

def applicable(state: frozenset, a: Action) -> bool:
    return a.pre_pos.issubset(state) and state.isdisjoint(a.pre_neg)

def apply(state: frozenset, a: Action) -> frozenset:
    return frozenset((state - a.delete) | a.add)

# ---------- Meet setup ----------
athletes = ["A1", "A2", "A3", "A4"]
heats = ["H1", "H2"]
heat_roster = {
    "H1": ["A1", "A2"],
    "H2": ["A3", "A4"],
}
slots = ["S1", "S2", "S3"]
track = "T1"
officials = "O1"

# Initial state fluents
init = set()
init.add(F("Track", track))
init.add(F("Officials", officials))
for sl in slots:
    init.add(F("Slot", sl))
    init.add(F("FreeSlot", sl))
    init.add(F("OfficialsFree", officials, sl))
    init.add(F("TrackFree", track, sl))
for h in heats:
    init.add(F("Heat", h))
    init.add(F("HeatUnscheduled", h))
for a in athletes:
    init.add(F("Athlete", a))
    init.add(F("NotCheckedIn", a))

init_state = frozenset(init)

# ---------- Action schemas ----------
actions: List[Action] = []

# CheckIn
for a in athletes:
    actions.append(Action(
        name=f"CheckIn({a})",
        pre_pos=frozenset({F("Athlete", a), F("NotCheckedIn", a)}),
        pre_neg=frozenset(),
        add=frozenset({F("CheckedIn", a)}),
        delete=frozenset({F("NotCheckedIn", a)}),
    ))

# AssignHeatToSlot
for h in heats:
    for sl in slots:
        actions.append(Action(
            name=f"AssignHeatToSlot({h}, {sl})",
            pre_pos=frozenset({
                F("Heat", h),
                F("HeatUnscheduled", h),
                F("Slot", sl),
                F("FreeSlot", sl),
                F("TrackFree", track, sl),
                F("OfficialsFree", officials, sl),
            }),
            pre_neg=frozenset(),
            add=frozenset({
                F("HeatAssigned", h, sl),
                F("SlotReserved", sl),
                F("TrackReserved", track, sl),
                F("OfficialsReserved", officials, sl),
            }),
            delete=frozenset({
                F("HeatUnscheduled", h),
                F("FreeSlot", sl),
                F("TrackFree", track, sl),
                F("OfficialsFree", officials, sl),
            }),
        ))

# RunHeat
for h in heats:
    for sl in slots:
        pre_pos = {
            F("HeatAssigned", h, sl),
            F("SlotReserved", sl),
            F("TrackReserved", track, sl),
            F("OfficialsReserved", officials, sl),
        }
        for a in heat_roster[h]:
            pre_pos.add(F("CheckedIn", a))
        actions.append(Action(
            name=f"RunHeat({h}, {sl})",
            pre_pos=frozenset(pre_pos),
            pre_neg=frozenset(),
            add=frozenset({
                F("HeatRun", h, sl),
                F("TrackFree", track, sl),
                F("OfficialsFree", officials, sl),
                F("SlotConsumed", sl),
            }),
            delete=frozenset({
                F("SlotReserved", sl),
                F("TrackReserved", track, sl),
                F("OfficialsReserved", officials, sl),
            }),
        ))

# ---------- Goal ----------
def goal_test(state: frozenset) -> bool:
    for h in heats:
        if not any(F("HeatRun", h, sl) in state for sl in slots):
            return False
    return True

# ---------- Heuristic ----------
def heuristic(state: frozenset) -> int:
    not_run = sum(1 for h in heats if not any(F("HeatRun", h, sl) in state for sl in slots))
    not_checked = sum(1 for a in athletes if F("NotCheckedIn", a) in state)
    not_assigned = sum(1 for h in heats if not any(F("HeatAssigned", h, sl) in state for sl in slots))
    return not_run + not_checked + not_assigned

# ---------- A* search ----------
def astar(initial: frozenset, actions: List[Action]) -> Optional[List[Action]]:
    frontier = []
    g0 = 0
    h0 = heuristic(initial)
    heapq.heappush(frontier, (g0 + h0, g0, initial, []))
    best_g: Dict[frozenset, int] = {initial: 0}

    while frontier:
        f, g, state, plan = heapq.heappop(frontier)
        if goal_test(state):
            return plan
        for a in actions:
            if applicable(state, a):
                ns = apply(state, a)
                g2 = g + 1
                if ns not in best_g or g2 < best_g[ns]:
                    best_g[ns] = g2
                    h2 = heuristic(ns)
                    heapq.heappush(frontier, (g2 + h2, g2, ns, plan + [a]))
    return None

# ---------- Solve ----------
plan = astar(init_state, actions)
if not plan:
    print("No schedule found.")
else:
    print(f"Plan found with {len(plan)} steps:")
    for i, a in enumerate(plan, 1):
        print(f"{i}. {a.name}")

    # Validate final state
    final_state = init_state
    for a in plan:
        final_state = apply(final_state, a)

    print("\nGoal satisfied?", goal_test(final_state))

    # Pretty-print schedule (fixed variable naming)
    assignments = []
    runs = []
    for h in heats:
        for sl in slots:
            if F("HeatAssigned", h, sl) in final_state:
                assignments.append((h, sl))
            if F("HeatRun", h, sl) in final_state:
                runs.append((h, sl))

    print("\nAssignments:")
    for h, sl in sorted(assignments):
        print(f"  {h} -> {sl}")

    print("\nExecuted heats:")
    for h, sl in sorted(runs):
        print(f"  {h} at {sl}")

    # Constraint checks
    ok = True
    for sl in slots:
        run_in_slot = [h for (h, ss) in runs if ss == sl]
        if len(run_in_slot) > 1:
            print(f"Constraint violation: multiple heats run in slot {sl}: {run_in_slot}")
            ok = False
    for h, sl in runs:
        for a in heat_roster[h]:
            if F("CheckedIn", a) not in final_state:
                print(f"Constraint violation: Athlete {a} ran without check-in.")
                ok = False
    print("\nConstraint checks passed?" if ok else "\nConstraint checks failed.")



Plan found with 8 steps:
1. CheckIn(A1)
2. AssignHeatToSlot(H1, S3)
3. AssignHeatToSlot(H2, S2)
4. CheckIn(A4)
5. CheckIn(A3)
6. RunHeat(H2, S2)
7. CheckIn(A2)
8. RunHeat(H1, S3)

Goal satisfied? True

Assignments:
  H1 -> S3
  H2 -> S2

Executed heats:
  H1 at S3
  H2 at S2

Constraint checks passed?
