In [1]:
from collections import defaultdict
from itertools import combinations
import os, shutil


In [2]:
class Action:
    """A STRIPS-style operator: a name plus precondition, add and delete sets."""

    def __init__(self, name, pre, add, delete=None):
        """Copy the given literal collections into private sets.

        Args:
            name: label used when printing and rendering the action.
            pre: iterable of literals that must hold before the action.
            add: iterable of literals the action makes true.
            delete: optional iterable of literals the action makes false;
                a missing (or empty) value means nothing is deleted.
        """
        self.name = name
        self.pre, self.add = set(pre), set(add)
        self.delete = set(delete) if delete else set()

    def __repr__(self):
        return "Action({})".format(self.name)

def make_persistence_actions(literals):
    """Return one no-op action per literal, in iteration order.

    Each no-op is named ``Persist(<literal>)``, requires the literal and
    re-adds it, so a fact can be carried unchanged to the next level.
    """
    noops = []
    for literal in literals:
        noops.append(Action(f"Persist({literal})", [literal], [literal], []))
    return noops

class Level:
    """One layer of the planning graph: its members and their mutex pairs."""

    def __init__(self, items=None):
        """Create a layer holding ``items`` (nothing when omitted or empty)."""
        self.items = set(items) if items else set()
        # Unordered pairs stored as two-element frozensets.
        self.mutex = set()

    def add_mutex(self, a, b):
        """Record that ``a`` and ``b`` exclude each other; equal arguments are ignored."""
        if a != b:
            pair = frozenset((a, b))
            self.mutex.add(pair)

    def is_mutex(self, a, b):
        """Return True when the two distinct members were recorded as mutex."""
        if a != b:
            return frozenset((a, b)) in self.mutex
        return False


