In [None]:
import httpx
from pydantic import BaseModel

c = httpx.Client()

team_id = ""
base_url = "https://31pwr5t6ij.execute-api.eu-west-2.amazonaws.com"

problems = [
    "probatio",
    "primus",
    "secundus",
    "tertius",
    "quartus",
    "quintus"
]

def select(problemName: str):
    return c.post(f"{base_url}/select", json={
        "id": team_id,
        "problemName": problemName
    })

def explore(plans: list[str]) -> list[list[int]]:
    r = c.post(f"{base_url}/explore", json={
        "id": team_id,
        "plans": plans,
    })
    if r.status_code != 200:
        print(r.text)
        r.raise_for_status()
    return (r).json()["results"]

class V(BaseModel):
    room: int
    door: int

def guess(rooms: list[int], startingRoom: int, connections: list[tuple[V,V]]):
    print(connections)
    return c.post(f"{base_url}/guess", json={
        "id": team_id,
        "map": {
            "rooms": rooms,
            "startingRoom": startingRoom,
            "connections": [
                {"from": v1.model_dump_json(), "to": v2.model_dump_json()} for (v1,v2) in connections
            ]
        }
    })

# select(problems[0])
# results = explore(plans="".join(["0" for _ in range(54)]))
# print(results)
# rooms = list(set(results))
# print("rooms=", rooms)
# count_map = {
#     room: len([r for r in results if r == room])
#     for room in rooms
# }
# print(count_map)

In [None]:
import requests
import random
import time
import json
from itertools import product

# --- 1. API Client ---
# サーバーとの通信を管理するクラス
class AedificiumClient:
    """
    ICFP 2025 "The Name of the Binding" API Client.
    """
    def __init__(self, base_url="https://31pwr5t6ij.execute-api.eu-west-2.amazonaws.com"):
        self.base_url = base_url
        self.team_id = None
        self.query_count = 0

    
    def select(self, problem_name):
        """Select a problem to solve."""
        if not self.team_id:
            raise Exception("Must register first.")
        payload = {"id": self.team_id, "problemName": problem_name}
        response = requests.post(f"{self.base_url}/select", json=payload)
        response.raise_for_status()
        print(f"Problem '{problem_name}' selected.")
        self.query_count = 0 # Reset query count for a new problem

    def explore(self, plans):
        """Explore the library with a batch of plans."""
        if not self.team_id:
            raise Exception("Must register and select a problem first.")
        if not plans:
            return {}, 0
        
        payload = {"id": self.team_id, "plans": plans}
        response = requests.post(f"{self.base_url}/explore", json=payload)
        response.raise_for_status()
        data = response.json()
        
        self.query_count = data['queryCount']
        print(f"Explored {len(plans)} plans. Total query count: {self.query_count}")
        
        # 結果を plan -> result の辞書形式で返す
        return {plan: result for plan, result in zip(plans, data['results'])}, self.query_count

    def guess(self, map_data):
        print(map_data)
        payload = {"id": self.team_id, "map": map_data}
        print("Submitting guess...")
        # print(json.dumps(payload, indent=2)) # For debugging the map format
        response = requests.post(f"{self.base_url}/guess", json=payload)
        response.raise_for_status()
        data = response.json()
        print(f"Guess result: {'Correct!' if data['correct'] else 'Incorrect.'}")
        return data['correct']


