# LLM Engine

In [18]:
!pip -q install --upgrade transformers accelerate bitsandbytes einops

In [19]:
import json
import re
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, TextStreamer

In [20]:
# Model identifier passed to both the tokenizer and the model loader below.
MODEL_ID = "Qwen/Qwen3-8B"

In [21]:
# Quantization config: load the weights in 4-bit NF4 and compute in float16.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_quant_type="nf4",
)

# Tokenizer for MODEL_ID (fast implementation requested).
# NOTE(review): trust_remote_code=True lets code shipped with the model repo
# run locally; presumably not required for this checkpoint -- confirm.
tokenizer = AutoTokenizer.from_pretrained(
    MODEL_ID,
    use_fast=True,
    trust_remote_code=True
)

# Quantized causal LM; device_map="auto" leaves device placement to the library.
model = AutoModelForCausalLM.from_pretrained(
    MODEL_ID,
    quantization_config=bnb_config,
    device_map="auto",
    dtype=torch.float16,
    trust_remote_code=True
)
# Put the module in evaluation mode; the visible cells only run inference.
model.eval()

# Conceptual pipeline (the boxed part is what runs inside the model):
#                                  -------------------------------
#           ---------             | --------                  --- |            -----------
# text  -> |Tokenizer| -> IDs  -> ||Codebook| -> Vectors  -> |LLM|| -> IDs -> |unTokenizer| -> Text (output)
#           ---------             | --------                  --- |            -----------
#                                  -------------------------------

Loading weights:   0%|          | 0/399 [00:00<?, ?it/s]