In [3]:
class PlanningGraph:
    """Layered planning graph in the style of Graphplan, with DOT export.

    ``expand_until_goals`` builds proposition levels ``S[0..n]`` and action
    levels ``A[0..n-1]``; ``extract_plan`` searches backwards through them for
    a layered plan; ``to_dot`` writes the graph as a Graphviz file.

    Only action mutexes are tracked (inconsistent effects and interference).
    Proposition levels carry no mutex information, so the goals being present
    in a level does not by itself guarantee that ``extract_plan`` succeeds.
    """

    def __init__(self, actions, init, goals, max_levels=10):
        """Store the problem; no graph is built until ``expand_until_goals``.

        Args:
            actions: domain actions (objects exposing ``name`` and the sets
                ``pre``, ``add`` and ``delete``).
            init: iterable of literals true in the initial state.
            goals: iterable of literals that must all hold at the end.
            max_levels: upper bound on the number of action levels built.
        """
        self.base_actions = actions
        self.init = set(init)
        self.goals = set(goals)
        self.max_levels = max_levels
        self.S = []  # proposition levels; S[0] is the initial state
        self.A = []  # action levels; A[k] leads from S[k] to S[k+1]

        # Per-level edge and mutex lists, kept only so to_dot can draw them.
        self.pre_edges = defaultdict(list)  # k -> [(literal, action)]
        self.eff_edges = defaultdict(list)  # k -> [(action, literal)]
        self.A_mutex = defaultdict(list)    # k -> [(action, action)]

    def _inconsistent_effects(self, a1, a2):
        """Return True when one action deletes a literal the other adds."""
        return any(x in a2.delete for x in a1.add) or any(x in a1.delete for x in a2.add)

    def _interference(self, a1, a2):
        """Return True when one action deletes a precondition of the other."""
        return (not a1.delete.isdisjoint(a2.pre)) or (not a2.delete.isdisjoint(a1.pre))

    def expand_until_goals(self):
        """Grow the graph until every goal appears in a proposition level.

        Any previously built graph is discarded first, so the method can be
        called more than once on the same instance.

        Returns:
            The index of the first proposition level containing all goals
            (0 when the initial state already satisfies them), or None when
            the literals level off or ``max_levels`` is exhausted first.
        """
        # Start from a clean graph; appending to an old one would duplicate
        # levels and edges on a second call.
        self.S = [Level(self.init)]
        self.A = []
        self.pre_edges.clear()
        self.eff_edges.clear()
        self.A_mutex.clear()

        if self.goals.issubset(self.S[0].items):
            return 0

        for k in range(self.max_levels):
            Sk = self.S[k]
            applicable = [a for a in self.base_actions if a.pre.issubset(Sk.items)]
            # Sorted literals give the no-ops (and therefore edges and mutex
            # pairs) a reproducible order.
            actions = applicable + make_persistence_actions(sorted(Sk.items))

            for a in actions:
                for p in a.pre:
                    self.pre_edges[k].append((p, a))

            Ak = Level(actions)
            for a1, a2 in combinations(actions, 2):
                # Two actions cannot share a step if their effects clash or
                # if one destroys what the other needs.
                if self._inconsistent_effects(a1, a2) or self._interference(a1, a2):
                    Ak.add_mutex(a1, a2)
                    self.A_mutex[k].append((a1, a2))
            self.A.append(Ak)

            next_literals = set(Sk.items)
            for a in actions:
                for p in a.add:
                    next_literals.add(p)
                    self.eff_edges[k].append((a, p))

            Sk1 = Level(next_literals)
            self.S.append(Sk1)

            if self.goals.issubset(Sk1.items):
                return k + 1
            if Sk1.items == Sk.items:
                # Fixed point reached without the goals: nothing new can appear.
                return None
        return None

    def extract_plan(self, level_idx):
        """Search backwards from ``level_idx`` for a layered plan.

        Args:
            level_idx: proposition level at which the goals must hold, as
                returned by ``expand_until_goals``. None yields None.

        Returns:
            A list with one entry per action level (earliest first), each a
            list of the distinct actions chosen for that step, persistence
            actions included; or None when no mutex-free selection exists.
        """
        if level_idx is None:
            return None

        # (goal set, level) pairs already proven unsolvable.
        failed = set()

        def backtrack(goals_at_level, level):
            key = (tuple(sorted(goals_at_level)), level)
            if key in failed:
                return None
            if level == 0:
                return [] if set(goals_at_level).issubset(self.S[0].items) else None

            Ak = self.A[level - 1]
            # Sort by name: the level stores actions in a set, whose
            # iteration order is not stable between runs.
            achievers = {
                g: sorted((a for a in Ak.items if g in a.add), key=lambda a: a.name)
                for g in goals_at_level
            }
            # Most constrained goals first.
            ordered = sorted(goals_at_level, key=lambda g: len(achievers[g]))

            def select(i, chosen):
                if i == len(ordered):
                    # chosen is pairwise mutex-free by construction: each
                    # action was checked against the others when added.
                    new_goals = set()
                    for a in chosen:
                        new_goals |= a.pre
                    prefix = backtrack(tuple(sorted(new_goals)), level - 1)
                    return None if prefix is None else prefix + [list(chosen)]

                g = ordered[i]
                # An action already in the step may add this goal too;
                # selecting it again would list it twice.
                if any(g in c.add for c in chosen):
                    return select(i + 1, chosen)
                for a in achievers[g]:
                    if any(Ak.is_mutex(a, c) for c in chosen):
                        continue
                    res = select(i + 1, chosen + [a])
                    if res is not None:
                        return res
                return None

            plan = select(0, [])
            if plan is None:
                failed.add(key)
            return plan

        return backtrack(tuple(sorted(self.goals)), level_idx)

    def to_dot(self, upto_level, filename, plan):
        """Write levels ``0..upto_level`` of the graph as a Graphviz DOT file.

        Args:
            upto_level: last proposition level to draw.
            filename: path of the DOT file to (over)write, encoded as UTF-8.
            plan: layered plan as returned by ``extract_plan``; its actions
                and their edges are highlighted. None highlights nothing.

        Node identifiers are built with the module-level ``sanitize`` helper.
        """
        plan_actions_by_level = {t: set(acts) for t, acts in enumerate(plan or [])}

        def nid_s(s, p):
            return f"S{s}_{sanitize(p)}"

        def nid_a(k, a):
            return f"A{k}_{sanitize(a.name)}"

        def esc(text):
            # Backslash and double quote are special inside a quoted DOT string.
            return str(text).replace("\\", "\\\\").replace('"', '\\"')

        lines = []
        lines.append('digraph FloodPlan {')
        lines.append('  rankdir=LR; splines=true; bgcolor="white";')
        lines.append('  node [fontsize=10]; edge [fontsize=9];')

        for s in range(upto_level + 1):
            lines.append(f'  subgraph cluster_S{s} {{ label="S{s}"; style=dashed; color=gray;')
            for p in sorted(self.S[s].items):
                lines.append(f'    "{nid_s(s, p)}" [label="{esc(p)}", shape=ellipse];')
            lines.append('  }')

        for k in range(upto_level):
            in_plan_here = plan_actions_by_level.get(k, set())
            lines.append(f'  subgraph cluster_A{k} {{ label="A{k}"; style=dashed; color=lightgray;')
            # Sorted so the emitted file is the same on every run.
            for a in sorted(self.A[k].items, key=lambda act: act.name):
                if a.name.startswith("Persist("):
                    fill = "#fff8b0"
                elif a in in_plan_here:
                    fill = "#cfe3ff"
                else:
                    fill = "white"
                lines.append(
                    f'    "{nid_a(k, a)}" [label="{esc(a.name)}", shape=box, '
                    f'style="rounded,filled", fillcolor="{fill}"];'
                )
            lines.append('  }')

        for k in range(upto_level):
            in_plan_here = plan_actions_by_level.get(k, set())
            for (p, a) in self.pre_edges[k]:
                highlighted = a in in_plan_here
                color = "#0044cc" if highlighted else "black"
                penw = 3 if highlighted else 1.4
                lines.append(f'"{nid_s(k, p)}" -> "{nid_a(k, a)}" [color="{color}", penwidth={penw}];')

            for (a, p) in self.eff_edges[k]:
                highlighted = a in in_plan_here
                color = "#0044cc" if highlighted else "#888888"
                penw = 3 if highlighted else 1.6
                lines.append(f'"{nid_a(k, a)}" -> "{nid_s(k + 1, p)}" [color="{color}", penwidth={penw}];')

        for k in range(upto_level):
            for (a1, a2) in self.A_mutex[k]:
                lines.append(
                    f'"{nid_a(k, a1)}" -> "{nid_a(k, a2)}" '
                    '[dir=none, color="#d00000", style="dashed", penwidth=3, label="mutex", fontcolor="#a00000", constraint=false];'
                )

        lines.append('}')
        with open(filename, "w", encoding="utf-8") as f:
            f.write("\n".join(lines))