# --- 2. L* Solver ---
# Mooreオートマトンを学習するアルゴリズムの本体
class LStarMooreSolver:
    """
    Solves the labyrinth mapping problem using a variant of the L* algorithm
    for Moore machines.
    """
    def __init__(self, client):
        self.client = client
        self.alphabet = [str(i) for i in range(6)]
        self.S = set()  # State prefixes
        self.E = set()  # Experiment suffixes
        self.T = {}     # Observation table: T[prefix] = {suffix: result_trace}

    def _get_row(self, prefix):
        """
        Generates a tuple representing the row for a given prefix.
        The row consists of the *final* observation for each experiment.
        """
        # Eをソートして、行の順序を常に一定に保つ
        sorted_e = sorted(list(self.E))
        
        # 各実験(prefix + suffix)の最後のラベルを取得
        row_values = []
        for e in sorted_e:
            full_plan = prefix + e
            # 観測結果の最後の要素がその状態の出力
            row_values.append(self.T[full_plan][-1])
            
        return tuple(row_values)

    def _fill_observation_table(self):
        """
        Finds all missing entries in the table and fills them using a single
        batched API call.
        """
        prefixes_to_query = self.S | {s + a for s in self.S for a in self.alphabet}
        plans_to_explore = []
        for p in prefixes_to_query:
            for e in self.E:
                plan = p + e
                if plan not in self.T:
                    plans_to_explore.append(plan)
        
        # 重複を削除し、空文字列はそのまま "" として扱う
        plans_to_explore = sorted(list(set(plans_to_explore)))

        if plans_to_explore:
            results, _ = self.client.explore(plans_to_explore)
            self.T.update(results)
            # 開始部屋のラベルを取得するために、空のプランの結果が必要
            if "" not in self.T and "" in plans_to_explore:
                 self.T[""] = self.client.explore([""])['results'][0]
                 # NOTE: The problem states a plan of length x gives x results.
                 # An empty plan "" has length 0, so the result should be [].
                 # We need the starting room's label. Let's explore plan "0"
                 # and take the first label. A bit of a hack.
            if not self.T.get(""):
                res_0 = self.client.explore(["0"])[0]["0"]
                self.T[""] = [res_0[0]] # Starting room label


    def _check_and_fix(self):
        """
        Checks if the table is closed and consistent. If not, fixes it by
        adding prefixes to S or suffixes to E. Returns True if stable.
        """
        # 1. Closed Check
        s_rows = {self._get_row(s): s for s in self.S}
        
        for s in self.S:
            for a in self.alphabet:
                sa_prefix = s + a
                sa_row = self._get_row(sa_prefix)
                if sa_row not in s_rows:
                    print(f"Table not closed. New state found for prefix '{sa_prefix}'. Adding to S.")
                    self.S.add(sa_prefix)
                    return False # Table modified, restart check

        # 2. Consistency Check
        s_list = sorted(list(self.S))
        for i in range(len(s_list)):
            for j in range(i + 1, len(s_list)):
                s1, s2 = s_list[i], s_list[j]
                if self._get_row(s1) == self._get_row(s2):
                    for a in self.alphabet:
                        if self._get_row(s1 + a) != self._get_row(s2 + a):
                            print(f"Table not consistent for '{s1}' and '{s2}' on symbol '{a}'.")
                            # 矛盾を区別するための新しい実験(suffix)を見つける
                            row1 = self._get_row(s1 + a)
                            row2 = self._get_row(s2 + a)
                            sorted_e = sorted(list(self.E))
                            for k in range(len(sorted_e)):
                                if row1[k] != row2[k]:
                                    new_suffix = a + sorted_e[k]
                                    print(f"Adding new suffix '{new_suffix}' to E.")
                                    self.E.add(new_suffix)
                                    return False # Table modified

        return True # Table is closed and consistent

    
    def _build_hypothesis(self):
        """
        Builds a map (Moore machine) from the stable observation table.
        """
        print("Building hypothesis map...")
        s_list_sorted = sorted(list(self.S))
        row_to_idx = {self._get_row(s): i for i, s in enumerate(s_list_sorted)}
        
        # Sにない遷移先の行もインデックスに含める必要がある
        all_prefixes = self.S | {s + a for s in self.S for a in self.alphabet}
        all_rows = {self._get_row(p) for p in all_prefixes}
        
        # 安定した状態（Sの行）にまずインデックスを振る
        state_rows = {self._get_row(s) for s in self.S}
        sorted_state_rows = sorted(list(state_rows))
        
        row_to_idx = {row: i for i, row in enumerate(sorted_state_rows)}
        idx_to_row = {i: row for i, row in enumerate(sorted_state_rows)}
        
        # 部屋のラベルを取得 (空のsuffixに対応する列を見る)
        try:
            empty_suffix_idx = sorted(list(self.E)).index("")
        except ValueError:
            # Eに""がないケースは考えにくいが念のため
            raise Exception("Observation table is not correctly initialized: empty suffix '' is missing in E.")

        rooms = [row[empty_suffix_idx] for i, row in sorted(idx_to_row.items())]

        starting_room_idx = row_to_idx[self._get_row("")]
        
        connections = []
        # 無向グラフなので、(room1, room2) のペアを保存して重複を防ぐ
        # room1 < room2 とする
        processed_connections = set()

        # 1. まず、一方向の接続を特定する
        # (from_room, from_door) -> to_room
        directed_transitions = {}
        for s in self.S:
            from_row = self._get_row(s)
            if from_row not in row_to_idx: continue # Sの代表元でない場合はスキップ

            from_idx = row_to_idx[from_row]
            if from_idx not in directed_transitions:
                directed_transitions[from_idx] = {}

            for door, a in enumerate(self.alphabet):
                to_row = self._get_row(s + a)
                if to_row not in row_to_idx:
                    # これはL*の仮説が不完全な場合に起こりうる
                    # 例えば、遷移先がまだSの代表元になっていない場合
                    # print(f"Warning: transition from state {from_idx} via door {door} leads to an unknown state row.")
                    continue
                to_idx = row_to_idx[to_row]
                directed_transitions[from_idx][door] = to_idx

        # 2. 逆方向の接続 (to_door) を見つけてペアにする
        for from_idx, transitions in directed_transitions.items():
            for from_door, to_idx in transitions.items():
                
                # 接続キーを正規化 (小さい方のインデックスを先にする)
                conn_key = tuple(sorted((from_idx, to_idx)))
                if conn_key in processed_connections:
                    continue

                # to_idx から from_idx に戻るドアを探す
                found_to_door = False
                if to_idx in directed_transitions:
                    for to_door, return_idx in directed_transitions[to_idx].items():
                        if return_idx == from_idx:
                            connections.append({
                                "from": {"room": from_idx, "door": from_door},
                                "to": {"room": to_idx, "door": to_door}
                            })
                            # この部屋ペアの接続は処理済みとする
                            processed_connections.add(conn_key)
                            found_to_door = True
                            # 1つの接続ペアには複数の経路がありうるが、最初に見つかったものを採用する
                            # (例: Aのドア1->B, Aのドア2->B)。この問題ではそれは無いはず。
                            break 
                
                if not found_to_door:
                    # print(f"Warning: Could not find return door for connection {from_idx}:{from_door} -> {to_idx}")
                    pass

        return {
            "rooms": rooms,
            "startingRoom": starting_room_idx,
            "connections": connections
        }

    def _find_counterexample(self, hypothesis, max_attempts=100, plan_length=20):
        """
        Tries to find a counterexample by comparing the hypothesis model's
        output with the real server's output for random plans.
        """
        print("Searching for a counterexample...")
        
        # 仮説モデルのシミュレータ
        def simulate(hypo, plan):
            # 接続を高速にルックアップできる形式に変換
            conn_map = {}
            for conn in hypo['connections']:
                r1, d1 = conn['from']['room'], conn['from']['door']
                r2, d2 = conn['to']['room'], conn['to']['door']
                if r1 not in conn_map: conn_map[r1] = {}
                if r2 not in conn_map: conn_map[r2] = {}
                conn_map[r1][d1] = r2
                conn_map[r2][d2] = r1

            trace = []
            current_room = hypo['startingRoom']
            trace.append(hypo['rooms'][current_room])
            
            for move in plan:
                door = int(move)
                if current_room in conn_map and door in conn_map[current_room]:
                    current_room = conn_map[current_room][door]
                    trace.append(hypo['rooms'][current_room])
                else: # モデルにない遷移
                    return None
            return trace[1:] # Adso's record does not include the starting room label

        for _ in range(max_attempts):
            length = random.randint(len(self.S) * 2, plan_length)
            plan = "".join(random.choices(self.alphabet, k=length))
            
            predicted_trace = simulate(hypothesis, plan)
            if predicted_trace is None:
                continue

            real_trace, _ = self.client.explore([plan])
            if real_trace[plan] != predicted_trace:
                print(f"Counterexample found: '{plan}'")
                return plan
        
        print("No counterexample found with random walks.")
        return None

    def solve(self, problem_name):
        """
        Main solving loop.
        """
        self.client.select(problem_name)

        # 初期化
        self.S = {""}
        # 初期実験セット。空文字列は現在の状態のラベル、1文字は遷移先の状態を区別するため。
        self.E = {""} | set(self.alphabet)
        self.T = {}
        
        while True:
            # 1. 表が安定するまで拡張する
            is_stable = False
            while not is_stable:
                self._fill_observation_table()
                is_stable = self._check_and_fix()
            
            # 2. 仮説を構築する
            hypothesis = self._build_hypothesis()
            
            # 3. 推測を提出する
            is_correct = self.client.guess(hypothesis)
            
            if is_correct:
                print("Map successfully constructed!")
                break
            
            # 4. 間違っていた場合、反例を探す
            counterexample = self._find_counterexample(hypothesis, plan_length=len(self.S) * len(self.alphabet) * 2)

            if counterexample is None:
                print("Could not find a counterexample. Assuming the map is correct but maybe isomorphic.")
                # 異なる同型の地図の可能性があるが、これ以上進めないので終了
                break

            # 5. 反例を学習データに追加してループを続ける
            print(f"Adding counterexample '{counterexample}' and its prefixes to S.")
            for i in range(len(counterexample) + 1):
                self.S.add(counterexample[:i])