Qwen3ForCausalLM(
  (model): Qwen3Model(
    (embed_tokens): Embedding(151936, 4096)
    (layers): ModuleList(
      (0-35): 36 x Qwen3DecoderLayer(
        (self_attn): Qwen3Attention(
          (q_proj): Linear4bit(in_features=4096, out_features=4096, bias=False)
          (k_proj): Linear4bit(in_features=4096, out_features=1024, bias=False)
          (v_proj): Linear4bit(in_features=4096, out_features=1024, bias=False)
          (o_proj): Linear4bit(in_features=4096, out_features=4096, bias=False)
          (q_norm): Qwen3RMSNorm((128,), eps=1e-06)
          (k_norm): Qwen3RMSNorm((128,), eps=1e-06)
        )
        (mlp): Qwen3MLP(
          (gate_proj): Linear4bit(in_features=4096, out_features=12288, bias=False)
          (up_proj): Linear4bit(in_features=4096, out_features=12288, bias=False)
          (down_proj): Linear4bit(in_features=12288, out_features=4096, bias=False)
          (act_fn): SiLUActivation()
        )
        (input_layernorm): Qwen3RMSNorm((4096,), eps=1e-06

In [22]:
def qwen(
    prompt: str,
    system: str = "You are a logical planning assitant",
    max_new_tokens: int = 512,
) -> str:
    """Run one system+user chat turn through the loaded model and return its reply.

    Args:
        prompt: Content of the user message.
        system: Content of the system message. (The default text contains the
            typo "assitant"; it is kept as-is so the default prompt is unchanged.)
        max_new_tokens: Upper bound on the number of generated tokens.

    Returns:
        The decoded text of the newly generated tokens only (the prompt is
        stripped and special tokens are skipped).

    Relies on the module-level ``tokenizer`` and ``model``.
    """
    chat = [
        {"role": "system", "content": system},
        {"role": "user", "content": prompt},
    ]

    # Render the conversation to a single prompt string (not token ids yet),
    # ending with the generation prompt and with thinking mode turned off.
    rendered = tokenizer.apply_chat_template(
        chat,
        tokenize=False,
        add_generation_prompt=True,
        enable_thinking=False,
    )

    # Tokenize the rendered prompt as a batch of one and move it to the
    # model's device.
    encoded = tokenizer([rendered], return_tensors="pt").to(model.device)
    prompt_length = encoded.input_ids.shape[1]

    # Sampling is disabled so the answer is reproducible for a given prompt;
    # temperature / top-p / top-k are therefore not passed at all.
    with torch.no_grad():
        sequences = model.generate(
            **encoded,
            max_new_tokens=max_new_tokens,
            do_sample=False,
        )

    # generate() returns prompt + completion; keep only the completion.
    completion_ids = sequences[0, prompt_length:]
    return tokenizer.decode(completion_ids, skip_special_tokens=True)

# Evaluator

In [23]:
def limpiar_accion(accion_texto):
    """Normalize one action string for comparison.

    Removes every parenthesis, trims surrounding whitespace and lowercases,
    so ``" (Attack A) "`` becomes ``"attack a"``.
    """
    texto = accion_texto.replace('(', '').replace(')', '')
    return texto.strip().lower()


def calcular_score_plan(plan_generado, plan_optimo):
    """Score a generated plan against the reference plan on a 0-10 scale.

    Blank entries are dropped and the rest normalized with ``limpiar_accion``.
    The score is the sum of:

    * horizon  (2 pts): both plans have the same number of actions;
    * progress (3 pts): fraction of the reference plan matched by the common
      prefix (comparison stops at the first differing action);
    * exact    (5 pts): the plans are identical.

    Args:
        plan_generado: Iterable of action strings produced by the agent.
        plan_optimo: Iterable of action strings of the reference plan.

    Returns:
        The score rounded to two decimals. An empty generated plan always
        scores 0.0. An empty reference plan contributes no progress points
        (instead of raising ``ZeroDivisionError``), so a non-empty plan scored
        against it also gets 0.0.
    """
    P = [limpiar_accion(p) for p in plan_generado if p.strip()]
    G = [limpiar_accion(p) for p in plan_optimo if p.strip()]

    L_P = len(P)
    L_G = len(G)

    if L_P == 0:
        return 0.0

    score_horizonte = 2.0 if L_P == L_G else 0.0

    # Length of the common prefix of both plans.
    l_match = 0
    for p_accion, g_accion in zip(P, G):
        if p_accion != g_accion:
            break
        l_match += 1

    # Guard the division: the reference plan may be empty (or all blank).
    score_progreso = 3.0 * (l_match / L_G) if L_G else 0.0
    score_exacto = 5.0 if (l_match == L_G and L_P == L_G) else 0.0

    return round(score_horizonte + score_progreso + score_exacto, 2)

# Student Agent

In [27]:
import re
from collections import deque

class AssemblyAgent:
    """Plan solver for the attack / succumb / overcome / feast toy domain.

    ``solve`` first parses the final problem statement into a symbolic state
    and runs a breadth-first search over it. Only when that yields nothing
    (unparseable text, goal already met, or no plan within the depth limit)
    does it fall back to prompting the LLM and parsing its reply.
    """

    def __init__(self):
        # System prompt used only by the LLM fallback in ``solve``.
        self.system_prompt = (
            "You are an expert deterministic planner for a toy planning domain. "
            "Return only valid plan actions in canonical PDDL-like format, one per line."
        )
        # (regex, formatter) pairs that recognize an action written in free
        # text ("feast object a from object b", "(feast a b)", ...) and render
        # it in canonical form.
        self._patterns = [
            (
                re.compile(r"\battack\s+(?:object\s+)?([a-z])\b", re.IGNORECASE),
                lambda m: f"(attack {m.group(1).lower()})",
            ),
            (
                re.compile(r"\bsuccumb\s+(?:object\s+)?([a-z])\b", re.IGNORECASE),
                lambda m: f"(succumb {m.group(1).lower()})",
            ),
            (
                re.compile(
                    r"\bfeast\s+(?:object\s+)?([a-z])\s+(?:from\s+(?:object\s+)?)?([a-z])\b",
                    re.IGNORECASE,
                ),
                lambda m: f"(feast {m.group(1).lower()} {m.group(2).lower()})",
            ),
            (
                re.compile(
                    r"\bovercome\s+(?:object\s+)?([a-z])\s+(?:from\s+(?:object\s+)?)?([a-z])\b",
                    re.IGNORECASE,
                ),
                lambda m: f"(overcome {m.group(1).lower()} {m.group(2).lower()})",
            ),
        ]

    def _split_facts(self, text: str) -> list[str]:
        """Split a fact list ("a, b and c.") into lowercase, trimmed facts."""
        cleaned = text.strip().lower().rstrip(".")
        parts = re.split(r",|\band\b", cleaned)
        return [part.strip() for part in parts if part.strip()]

    def _extract_final_statement(self, scenario_context: str) -> str:
        """Return the text after the last "[STATEMENT]" marker, or the whole
        text when the marker is absent."""
        parts = scenario_context.split("[STATEMENT]")
        return parts[-1] if len(parts) > 1 else scenario_context

    def _parse_problem(self, scenario_context: str):
        """Parse the final statement into ``(state, goals, objects)``.

        ``state`` holds ``harmony`` (bool) plus the sets ``planet``,
        ``province``, ``pain`` (object letters) and ``craves`` (pairs).
        ``goals`` holds only the ``craves`` pairs found in the goal line; any
        other kind of goal fact is ignored. ``objects`` is the sorted list of
        single-letter object names.

        Returns ``None`` when the initial-conditions line or the goal line is
        missing, or when no object is found.
        """
        statement = self._extract_final_statement(scenario_context)
        lines = [line.strip() for line in statement.splitlines() if line.strip()]

        initial_line = ""
        goal_line = ""
        for line in lines:
            lower = line.lower()
            if lower.startswith("as initial conditions i have that"):
                initial_line = line
            elif lower.startswith("my goal is to have that"):
                goal_line = line

        if not initial_line or not goal_line:
            return None

        initial_text = re.sub(
            r"^as initial conditions i have that,?\s*",
            "",
            initial_line,
            flags=re.IGNORECASE,
        )
        goal_text = re.sub(
            r"^my goal is to have that,?\s*",
            "",
            goal_line,
            flags=re.IGNORECASE,
        )

        facts_initial = self._split_facts(initial_text)
        facts_goal = self._split_facts(goal_text)

        # Word boundaries matter here: without them "object craves ..." or
        # "object from ..." would register the phantom objects "c" / "f".
        objects = set(re.findall(r"\bobject\s+([a-z])\b", statement.lower()))
        state = {
            "harmony": False,
            "planet": set(),
            "province": set(),
            "pain": set(),
            "craves": set(),
        }
        goals = {"craves": set()}

        crave_re = r"object\s+([a-z])\s+craves\s+object\s+([a-z])"
        unary_re = r"(planet|province)\s+object\s+([a-z])"

        for fact in facts_initial:
            m_crave = re.fullmatch(crave_re, fact)
            if m_crave:
                x, y = m_crave.group(1), m_crave.group(2)
                state["craves"].add((x, y))
                objects.update([x, y])
                continue

            m_unary = re.fullmatch(unary_re, fact)
            if m_unary:
                kind, x = m_unary.group(1), m_unary.group(2)
                state[kind].add(x)
                objects.add(x)
                continue

            if fact == "harmony":
                state["harmony"] = True

        for fact in facts_goal:
            m_goal = re.fullmatch(crave_re, fact)
            if m_goal:
                x, y = m_goal.group(1), m_goal.group(2)
                goals["craves"].add((x, y))
                objects.update([x, y])

        if not objects:
            return None

        return state, goals, sorted(objects)

    def _state_key(self, state):
        """Return a hashable, order-independent fingerprint of ``state``."""
        return (
            state["harmony"],
            tuple(sorted(state["planet"])),
            tuple(sorted(state["province"])),
            tuple(sorted(state["pain"])),
            tuple(sorted(state["craves"])),
        )

    def _goal_satisfied(self, state, goals) -> bool:
        """True when every goal ``craves`` pair holds in ``state``."""
        return goals["craves"].issubset(state["craves"])

    def _copy_state(self, state):
        """Return a copy of ``state`` whose sets can be mutated independently."""
        return {
            "harmony": state["harmony"],
            "planet": set(state["planet"]),
            "province": set(state["province"]),
            "pain": set(state["pain"]),
            "craves": set(state["craves"]),
        }

    def _successors(self, state, objects):
        """List every ``(action_text, next_state)`` applicable in ``state``.

        Actions are generated in a fixed order -- attack, succumb, overcome,
        feast, each iterating ``objects`` in the given order -- which is what
        breaks ties between equally short plans in the search.
        """
        result = []
        harmony = state["harmony"]
        planet = state["planet"]
        province = state["province"]
        pain = state["pain"]
        craves = state["craves"]

        # attack x: needs harmony, planet x, province x.
        if harmony:
            for x in objects:
                if x in planet and x in province:
                    nxt = self._copy_state(state)
                    nxt["pain"].add(x)
                    nxt["province"].discard(x)
                    nxt["planet"].discard(x)
                    nxt["harmony"] = False
                    result.append((f"(attack {x})", nxt))

        # succumb x: needs pain x; undoes the effects of attack.
        for x in objects:
            if x in pain:
                nxt = self._copy_state(state)
                nxt["province"].add(x)
                nxt["planet"].add(x)
                nxt["harmony"] = True
                nxt["pain"].discard(x)
                result.append((f"(succumb {x})", nxt))

        # overcome x y: needs pain x and province y; creates "x craves y".
        for x in objects:
            if x not in pain:
                continue
            for y in objects:
                if y == x or y not in province:
                    continue
                nxt = self._copy_state(state)
                nxt["harmony"] = True
                nxt["province"].add(x)
                nxt["craves"].add((x, y))
                nxt["province"].discard(y)
                nxt["pain"].discard(x)
                result.append((f"(overcome {x} {y})", nxt))

        # feast x y: needs harmony, province x and "x craves y"; removes the crave.
        if harmony:
            for x in objects:
                if x not in province:
                    continue
                for y in objects:
                    if y == x or (x, y) not in craves:
                        continue
                    nxt = self._copy_state(state)
                    nxt["pain"].add(x)
                    nxt["province"].add(y)
                    nxt["craves"].discard((x, y))
                    nxt["province"].discard(x)
                    nxt["harmony"] = False
                    result.append((f"(feast {x} {y})", nxt))

        return result

    def _plan_with_bfs(self, scenario_context: str, max_depth: int = 14) -> list[str]:
        """Breadth-first search for a shortest plan of at most ``max_depth`` actions.

        Returns the plan as canonical action strings, or ``[]`` when the
        problem cannot be parsed, the goal already holds in the initial state,
        or no plan exists within the depth limit.
        """
        parsed = self._parse_problem(scenario_context)
        if parsed is None:
            return []

        initial_state, goals, objects = parsed
        if self._goal_satisfied(initial_state, goals):
            return []

        queue = deque([(initial_state, [])])
        # FIFO expansion reaches every state first by a shortest path, so a
        # plain set of seen states is enough (no per-state depth needed).
        seen = {self._state_key(initial_state)}

        while queue:
            state, plan = queue.popleft()
            if len(plan) >= max_depth:
                continue

            for action_text, nxt in self._successors(state, objects):
                key = self._state_key(nxt)
                if key in seen:
                    continue
                seen.add(key)

                new_plan = plan + [action_text]
                if self._goal_satisfied(nxt, goals):
                    return new_plan

                queue.append((nxt, new_plan))

        return []

    def _scan_actions(self, text: str) -> list[str]:
        """Return every action recognized in ``text``, in order of appearance."""
        hits = []
        for pattern, formatter in self._patterns:
            for match in pattern.finditer(text):
                hits.append((match.start(), formatter(match)))
        hits.sort(key=lambda item: item[0])
        return [action for _, action in hits]

    def _extract_actions(self, text: str) -> list[str]:
        """Turn free-form LLM output into a list of canonical actions.

        Each line is scanned on its own and all actions found on it are kept
        in the order they appear, so a reply that puts the whole plan on one
        line is not truncated to a single action. If no line yields anything,
        the text is scanned as a whole, which also catches an action broken
        across lines. Hyphens are treated as spaces in both passes.
        """
        actions = []
        for line in text.splitlines():
            normalized = line.strip().lower().replace("-", " ")
            actions.extend(self._scan_actions(normalized))

        if not actions:
            actions = self._scan_actions(text.lower().replace("-", " "))

        return self._trim_repeated_plan(actions)

    def _trim_repeated_plan(self, actions: list[str]) -> list[str]:
        """If the list is one sequence repeated twice back to back, keep one copy."""
        n = len(actions)
        if n >= 2 and n % 2 == 0 and actions[: n // 2] == actions[n // 2 :]:
            return actions[: n // 2]
        return actions

    def solve(self, scenario_context: str, llm_engine_func) -> list:
        """Produce a plan for the final problem in ``scenario_context``.

        Args:
            scenario_context: Full scenario text; the problem to solve is the
                part after the last "[STATEMENT]" marker.
            llm_engine_func: Callable invoked as
                ``llm_engine_func(prompt=..., system=..., max_new_tokens=...)``
                and returning the model's text. It is called only when the
                search returns no plan.

        Returns:
            A list of actions in canonical form:
            (attack x), (succumb x), (feast x y), (overcome x y).
        """
        plan = self._plan_with_bfs(scenario_context)
        if plan:
            return plan

        prompt_final = (
            f"{scenario_context}\n\n"
            "Generate only the missing plan for the final [STATEMENT].\n"
            "Output rules:\n"
            "1) One action per line.\n"
            "2) Use ONLY: (attack x), (succumb x), (feast x y), (overcome x y).\n"
            "3) Use lowercase object symbols.\n"
            "4) Do not add explanations, numbering, or extra text."
        )

        raw = llm_engine_func(
            prompt=prompt_final,
            system=self.system_prompt,
            max_new_tokens=220,
        )
        return self._extract_actions(raw)

# Dev Test

In [28]:
import json

# Path of the JSON development set read by dev_test().
ARCHIVO_DESARROLLO = "Examples.json"

def dev_test(n_casos=10):
    """Score the agent on the first ``n_casos`` development cases and print a report.

    Reads ``ARCHIVO_DESARROLLO`` (a JSON list of cases), solves each selected
    case with a fresh ``AssemblyAgent`` backed by ``qwen``, scores it with
    ``calcular_score_plan`` against the case's ``target_action_sequence`` and
    prints the per-case score and the average.

    Args:
        n_casos: Maximum number of cases to evaluate, taken from the start of
            the file. Values <= 0 (or an empty dataset) evaluate nothing and
            print a notice instead of dividing by zero.
    """
    print(f"Cargando dataset de desarrollo: {ARCHIVO_DESARROLLO}")
    # JSON text is UTF-8; do not depend on the platform's default encoding.
    with open(ARCHIVO_DESARROLLO, 'r', encoding='utf-8') as f:
        casos = json.load(f)

    agente = AssemblyAgent()
    puntaje_total = 0.0
    # Cap for quick runs; clamp at 0 so a negative request selects nothing.
    casos_evaluados = max(0, min(n_casos, len(casos)))

    print("-" * 50)
    for caso in casos[:casos_evaluados]:
        print(f"Evaluando Tarea ID: {caso['assembly_task_id']} (Longitud optima: {caso['complexity_level']})")

        plan_generado = agente.solve(caso['scenario_context'], qwen)
        plan_optimo = caso['target_action_sequence']

        # Metric for this case.
        score = calcular_score_plan(plan_generado, plan_optimo)
        puntaje_total += score

        print(f"Plan Generado: {plan_generado}")
        print(f"Score obtenido: {score} / 10.0\n")

    if casos_evaluados == 0:
        # Nothing was scored, so an average is undefined.
        print("No hay casos para evaluar.")
        return

    promedio = puntaje_total / casos_evaluados
    print("-" * 50)
    print(f"Puntaje Promedio en Desarrollo: {round(promedio, 2)} / 10.0")

# Score the agent on up to 10 development cases.
dev_test(10)

Cargando dataset de desarrollo: Examples.json
--------------------------------------------------
Evaluando Tarea ID: task_6a4ed4586d (Longitud optima: 4)
Plan Generado: ['(attack a)', '(overcome a c)', '(attack b)', '(overcome b a)']
Score obtenido: 10.0 / 10.0

Evaluando Tarea ID: task_b9a95877e2 (Longitud optima: 4)
Plan Generado: ['(feast a b)', '(succumb a)', '(feast b c)', '(overcome b a)']
Score obtenido: 10.0 / 10.0

Evaluando Tarea ID: task_0d236ad4c6 (Longitud optima: 6)
Plan Generado: ['(feast a c)', '(succumb a)', '(feast c b)', '(overcome c a)', '(attack b)', '(overcome b c)']
Score obtenido: 10.0 / 10.0

Evaluando Tarea ID: task_372c431054 (Longitud optima: 6)
Plan Generado: ['(feast a c)', '(succumb a)', '(attack b)', '(overcome b c)', '(attack a)', '(overcome a b)']
Score obtenido: 10.0 / 10.0

Evaluando Tarea ID: task_79c0ae73f7 (Longitud optima: 2)
Plan Generado: ['(attack a)', '(overcome a c)']
Score obtenido: 10.0 / 10.0

Evaluando Tarea ID: task_5cc798432f (Longitud

# Submit

In [29]:
import json

# Input file with the tasks read by submit(), and the output file it writes.
ARCHIVO_EVALUACION = "Task.json"
ARCHIVO_SALIDA = "submission.json"

def submit():
    """Solve every task in ``ARCHIVO_EVALUACION`` and write the submission file.

    Each task is solved with a single ``AssemblyAgent`` backed by ``qwen``.
    The results are written to ``ARCHIVO_SALIDA`` as a JSON list of
    ``{"assembly_task_id", "target_action_sequence"}`` records.

    If solving any task raises, the error is printed and the function returns
    immediately without writing the output file.
    """
    print(f"Iniciando ejecucion sobre: {ARCHIVO_EVALUACION}")
    with open(ARCHIVO_EVALUACION, 'r') as entrada:
        tareas = json.load(entrada)

    solver = AssemblyAgent()
    total = len(tareas)
    entrega = []

    for numero, tarea in enumerate(tareas, start=1):
        task_id = tarea['assembly_task_id']
        print(f"Procesando caso {numero}/{total} (ID: {task_id})...")

        try:
            plan = solver.solve(tarea['scenario_context'], qwen)
            registro = {
                "assembly_task_id": task_id,
                "target_action_sequence": plan,
            }
            entrega.append(registro)
        except Exception as err:
            # Abort the whole run: a partial submission is never written.
            print(f"ERROR critico en el caso {task_id}: {err}")
            print("Corrige tu codigo. https://www.youtube.com/watch?v=Y-U1calv6X8")
            return

    # Persist the collected plans.
    with open(ARCHIVO_SALIDA, 'w') as salida:
        json.dump(entrega, salida, indent=4)

    print("-" * 50)
    print(f"Exito. Archivo '{ARCHIVO_SALIDA}' generado correctamente.")

# Solve every task in the evaluation file and write the submission file.
submit()

Iniciando ejecucion sobre: Task.json
Procesando caso 1/50 (ID: task_f6c3f52f55)...
Procesando caso 2/50 (ID: task_07a18910c7)...
Procesando caso 3/50 (ID: task_cbe2649f6b)...
Procesando caso 4/50 (ID: task_4f181b1e7e)...
Procesando caso 5/50 (ID: task_9f39e7f413)...
Procesando caso 6/50 (ID: task_a51e02706c)...
Procesando caso 7/50 (ID: task_8a763f838b)...
Procesando caso 8/50 (ID: task_4d92dfa4d1)...
Procesando caso 9/50 (ID: task_19c005f4fa)...
Procesando caso 10/50 (ID: task_98c536f455)...
Procesando caso 11/50 (ID: task_b516b29d5b)...
Procesando caso 12/50 (ID: task_2833f3d973)...
Procesando caso 13/50 (ID: task_a2e1da433c)...
Procesando caso 14/50 (ID: task_b05ad833d2)...
Procesando caso 15/50 (ID: task_a90c8a54bb)...
Procesando caso 16/50 (ID: task_dae8652991)...
Procesando caso 17/50 (ID: task_07201ae0aa)...
Procesando caso 18/50 (ID: task_37dd853373)...
Procesando caso 19/50 (ID: task_784946134b)...
Procesando caso 20/50 (ID: task_32d7e53ce2)...
Procesando caso 21/50 (ID: task_