In [4]:
def sanitize(s):
    """Rewrite a literal or action name for use inside a DOT node identifier.

    Double quotes are dropped, spaces become underscores, parentheses become
    square brackets and commas become semicolons.
    """
    substitutions = str.maketrans({
        '"': None,
        ' ': '_',
        '(': '[',
        ')': ']',
        ',': ';',
    })
    return s.translate(substitutions)

def domain_actions():
    """Return the five flood-evacuation actions as a new list.

    None of the actions deletes anything; each one only adds literals once
    its preconditions hold.
    """
    return [
        Action(
            "Pre-DeployRescueTeam(Z1)",
            ["ForecastFlood(Z1)"],
            ["RescueTeamPredeployed(Z1)"],
        ),
        Action(
            "PrepareShelterAndBoats(Z1,S2)",
            ["ForecastFlood(Z1)"],
            ["ShelterOpen(S2)", "BoatsStaged(Z1)"],
        ),
        Action(
            "StockMedicalKitsInAdvance(S2,D1)",
            ["KitsAtDepot(D1)", "ShelterOpen(S2)"],
            ["ShelterStocked(S2)"],
        ),
        Action(
            "ShiftRouteDueToBridgeRisk(R1,R2,B3)",
            ["BridgeAtRisk(B3)", "AltRouteClear(R2)"],
            ["SafeRouteSelected(R2)"],
        ),
        Action(
            "EvacuateBeforeFlood(Z1,S2)",
            [
                "RescueTeamPredeployed(Z1)",
                "BoatsStaged(Z1)",
                "SafeRouteSelected(R2)",
                "ShelterStocked(S2)",
                "WeatherWindowOpen",
            ],
            ["EvacuatedTo(Z1,S2)"],
        ),
    ]