# --- 3. Main Execution ---
if __name__ == '__main__':
    # --- 設定 ---
    TEAM_NAME = "Your Team Name"
    PROGRAMMING_LANGUAGE = "Python"
    EMAIL = "your.email@example.com"
    PROBLEM_TO_SOLVE = "probatio"  # "probatio" or other problems from the leaderboard

    # --- 実行 ---
    client = AedificiumClient()
    client.team_id = team_id

    solver = LStarMooreSolver(client)
    
    solver.solve(PROBLEM_TO_SOLVE)


Problem 'probatio' selected.
Explored 43 plans. Total query count: 44
Table not closed. New state found for prefix '2'. Adding to S.
Explored 36 plans. Total query count: 81
Table not closed. New state found for prefix '4'. Adding to S.
Explored 36 plans. Total query count: 118
Building hypothesis map...
{'rooms': [0, 1, 2], 'startingRoom': 0, 'connections': [{'from': {'room': 0, 'door': 0}, 'to': {'room': 0, 'door': 0}}, {'from': {'room': 0, 'door': 2}, 'to': {'room': 1, 'door': 0}}, {'from': {'room': 0, 'door': 4}, 'to': {'room': 2, 'door': 3}}, {'from': {'room': 1, 'door': 2}, 'to': {'room': 2, 'door': 0}}, {'from': {'room': 2, 'door': 2}, 'to': {'room': 2, 'door': 2}}]}
Submitting guess...


