<a href="https://colab.research.google.com/github/Dulyaaa/Wumpus-World/blob/main/Strict_Agent.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Provided Baseline & Conventions for Part 1

**Baseline (implemented below):**
- A world generator that samples grids where a **safe route** `Start → Gold → Exit` exists (this is only a sampling guarantee; the agent does **not** know it).
- Percepts are breeze and stench. Read the code to see how these percepts are turned into facts and used for reasoning.
- The agent maintains a **knowledge base** and queries for inference; it then plans to explore or to exit after grabbing the gold.

**Symbols & coordinates:**
- Grid is `n × n`. Start = `(1, 1)`, Exit = `(n, n)`.
- One Gold.
- Printed map (instructor view): `S` = start, `X` = exit, `G` = gold, `P` = pit, `W` = wumpus.
- Agent’s percept view shows `.(B)` = breeze, `.(S)` = stench, or `.(BS)` = breeze & stech at cells (glitter omitted).


#### Setup

In [1]:
!pip -q install python-sat

from collections import deque
from typing import Dict, Tuple, List, Set, Optional
import random

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/2.9 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.3/2.9 MB[0m [31m10.6 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━[0m [32m2.4/2.9 MB[0m [31m28.2 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m2.9/2.9 MB[0m [31m25.8 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.9/2.9 MB[0m [31m19.9 MB/s[0m eta [36m0:00:00[0m
[?25h

#### World utils: grid ops, generator, percepts, printers

In [2]:
Coord = Tuple[int, int]

def neighbors4(r:int, c:int, n:int):
    if r > 1:   yield (r-1, c)
    if r < n:   yield (r+1, c)
    if c > 1:   yield (r, c-1)
    if c < n:   yield (r, c+1)

def bfs_path_avoid(start: Coord, goal: Coord, n: int, blocked: Set[Coord]) -> Optional[List[Coord]]:
    """BFS path avoiding blocked cells; returns [start,...,goal] or None."""
    if start in blocked or goal in blocked:
        return None
    q = deque([start]); parent = {start: None}
    while q:
        u = q.popleft()
        if u == goal:
            path = []
            cur = u
            while cur is not None:
                path.append(cur)
                cur = parent[cur]
            return list(reversed(path))
        for v in neighbors4(u[0], u[1], n):
            if v not in parent and v not in blocked:
                parent[v] = u
                q.append(v)
    return None

def truth_percepts(world: Dict, pos: Coord) -> Dict[str,bool]:
    """Deterministic percepts at pos: Breeze (adjacent pit), Stench (adjacent wumpus), Glitter (on gold)."""
    n = world["rows"]
    pits = set(map(tuple, world["pits"]))
    wums = set(map(tuple, world["wumpus"]))
    r,c = pos
    adjs = list(neighbors4(r,c,n))
    breeze = any(a in pits for a in adjs)
    stench = any(a in wums for a in adjs)
    glitter = (pos == tuple(world["gold"]))
    return {"breeze": breeze, "stench": stench, "glitter": glitter}

def pretty_print_world(world: Dict):
    """Instructor view: shows S(start), X(exit), G(gold), P(pit), W(wumpus)."""
    n = world["rows"]
    S = tuple(world["start"]); X = tuple(world["exit"]); G = tuple(world["gold"])
    P = set(map(tuple, world["pits"])); W = set(map(tuple, world["wumpus"]))
    print(f"{n}x{n} Base World")
    for r in range(1, n+1):
        row = []
        for c in range(1, n+1):
            cell = '.'
            if (r,c) == S: cell = 'S'
            elif (r,c) == X: cell = 'X'
            elif (r,c) == G: cell = 'G'
            elif (r,c) in P: cell = 'P'
            elif (r,c) in W: cell = 'W'
            row.append(cell)
        print(" ".join(f"{x:>2}" for x in row))

def pretty_print_percepts(world: Dict, agent_pos: Optional[Coord] = None):
    """
    Shows .(B), .(S), .(BS) flags (glitter omitted). Agent shown as A.
    Does NOT reveal hazards themselves (P/W) to the agent.
    """
    n = world["rows"]
    A = agent_pos
    print(f"{n}x{n} World (B=Breeze, S=Stench; Agent=A)")
    for r in range(1, n+1):
        row = []
        for c in range(1, n+1):
            flag = ''
            pt = truth_percepts(world, (r,c))
            if pt["breeze"] and pt["stench"]: flag = '(BS)'
            elif pt["breeze"]: flag = '(B)'
            elif pt["stench"]: flag = '(S)'
            ch = 'A' if A == (r,c) else '.'
            row.append(f"{ch:>2}{flag}")
        print(" ".join(f"{cell:>5}" for cell in row))

def generate_world_percept_friendly(
    n: int,
    num_pits: int,
    num_wumpus: int,
    seed: Optional[int] = None,
    require_start_clear: bool = True,
    min_percept_hits_on_safe_path: int = 1,
    max_tries: int = 10000,
) -> Dict:
    """
    Random world:
      - Start=(1,1), Exit=(n,n), one Gold (not start/exit).
      - Place pits & wumpus on distinct cells not covering start/exit/gold.
      - Guarantee a hazard-free path Start->Gold->Exit exists (instructor check).
      - Optionally require no Breeze/Stench at start and some percepts along a safe path.
    """
    if seed is not None:
        random.seed(seed)

    start, ext = (1,1), (n,n)
    all_cells = [(r,c) for r in range(1,n+1) for c in range(1,n+1)]
    gold_candidates = [cell for cell in all_cells if cell not in (start, ext)]

    for _ in range(max_tries):
        gold = random.choice(gold_candidates)
        forbidden = {start, ext, gold}
        free = [cell for cell in all_cells if cell not in forbidden]
        if num_pits + num_wumpus > len(free):
            raise ValueError("Too many hazards for this grid.")
        hazards = random.sample(free, num_pits + num_wumpus)
        pits = hazards[:num_pits]
        wums = hazards[num_pits:]
        blocked = set(pits) | set(wums)

        p1 = bfs_path_avoid(start, gold, n, blocked)
        if p1 is None: continue
        p2 = bfs_path_avoid(gold, ext, n, blocked)
        if p2 is None: continue
        safe_path = p1[:-1] + p2  # not used directly; just for the percept richness check below

        world = {
            "rows": n, "cols": n,
            "start": [1,1], "exit": [n,n],
            "gold": [gold[0], gold[1]],
            "pits": [list(p) for p in pits],
            "wumpus": [list(w) for w in wums],
        }

        if require_start_clear:
            pt = truth_percepts(world, start)
            if pt["breeze"] or pt["stench"]: continue

        if min_percept_hits_on_safe_path > 0:
            hits = 0
            for cell in safe_path:
                pt = truth_percepts(world, cell)
                hits += int(pt["breeze"] or pt["stench"])
            if hits < min_percept_hits_on_safe_path: continue

        return world

    raise RuntimeError("Failed to generate world; relax constraints or hazards.")

#### Fast SAT KB (PySAT/Glucose) + helpers

In [3]:
from pysat.solvers import Glucose3
from pysat.formula import CNF

class SATKB:
    """
    Incremental CNF KB with variables:
      P_r_c (pit), W_r_c (wumpus), B_r_c (breeze), S_r_c (stench).
    Encodes, for each cell (r,c):
      Breeze_rc -> OR Pit_adjacent;  Pit_adjacent -> Breeze_rc
      Stench_rc -> OR Wumpus_adjacent; Wumpus_adjacent -> Stench_rc
    Percepts (observations) and any remembered hazards are added as unit clauses.
    Entailment uses solver assumptions (fast).
    """
    def __init__(self, n: int, use_uniqueness_wumpus: bool = False):
        self.n = n
        self._vid = {}
        self._next = 1
        def vid(kind, r, c):
            key = (kind, r, c)
            if key not in self._vid:
                self._vid[key] = self._next
                self._next += 1
            return self._vid[key]
        self.vid = vid

        cnf = CNF()
        # Sensor-hazard bi-implications (as pairs of implications)
        for r in range(1, n+1):
            for c in range(1, n+1):
                adjs = list(neighbors4(r, c, n))
                # Breeze -> OR Pit(adj)
                b = vid('B', r, c)
                if adjs:
                    cnf.append([-b] + [vid('P', ar, ac) for (ar, ac) in adjs])
                    # Pit(adj) -> Breeze
                    for (ar, ac) in adjs:
                        cnf.append([-vid('P', ar, ac), b])
                else:
                    cnf.append([-b])
                # Stench -> OR Wumpus(adj)
                s = vid('S', r, c)
                if adjs:
                    cnf.append([-s] + [vid('W', ar, ac) for (ar, ac) in adjs])
                    for (ar, ac) in adjs:
                        cnf.append([-vid('W', ar, ac), s])
                else:
                    cnf.append([-s])

        # Optional: at-most-one wumpus (usually skipped; we allow multiple)
        if use_uniqueness_wumpus:
            wvars = [vid('W', r, c) for r in range(1, n+1) for c in range(1, n+1)]
            for i in range(len(wvars)):
                for j in range(i+1, len(wvars)):
                    cnf.append([-wvars[i], -wvars[j]])

        # Build solver
        self.solver = Glucose3()
        for cl in cnf.clauses:
            self.solver.add_clause(cl)

    # Assertions (facts)
    def assert_breeze(self, r, c, val: bool):
        self.solver.add_clause([ self.vid('B', r, c) if val else -self.vid('B', r, c) ])

    def assert_stench(self, r, c, val: bool):
        self.solver.add_clause([ self.vid('S', r, c) if val else -self.vid('S', r, c) ])

    def assert_pit(self, r, c):        self.solver.add_clause([ self.vid('P', r, c) ])
    def assert_not_pit(self, r, c):    self.solver.add_clause([ -self.vid('P', r, c) ])
    def assert_wumpus(self, r, c):     self.solver.add_clause([ self.vid('W', r, c) ])
    def assert_not_wumpus(self, r, c): self.solver.add_clause([ -self.vid('W', r, c) ])

    # Entailment of literals via assumptions
    def _entails_true(self, var_id: int) -> bool:   # KB ⊨ x  iff KB ∧ ¬x is UNSAT
        return not self.solver.solve(assumptions=[-var_id])

    def _entails_false(self, var_id: int) -> bool:  # KB ⊨ ¬x iff KB ∧  x is UNSAT
        return not self.solver.solve(assumptions=[ var_id])

    def entails_pit(self, r, c) -> bool:         return self._entails_true(self.vid('P', r, c))
    def entails_not_pit(self, r, c) -> bool:     return self._entails_false(self.vid('P', r, c))
    def entails_wumpus(self, r, c) -> bool:      return self._entails_true(self.vid('W', r, c))
    def entails_not_wumpus(self, r, c) -> bool:  return self._entails_false(self.vid('W', r, c))

# Tiny helpers wrapping SATKB
def add_percepts_at_sat(kb: SATKB, pos: Coord, percepts: dict):
    r, c = pos
    kb.assert_breeze(r, c, percepts["breeze"])
    kb.assert_stench(r, c, percepts["stench"])

def is_cell_provably_safe_sat(kb: SATKB, r: int, c: int) -> bool:
    return kb.entails_not_pit(r, c) and kb.entails_not_wumpus(r, c)


#### Agent Program

In [4]:
def bfs_path_in_set(n: int, start: Coord, goal: Coord, allowed: Set[Coord]) -> Optional[List[Coord]]:
    """BFS restricted to 'allowed'; returns [start,...,goal] or None."""
    if start not in allowed or goal not in allowed:
        return None
    q = deque([start]); parent = {start: None}
    while q:
        u = q.popleft()
        if u == goal:
            path = []
            cur = u
            while cur is not None:
                path.append(cur)
                cur = parent[cur]
            return list(reversed(path))
        for v in neighbors4(u[0], u[1], n):
            if v in allowed and v not in parent:
                parent[v] = u
                q.append(v)
    return None

def action_from_step(cur: Coord, nxt: Coord) -> str:
    if nxt == cur: return "NoOp"
    (r,c), (nr,nc) = cur, nxt
    if (nr, nc) == (r-1, c): return "MoveN"
    if (nr, nc) == (r+1, c): return "MoveS"
    if (nr, nc) == (r, c-1): return "MoveW"
    if (nr, nc) == (r, c+1): return "MoveE"
    raise RuntimeError(f"Non-adjacent step from {cur} to {nxt}")

def frontier_neighbors(n: int, visited: Set[Coord]) -> Set[Coord]:
    """Unvisited cells that touch any visited cell."""
    fr = set()
    for (r,c) in visited:
        for (nr,nc) in neighbors4(r,c,n):
            if (nr,nc) not in visited:
                fr.add((nr,nc))
    return fr

def propagate_neg_percepts_sat(kb: SATKB, pos: Coord, percepts: Dict, n: int,
                               no_pit: Set[Coord], no_wump: Set[Coord]):
    """Sound local propagation: ¬Breeze ⇒ neighbors ¬Pit; ¬Stench ⇒ neighbors ¬Wumpus."""
    r,c = pos
    for (nr,nc) in neighbors4(r,c,n):
        if not percepts["breeze"]:
            kb.assert_not_pit(nr,nc);     no_pit.add((nr,nc))
        if not percepts["stench"]:
            kb.assert_not_wumpus(nr,nc);  no_wump.add((nr,nc))

def online_logic_agent_strict_sat(world: Dict, verbose: bool = True, max_steps: int = 500):
    """
    Sound agent (never steps into unknown-risk cells).
    Policy:
      • Add Breeze/Stench facts each step (noise-free).
      • Propagate local negatives (¬B → neighbors ¬Pit; ¬S → neighbors ¬Wumpus).
      • Build traversal set = visited ∪ {cells provably safe}.
      • If not carrying gold: go to nearest provably-safe frontier.
      • If carrying gold: go to exit via provably-safe cells only.
      • Stop if no provably-safe frontier / no safe route to exit.
    """
    n = world["rows"]
    start = tuple(world["start"])
    exit_cell = tuple(world["exit"])
    gold = tuple(world["gold"])

    kb = SATKB(n, use_uniqueness_wumpus=False)

    pos: Coord = start
    visited: Set[Coord] = {pos}
    have_gold = False
    plan_actions: List[str] = []
    trace: List[Coord] = [pos]
    stopped_reason = None

    # sets from local propagation (fast)
    no_pit: Set[Coord] = set()
    no_wump: Set[Coord] = set()

    if verbose:
        print("Initial environment:")
        pretty_print_percepts(world, agent_pos=pos)
        p0 = truth_percepts(world, pos)
        print(f"Percepts at start: Breeze={p0['breeze']}, Stench={p0['stench']}\n")

    steps = 0
    while steps < max_steps:
        steps += 1

        if pos == exit_cell and have_gold:
            if verbose:
                print("Reached exit with gold. Done.")
            break

        # 1) Perceive (glitter omitted from prints/policy)
        obs = truth_percepts(world, pos)

        # 2) Update KB + local neg propagation + visited assumed safe
        add_percepts_at_sat(kb, pos, obs)
        propagate_neg_percepts_sat(kb, pos, obs, n, no_pit, no_wump)

        # 3) Grab if on gold
        if pos == gold and not have_gold:
            have_gold = True
            plan_actions.append("Grab")
            if verbose:
                print(f"Step {steps}: Action=Grab (at {pos})")
                pretty_print_percepts(world, agent_pos=pos)
                print(f"Percepts: Breeze={obs['breeze']}, Stench={obs['stench']}\n")
            continue

        # 4) Build safe traversal set
        safe_cells: Set[Coord] = set(visited) | (no_pit & no_wump)

        # 5) Provably-safe frontier (check only frontiers to keep it fast)
        fr = frontier_neighbors(n, visited)
        provably_safe_fr = set()
        for (r,c) in fr:
            if (r,c) in safe_cells or is_cell_provably_safe_sat(kb, r, c):
                provably_safe_fr.add((r,c))
                safe_cells.add((r,c))  # allow routing through it

        # 6) Choose plan
        if have_gold:
            path = bfs_path_in_set(n, pos, exit_cell, safe_cells)
            if path is None:
                stopped_reason = "No provably-safe path to exit."
                if verbose: print(stopped_reason)
                break
        else:
            if not provably_safe_fr:
                stopped_reason = "No provably-safe frontier to explore."
                if verbose: print(stopped_reason)
                break
            # nearest provably-safe frontier by BFS length through safe_cells
            best_path, best_len = None, None
            for cand in provably_safe_fr:
                p = bfs_path_in_set(n, pos, cand, safe_cells)
                if p is not None and (best_len is None or len(p) < best_len):
                    best_path, best_len = p, len(p)
            if best_path is None:
                stopped_reason = "Provably-safe frontier unreachable."
                if verbose: print(stopped_reason)
                break
            path = best_path

        # 7) Take exactly one safe step
        nxt = path[1] if len(path) >= 2 else path[0]
        if nxt == pos:
            if verbose:
                print(f"Step {steps}: No-op at {pos}\n")
            continue
        act = action_from_step(pos, nxt)

        plan_actions.append(act)
        pos = nxt
        visited.add(pos)
        trace.append(pos)

        if verbose:
            print(f"Step {steps}: Action={act} -> pos={pos}")
            pretty_print_percepts(world, agent_pos=pos)
            po = truth_percepts(world, pos)
            print(f"Percepts: Breeze={po['breeze']}, Stench={po['stench']}\n")

    return {
        "plan": plan_actions,
        "trace": trace,
        "have_gold": have_gold,
        "success": (pos == exit_cell and have_gold),
        "stopped_reason": stopped_reason
    }

#### Rejection-sample a solvable world (strict agent) and run

In [5]:
def sample_solvable_world_strict(
    n: int, num_pits: int, num_wumpus: int,
    seed: Optional[int] = None,
    require_start_clear: bool = True,
    min_percept_hits_on_safe_path: int = 2,
    max_outer_tries: int = 500,
    max_agent_steps: int = 600,
    verbose: bool = False
) -> Dict:
    """
    Keep drawing worlds until the STRICT agent can solve them with proofs only.
    """
    rng = random.Random(seed)
    tries = 0
    while tries < max_outer_tries:
        tries += 1
        world = generate_world_percept_friendly(
            n=n, num_pits=num_pits, num_wumpus=num_wumpus,
            seed=rng.randint(0, 10_000_000),
            require_start_clear=require_start_clear,
            min_percept_hits_on_safe_path=min_percept_hits_on_safe_path
        )
        res = online_logic_agent_strict_sat(world, verbose=False, max_steps=max_agent_steps)
        if res["success"]:
            if verbose:
                print(f"Found solvable world in {tries} draws.")
            return world
    raise RuntimeError("Could not find a provably-solvable world. Relax constraints/hazards.")

# ---- Demo: pick a solvable world and run with per-step prints ----
n = 7
num_pits = 3
num_wumpus = 2
seed = 42

world_pf = sample_solvable_world_strict(
    n=n, num_pits=num_pits, num_wumpus=num_wumpus,
    seed=seed, require_start_clear=True, min_percept_hits_on_safe_path=2,
    max_outer_tries=500, max_agent_steps=800, verbose=True
)

print("Base world (instructor view):")
pretty_print_world(world_pf)
print()
print("Initial world with Breeze/Stench flags (glitter omitted):")
pretty_print_percepts(world_pf, agent_pos=tuple(world_pf["start"]))
print()

res = online_logic_agent_strict_sat(world_pf, verbose=True, max_steps=800)

print("\nFinal summary")
print("Plan:", res["plan"])
print("Trace:", res["trace"])
print("Success:", res["success"], "HaveGold:", res["have_gold"])
print("StoppedReason:", res["stopped_reason"])

Found solvable world in 18 draws.
Base world (instructor view):
7x7 Base World
 S  .  .  .  .  .  .
 .  .  .  .  .  W  .
 .  W  .  .  .  P  .
 .  .  .  .  .  .  .
 .  .  .  .  .  G  .
 .  P  .  .  .  .  .
 .  .  P  .  .  .  X

Initial world with Breeze/Stench flags (glitter omitted):
7x7 World (B=Breeze, S=Stench; Agent=A)
    A     .     .     .     .  .(S)     .
    .  .(S)     .     .  .(S)  .(B)  .(S)
 .(S)     .  .(S)     .  .(B)  .(S)  .(B)
    .  .(S)     .     .     .  .(B)     .
    .  .(B)     .     .     .     .     .
 .(B)     .  .(B)     .     .     .     .
    .  .(B)     .  .(B)     .     .     .

Initial environment:
7x7 World (B=Breeze, S=Stench; Agent=A)
    A     .     .     .     .  .(S)     .
    .  .(S)     .     .  .(S)  .(B)  .(S)
 .(S)     .  .(S)     .  .(B)  .(S)  .(B)
    .  .(S)     .     .     .  .(B)     .
    .  .(B)     .     .     .     .     .
 .(B)     .  .(B)     .     .     .     .
    .  .(B)     .  .(B)     .     .     .
Percepts at start: Breeze

#### Failling case of Strict Agent

In [10]:
def sample_solvable_world_strict(
    n: int, num_pits: int, num_wumpus: int,
    seed: Optional[int] = None,
    require_start_clear: bool = True,
    min_percept_hits_on_safe_path: int = 2,
    max_outer_tries: int = 500,
    max_agent_steps: int = 600,
    verbose: bool = False
) -> Dict:
    """
    Keep drawing worlds until the STRICT agent can solve them with proofs only.
    """
    rng = random.Random(seed)
    tries = 0
    while tries < max_outer_tries:
        tries += 1
        world = generate_world_percept_friendly(
            n=n, num_pits=num_pits, num_wumpus=num_wumpus,
            seed=rng.randint(0, 10_000_000),
            require_start_clear=require_start_clear,
            min_percept_hits_on_safe_path=min_percept_hits_on_safe_path
        )
        res = online_logic_agent_strict_sat(world, verbose=False, max_steps=max_agent_steps)
        if res["success"]:
            if verbose:
                print(f"Found solvable world in {tries} draws.")
            return world
    raise RuntimeError("Could not find a provably-solvable world. Relax constraints/hazards.")

# ---- Demo: pick a solvable world and run with per-step prints ----
n = 7
num_pits = 8
num_wumpus = 7
seed = 42

world_pf = sample_solvable_world_strict(
    n=n, num_pits=num_pits, num_wumpus=num_wumpus,
    seed=seed, require_start_clear=True, min_percept_hits_on_safe_path=2,
    max_outer_tries=500, max_agent_steps=800, verbose=True
)

print("Base world (instructor view):")
pretty_print_world(world_pf)
print()
print("Initial world with Breeze/Stench flags (glitter omitted):")
pretty_print_percepts(world_pf, agent_pos=tuple(world_pf["start"]))
print()

res = online_logic_agent_strict_sat(world_pf, verbose=True, max_steps=800)

print("\nFinal summary")
print("Plan:", res["plan"])
print("Trace:", res["trace"])
print("Success:", res["success"], "HaveGold:", res["have_gold"])
print("StoppedReason:", res["stopped_reason"])

RuntimeError: Could not find a provably-solvable world. Relax constraints/hazards.