def main():
    """Plan the flood evacuation, print the steps and export the graph.

    Writes ``flood_5actions_L<level>.dot`` and, when Graphviz's ``dot`` is on
    the PATH, renders it to a PNG of the same base name.

    Note: a later cell in this notebook defines ``main`` again; once that
    cell has run, it replaces this version.
    """
    # Local import: only the optional render step needs it.
    import subprocess

    init = {
        "ForecastFlood(Z1)",
        "BridgeAtRisk(B3)",
        "AltRouteClear(R2)",
        "KitsAtDepot(D1)",
        "WeatherWindowOpen"
    }
    goals = {"EvacuatedTo(Z1,S2)", "ShelterStocked(S2)"}

    actions = domain_actions()
    pg = PlanningGraph(actions, init, goals, max_levels=10)

    level = pg.expand_until_goals()
    if level is None:
        print("No plan found.")
        return

    plan = pg.extract_plan(level)
    if plan is None:
        print("Failed to extract plan.")
        return

    print(f"Plan found in {len(plan)} steps (S-levels used: {level})")
    for t, acts in enumerate(plan):
        # Persistence no-ops are bookkeeping, not real steps, so hide them.
        print(f"t{t}: " + " || ".join(a.name for a in acts if not a.name.startswith("Persist(")))

    dot_file = f"flood_5actions_L{level}.dot"
    pg.to_dot(level, dot_file, plan)
    print(f"DOT written to {dot_file}")

    dot_exe = shutil.which("dot")
    if dot_exe:
        png = f"flood_5actions_L{level}.png"
        # Argument list instead of a shell string: no quoting pitfalls, and
        # the exit status tells us whether the PNG was really produced.
        result = subprocess.run([dot_exe, "-Tpng", dot_file, "-o", png])
        if result.returncode == 0:
            print(f"PNG written to {png}")
        else:
            print(f"dot exited with status {result.returncode}; rendering {png} failed.")
    else:
        print("Install graphviz (dot) to render PNG.")


In [None]:
def main():
    """Plan the flood evacuation and export the graph as ``custom_plan_L<level>``.

    This redefines the ``main`` from the previous cell, using different
    output file names and printing only the plan level. Stops with a message
    when no level contains the goals or no plan can be extracted.
    """
    # Local import: only the optional render step needs it.
    import subprocess

    init = {
        "ForecastFlood(Z1)",
        "BridgeAtRisk(B3)",
        "AltRouteClear(R2)",
        "KitsAtDepot(D1)",
        "WeatherWindowOpen"
    }
    goals = {"EvacuatedTo(Z1,S2)", "ShelterStocked(S2)"}

    actions = domain_actions()
    pg = PlanningGraph(actions, init, goals, max_levels=10)

    level = pg.expand_until_goals()
    print("Plan level:", level)
    if level is None:
        # Without a level there is nothing to extract or draw.
        print("No plan found.")
        return

    plan = pg.extract_plan(level)
    if plan is None:
        print("Failed to extract plan.")
        return

    dot_name = f"custom_plan_L{level}.dot"
    png_name = f"custom_plan_L{level}.png"

    pg.to_dot(level, dot_name, plan)
    print(f"DOT saved as: {dot_name}")

    dot_exe = shutil.which("dot")
    if dot_exe:
        # Argument list instead of a shell string; the exit status tells us
        # whether the PNG was really produced.
        result = subprocess.run([dot_exe, "-Tpng", dot_name, "-o", png_name])
        if result.returncode == 0:
            print(f"PNG saved as: {png_name}")
        else:
            print(f"dot exited with status {result.returncode}; rendering {png_name} failed.")
    else:
        print("Install graphviz to render PNG.")


In [8]:
# Run the planner end to end: build the graph, extract a plan, write the DOT/PNG files.
main()

Plan level: 3
DOT saved as: custom_plan_L3.dot
PNG saved as: custom_plan_L3.png