HTTPError: 400 Client Error: Bad Request for url: https://31pwr5t6ij.execute-api.eu-west-2.amazonaws.com/guess

In [None]:
from aalpy.utils import load_automaton_from_file, generate_random_deterministic_automata
from aalpy.SULs import AutomatonSUL
from aalpy.oracles import RandomWalkEqOracle
from aalpy.learning_algs import run_Lstar, run_KV

# load an automaton
# automaton = load_automaton_from_file('path_to_the_file.dot', automaton_type='dfa')

# or randomly generate one
random_dfa = generate_random_deterministic_automata(automaton_type='moore', num_states=8, 
                                                    input_alphabet_size=5, output_alphabet_size=3)

# get input alphabet of the automaton
alphabet = random_dfa.get_input_alphabet()

# loaded or randomly generated automata are considered as BLACK-BOX that is queried
# learning algorithm has no knowledge about its structure
# create a SUL instance for the automaton/system under learning
sul = AutomatonSUL(random_dfa)

# define the equivalence oracle
eq_oracle = RandomWalkEqOracle(alphabet, sul, num_steps=5000, reset_prob=0.09)

# start learning
# run_KV is for the most part reacquires much fewer interactions with the system under learning
learned_dfa = run_KV(alphabet, sul, eq_oracle, automaton_type='moore')
# or run L*
# learned_dfa_lstar = run_Lstar(alphabet, sul, eq_oracle, automaton_type='dfa')

# save automaton to file and visualize it
# save_automaton_to_file(learned_dfa, path='Learned_Automaton', file_type='dot')
# or
learned_dfa.save()

# visualize automaton
# visualize_automaton(learned_dfa)
learned_dfa.visualize()
# or just print its DOT representation
print(learned_dfa)

word ()
['o3']
word ()
['o3']
word ('i1',)
['o1']
word ('i2',)
['o1']
word ('i3',)
['o2']
word ('i4',)
['o1']
word ('i5', 'i1')
['o1', 'o1']
word ('i5', 'i2')
['o1', 'o3']
word ('i5', 'i3')
['o1', 'o2']
word ('i5', 'i4')
['o1', 'o2']
word ('i5', 'i5')
['o1', 'o1']
word ('i3', 'i1')
['o2', 'o2']
word ('i3', 'i2')
['o2', 'o1']
word ('i3', 'i3')
['o2', 'o1']
word ('i3', 'i4')
['o2', 'o2']
word ('i3', 'i5')
['o2', 'o1']
Hypothesis 1: 3 states.word ('i1', 'i2')
['o1', 'o3']
word ('i4', 'i2')
['o1', 'o2']
word ('i5', 'i1', 'i2')
['o1', 'o1', 'o1']
word ('i5', 'i5', 'i2')
['o1', 'o1', 'o2']
word ('i3', 'i2', 'i2')
['o2', 'o1', 'o1']
word ('i3', 'i3', 'i2')
['o2', 'o1', 'o1']
word ('i3', 'i5', 'i2')
['o2', 'o1', 'o3']
word ('i2', 'i1')
['o1', 'o3']
word ('i2', 'i2', 'i2')
['o1', 'o1', 'o1']
word ('i2', 'i3')
['o1', 'o1']
word ('i2', 'i3', 'i2')
['o1', 'o1', 'o1']
word ('i2', 'i4')
['o1', 'o2']
word ('i2', 'i5')
['o1', 'o2']
word ('i4', 'i1')
['o1', 'o1']
word ('i4', 'i1', 'i2')
['o1', 'o1', 'o

Model saved to LearnedModel.pdf.


In [39]:
import httpx as requests
from aalpy.SULs import SUL

class IcfpSul(SUL):
    steps = ""
    def __init__(self, team_id: str, api_url: str, problem_name: str):
        super().__init__()
        self.team_id = team_id
        self.api_url = api_url
        self.session = requests.Client()
        self.problem_name = problem_name
        self.current_plan = None
        self.current_step_index = 0
        self.current_results = None
        select(problem_name)

    def query(self, word: tuple) -> list[str]:
        word = self.steps + "".join(list(word))[1:]
        print(f"SUL query called with word: {word}")
        result = [str(a) for a in explore(plans = [word])[0]][len(self.steps):]
        print(result)
        return result

    # The `pre`, `post`, and `step` methods are tricky because the API is not stateful
    # or single-step. We have to *simulate* this behavior for the learning algorithm.
    
    def pre(self):
        print("SUL pre called")
        self.steps = ""
        pass

    def post(self):
        print("SUL post called")
        self.string = ""
        pass

    def step(self, letter):
        print(f"SUL step called {letter}")
        self.steps += letter

        print(f"SUL query called with word: {self.steps}")
        result = [str(a) for a in explore(plans = [self.steps])[0]]
        print(result)
        return result[-1]
        

from aalpy.oracles import RandomWalkEqOracle
from aalpy.learning_algs import run_KV, run_Lstar, run_Lsharp

n = 6
sul = IcfpSul(team_id=team_id, api_url=base_url, problem_name=problems[0])
alphabet = ["0", "1", "2", "3", "4", "5"]
print(f'Input alphabet: {alphabet}')

# define the equivalence oracle
eq_oracle = RandomWalkEqOracle(alphabet, sul, num_steps=n * 18, reset_prob=0.09)

# start learning
# run_KV is for the most part reacquires much fewer interactions with the system under learning
learned_dfa = run_Lsharp(alphabet, sul, eq_oracle, automaton_type='moore')

# save automaton to file and visualize it
# save_automaton_to_file(learned_dfa, path='Learned_Automaton', file_type='dot')
# or
learned_dfa.save()

# visualize automaton
# visualize_automaton(learned_dfa)
#learned_dfa.visualize()
# or just print its DOT representation
print(learned_dfa)

Input alphabet: ['0', '1', '2', '3', '4', '5']
SUL query called with word: 
['0']
SUL query called with word: 
['0']
SUL query called with word: 
['0']
SUL query called with word: 
['0']
SUL query called with word: 
['0']
SUL query called with word: 
['0']
SUL query called with word: 
['0']
Hypothesis 1: 1 states.
SUL post called
SUL pre called
SUL step called 2
SUL query called with word: 2
['0', '0']
SUL step called 3
SUL query called with word: 23
['0', '0', '2']
SUL post called
SUL query called with word: 2303
['2', '2', '2']


SystemExit: Non-determinism detected.
Error inserting: ('2',)
Conflict detected: 0 vs 2
Expected Output: ('0',)
Received output: ('2',)

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


Input alphabet: [0, 1, 2, 3, 4, 5]
SUL query called with word: ()
{"error":"Error: Cannot deserialize value of type `java.lang.String` from Array value (token `JsonToken.START_ARRAY`)\n at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 74] (through reference chain: org.icfpconference.ExploreRequest[\"plans\"]->java.util.ArrayList[0])"}


HTTPStatusError: Server error '500 Internal Server Error' for url 'https://31pwr5t6ij.execute-api.eu-west-2.amazonaws.com/explore'
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500