# LLM_ENGINE

In [1]:
!pip -q install --upgrade transformers accelerate bitsandbytes einops

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.4/10.4 MB[0m [31m51.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.7/60.7 MB[0m [31m11.0 MB/s[0m eta [36m0:00:00[0m
[?25h

In [2]:
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, TextStreamer
import torch

# Hugging Face Hub identifier of the chat model used as the planning LLM.
MODEL_ID = "Qwen/Qwen3-8B"

# bitsandbytes 4-bit loading: NF4 quantization type, float16 compute dtype.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_quant_type="nf4",
)

# NOTE(review): trust_remote_code=True allows code shipped in the Hub repository to
# run at load time -- confirm it is really needed for this checkpoint.
tokenizer = AutoTokenizer.from_pretrained(
    MODEL_ID,
    use_fast=True,
    trust_remote_code=True
)

model = AutoModelForCausalLM.from_pretrained(
    MODEL_ID,
    quantization_config=bnb_config,
    device_map="auto",
    # `torch_dtype` is deprecated in the installed transformers release (the loader
    # prints "`torch_dtype` is deprecated! Use `dtype` instead!"), so use `dtype`.
    dtype=torch.float16,
    trust_remote_code=True
)
# The model is only used for generation, so put it in evaluation mode.
model.eval()

def qwen( prompt:str,
          system:str|None=None,
          max_new_tokens:int=512,
          temperature:float=0.8,
          top_p:float=0.9,
          enable_thinking:bool=False,
          do_sample:bool=False,
          stream:bool=False) -> str:
    """Run one chat turn through the module-level `model` / `tokenizer`.

    Args:
        prompt: User message.
        system: Optional system message placed before the user message.
        max_new_tokens: Upper bound on the number of generated tokens.
        temperature: Sampling temperature; only used when sampling is active.
        top_p: Nucleus-sampling threshold; only used when sampling is active.
        enable_thinking: Forwarded to the tokenizer's chat template.
        do_sample: Request sampling instead of greedy decoding.
        stream: When True, generated text is handed to a `TextStreamer` while it
            is produced and the function returns an empty string.

    Returns:
        The decoded newly generated text (prompt tokens excluded), or "" when
        `stream` is True.
    """
    messages = []
    if system:
        messages.append({"role": "system", "content": system})
    messages.append({"role": "user", "content": prompt})

    # Render the conversation with the chat template as plain text (tokenize=False).
    text = tokenizer.apply_chat_template(
        messages,
        add_generation_prompt=True,
        tokenize=False,
        enable_thinking=enable_thinking,
    )
    # Encode the rendered text into tensors and move them to the model's device.
    inputs = tokenizer([text], return_tensors="pt").to(model.device)

    # A non-positive temperature cannot be used for sampling, so such a request is
    # served with greedy decoding instead of failing inside generate().
    sampling = bool(do_sample) and temperature > 0

    gen_kwargs = dict(
        max_new_tokens=max_new_tokens,
        do_sample=sampling,
        eos_token_id=tokenizer.eos_token_id,
        pad_token_id=tokenizer.eos_token_id,
    )
    # temperature / top_p only matter when sampling; do not forward them otherwise.
    if sampling:
        gen_kwargs["temperature"] = temperature
        gen_kwargs["top_p"] = top_p

    if stream:
        gen_kwargs["streamer"] = TextStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)

    # Both the streaming and the non-streaming path run without gradient tracking.
    with torch.no_grad():
        out = model.generate(**inputs, **gen_kwargs)

    if stream:
        return ""

    # `out` holds prompt + completion; keep only the tokens after the prompt.
    gen_ids = out[0, inputs.input_ids.shape[1]:]
    return tokenizer.decode(gen_ids, skip_special_tokens=True)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/728 [00:00<?, ?B/s]

tokenizer_config.json: 0.00B [00:00, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]



merges.txt: 0.00B [00:00, ?B/s]

tokenizer.json:   0%|          | 0.00/11.4M [00:00<?, ?B/s]

`torch_dtype` is deprecated! Use `dtype` instead!


model.safetensors.index.json: 0.00B [00:00, ?B/s]

Downloading (incomplete total...): 0.00B [00:00, ?B/s]

Fetching 5 files:   0%|          | 0/5 [00:00<?, ?it/s]

Loading weights:   0%|          | 0/399 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/239 [00:00<?, ?B/s]

# Evaluator

In [3]:
def limpiar_accion(accion_texto):
    """Normalize an action string so two actions can be compared.

    Every '(' and ')' character is removed, surrounding whitespace is trimmed
    and the result is lowercased.
    """
    sin_parentesis = "".join(ch for ch in accion_texto if ch not in "()")
    recortado = sin_parentesis.strip()
    return recortado.lower()

def calcular_score_plan(plan_generado, plan_optimo):
    """Score a generated plan against the reference plan on a 0-10 scale.

    Blank entries are dropped from both plans and the remaining actions are
    normalized with `limpiar_accion`. The score is the sum of:
      * 2.0 if both plans have the same number of actions,
      * 3.0 * (length of the common leading prefix / reference length),
      * 5.0 if the plans are identical.

    Args:
        plan_generado: Iterable of action strings produced by the agent.
        plan_optimo: Iterable of action strings of the reference plan.

    Returns:
        The score rounded to two decimals. 0.0 when either plan has no actions.
    """
    P = [limpiar_accion(p) for p in plan_generado if p.strip()]
    G = [limpiar_accion(p) for p in plan_optimo if p.strip()]

    L_P = len(P)
    L_G = len(G)

    # An empty generated plan earns nothing. An empty reference cannot be matched
    # either, and would otherwise divide by zero in the progress term below.
    if L_P == 0 or L_G == 0:
        return 0.0

    score_horizonte = 2.0 if L_P == L_G else 0.0

    # Count matching actions from the start; stop at the first mismatch.
    l_match = 0
    for p_accion, g_accion in zip(P, G):
        if p_accion != g_accion:
            break
        l_match += 1

    score_progreso = 3.0 * (l_match / L_G)
    score_exacto = 5.0 if (l_match == L_G and L_P == L_G) else 0.0

    return round(score_horizonte + score_progreso + score_exacto, 2)

# Student_agent

In [77]:
# student_agent.py
import json
import re
from pathlib import Path
from typing import List, Dict, Tuple, Optional

# Default path of the JSON file with solved examples; used as the default
# `examples_path` of AssemblyAgent.
EXAMPLES_PATH_DEFAULT = "Examples.json"

# ----------------------------
# Utilities
# ----------------------------

# Color words that _tokenize emits extra copies of, so they weigh more in the
# token-overlap score.
COLORS = {"red","blue","orange","yellow","green","purple","pink","black","white","gray","grey","brown"}

def _domain_of(s: str) -> str:
    s = s.lower()
    if "set of blocks" in s:
        return "blocks"
    if "set of objects" in s:
        return "objects"
    return "blocks" if "block" in s else "objects"

def _extract_last_unsolved_statement(scenario_context: str) -> str:
    sc = scenario_context
    last_stmt = sc.rfind("[STATEMENT]")
    if last_stmt == -1:
        return sc
    tail = sc[last_stmt:]
    last_plan = tail.rfind("[PLAN]")
    if last_plan == -1:
        return tail
    return tail[: last_plan + len("[PLAN]")]

_token_re = re.compile(r"[a-zA-Z_]+|\d+")

def _tokenize(text: str) -> List[str]:
    toks = _token_re.findall(text.lower())
    out: List[str] = []
    for t in toks:
        out.append(t)
        # boost colors / single-letter objects
        if t in COLORS:
            out.extend([t, t])
        if len(t) == 1 and t.isalpha():  # object letters a,b,c...
            out.extend([t, t])
    return out

def _bag_overlap_score(q: List[str], d: List[str]) -> float:
    # weighted overlap: multiset-ish via counts
    # fast enough for few thousand examples
    from collections import Counter
    cq, cd = Counter(q), Counter(d)
    inter = 0
    union = 0
    for k, v in cq.items():
        inter += min(v, cd.get(k, 0))
        union += max(v, cd.get(k, 0))
    # add tokens only in doc
    for k, v in cd.items():
        if k not in cq:
            union += v
    return (inter / union) if union else 0.0

# ----------------------------
# Goal extractors (lightweight, for retrieval + prompt hints)
# ----------------------------

_goal_blocks_re = re.compile(r"my goal is to have that (.+?)\.", re.IGNORECASE | re.DOTALL)
_on_top_re = re.compile(r"the ([a-z]+) block is on top of the ([a-z]+) block", re.IGNORECASE)

def _extract_block_goals(problem: str) -> List[Tuple[str,str]]:
    # returns list of (x,y) meaning x on y
    m = _goal_blocks_re.search(problem)
    if not m:
        return []
    goal_txt = m.group(1)
    pairs = _on_top_re.findall(goal_txt)
    return [(a.lower(), b.lower()) for a,b in pairs]

_goal_obj_re = re.compile(r"my goal is to have that (.+?)\.", re.IGNORECASE | re.DOTALL)
_craves_re = re.compile(r"object ([a-z]) craves object ([a-z])", re.IGNORECASE)

def _extract_object_goals(problem: str) -> List[Tuple[str,str]]:
    m = _goal_obj_re.search(problem)
    if not m:
        return []
    goal_txt = m.group(1)
    pairs = _craves_re.findall(goal_txt)
    return [(a.lower(), b.lower()) for a,b in pairs]

# ----------------------------
# Optional read-only validator (debug only)
# ----------------------------

# "(name arg)" with an optional second argument.
_ACTION_RE = re.compile(r"^\([a-zA-Z_]+\s+[a-zA-Z0-9_]+\s*(?:[a-zA-Z0-9_]+)?\)$")

def validate_plan(domain: str, plan: List[str]) -> List[str]:
    """Check a plan's lines for formatting problems without modifying the plan.

    Args:
        domain: "blocks" selects the block actions; any other value selects the
            object actions (attack / succumb / feast / overcome).
        plan: Action lines to inspect.

    Returns:
        A list of issue codes (empty when nothing was found); ["EMPTY_PLAN"]
        when the plan has no lines.
    """
    if not plan:
        return ["EMPTY_PLAN"]

    # Expected number of tokens (action name + arguments) for each allowed action.
    if domain == "blocks":
        token_count = {"unmount_node": 3, "mount_node": 3, "engage_payload": 2, "release_payload": 2}
    else:
        token_count = {"attack": 2, "succumb": 2, "feast": 3, "overcome": 3}

    found: List[str] = []
    for i, entry in enumerate(plan):
        if "from" in entry.lower():
            found.append(f"LINE_{i}_CONTAINS_FROM")

        # Lines without surrounding parentheses get no further checks.
        if not entry.startswith("(") or not entry.endswith(")"):
            found.append(f"LINE_{i}_NOT_PARENTHESIZED")
            continue

        if _ACTION_RE.match(entry) is None:
            found.append(f"LINE_{i}_BAD_FORMAT")

        pieces = entry[1:-1].strip().split()
        if len(pieces) == 0:
            found.append(f"LINE_{i}_EMPTY")
            continue

        act = pieces[0]
        if act not in token_count:
            found.append(f"LINE_{i}_UNKNOWN_ACTION:{act}")
        elif len(pieces) != token_count[act]:
            found.append(f"LINE_{i}_BAD_ARITY:{act}")
    return found

# ----------------------------
# Agent
# ----------------------------

class AssemblyAgent:
    """Few-shot planning agent that asks an LLM for a plan in the blocks or objects domain.

    On construction the agent loads solved examples (if the file exists) and
    splits them by domain. `solve` retrieves the examples most similar to the
    incoming problem, builds a domain-specific prompt and returns the LLM reply
    split into non-empty lines.
    """

    def __init__(self, examples_path: str = EXAMPLES_PATH_DEFAULT, shots_k: int = 6,
                 prompt_shots: int = 3, max_new_tokens: int = 192,
                 debug_validate: bool = False):
        """Load the example pool and store the agent settings.

        Args:
            examples_path: JSON file with a list of examples, each having
                "scenario_context" and "target_action_sequence". A missing file
                leaves both pools empty.
            shots_k: Number of top-ranked examples returned by `_select_shots`.
            prompt_shots: How many of the retrieved examples are written into
                the prompt (previously hard-coded to 3).
            max_new_tokens: Generation budget passed to the LLM callable
                (previously hard-coded to 192 for both domains).
            debug_validate: When True, `solve` prints `validate_plan` warnings.
                It never changes the returned plan. (Previously a hard-coded
                local False, which made the validation branch unreachable.)
        """
        self.examples_path = examples_path
        self.shots_k = shots_k
        self.prompt_shots = prompt_shots
        self.max_new_tokens = max_new_tokens
        self.debug_validate = debug_validate

        self.system_objects = (
            "You are an expert planner in the OBJECTS domain. "
            "Output ONLY the final plan as parenthesized action lines. "
            "No explanations, no markdown, no extra text."
        )
        self.system_blocks = (
            "You are an expert planner in the BLOCKS domain. "
            "Output ONLY the final plan as parenthesized action lines. "
            "No explanations, no markdown, no extra text."
        )

        # Example pools, one per domain.
        self.blocks: List[Dict] = []
        self.objects: List[Dict] = []

        p = Path(self.examples_path)
        if p.exists():
            data = json.loads(p.read_text(encoding="utf-8"))
            for ex in data:
                dom = _domain_of(ex["scenario_context"])
                prob = _extract_last_unsolved_statement(ex["scenario_context"])
                # Tokens and goals are precomputed once so retrieval stays cheap.
                item = {
                    "problem": prob,
                    "target_action_sequence": ex["target_action_sequence"],
                    "tok": _tokenize(prob),
                    "L": len(ex["target_action_sequence"]),
                    "goals_blocks": _extract_block_goals(prob),
                    "goals_objects": _extract_object_goals(prob),
                }
                (self.blocks if dom=="blocks" else self.objects).append(item)

    def _select_shots(self, problem: str, domain: str) -> List[Dict]:
        """Return up to `shots_k` examples of `domain`, best match first.

        The ranking score is the token-overlap score plus a 0.08 bonus when the
        example has the same number of goal pairs as the query and shares at
        least one first element of a goal pair with it.
        """
        is_blocks = domain == "blocks"
        pool = self.blocks if is_blocks else self.objects
        if not pool:
            return []

        qtok = _tokenize(problem)
        qgoals = _extract_block_goals(problem) if is_blocks else _extract_object_goals(problem)
        goals_key = "goals_blocks" if is_blocks else "goals_objects"
        qx = {a for a, _ in qgoals}

        scored = []
        for ex in pool:
            score = _bag_overlap_score(qtok, ex["tok"])
            eg = ex[goals_key]
            if qgoals and eg and len(qgoals) == len(eg) and qx & {a for a, _ in eg}:
                score += 0.08
            scored.append((score, ex))

        # Stable sort: examples with equal scores keep their pool order.
        scored.sort(key=lambda x: x[0], reverse=True)
        return [ex for _, ex in scored[: self.shots_k]]

    # --------------------------
    # Prompts
    # --------------------------

    def _format_shots(self, shots: List[Dict], header: str) -> List[str]:
        """Render at most `prompt_shots` examples as prompt lines under `header`."""
        used = shots[: self.prompt_shots]
        if not used:
            return []
        lines = [header]
        for ex in used:
            lines.append("\nProblem:\n" + ex["problem"])
            lines.append("Plan:")
            lines.extend(ex["target_action_sequence"])
        lines.append("\nEND EXAMPLES.\n")
        return lines

    def _build_prompt_objects(self, problem: str, shots: List[Dict]) -> str:
        """Build the user prompt for the objects domain (rules, goal hint, examples, problem)."""
        goals = _extract_object_goals(problem)
        goal_hint = ""
        if goals:
            goal_hint = "GOAL CRAVINGS (must satisfy exactly): " + ", ".join([f"{a}->{b}" for a,b in goals]) + "\n"

        parts = []
        parts.append(
            "OUTPUT FORMAT (STRICT):\n"
            "- Output ONLY action lines.\n"
            "- One action per line.\n"
            "- Each line MUST start with '(' and end with ')'.\n"
            "- NEVER write the word 'from' inside parentheses.\n\n"
            "VALID ACTIONS:\n"
            "- (attack X)\n"
            "- (succumb X)\n"
            "- (feast X Y)\n"
            "- (overcome X Y)\n\n"
            "STRICT PLANNING RULES:\n"
            "1) Use ONLY the LAST [STATEMENT] initial conditions + goal.\n"
            "2) Only act on object letters that appear in the GOAL cravings.\n"
            "3) If GOAL is a single craving X craves Y, prefer EXACTLY 2 steps:\n"
            "   (attack X)\n"
            "   (overcome X Y)\n"
            "   unless Attack is impossible due to missing preconditions.\n"
            "4) Do NOT use feast/succumb if Attack is already possible.\n"
            "5) Never add extra actions after goal is satisfied.\n"
            "6) Overcome order: if you output (overcome X Y), you must have (attack X) earlier\n"
            "   unless Pain X is explicitly already true.\n\n"
            + goal_hint +
            "Now solve. Output plan lines only.\n"
        )

        parts.extend(self._format_shots(shots, "\nEXAMPLES (copy the style and minimality):"))

        parts.append("PROBLEM:\n" + problem)
        parts.append("\nPLAN:")
        return "\n".join(parts)

    def _build_prompt_blocks(self, problem: str, shots: List[Dict]) -> str:
        """Build the user prompt for the blocks domain (rules, goal hint, examples, problem)."""
        goals = _extract_block_goals(problem)
        goal_hint = ""
        if goals:
            goal_hint = "GOAL STACKS (must satisfy): " + ", ".join([f"{a} on {b}" for a,b in goals]) + "\n"

        parts = []
        parts.append(
            "OUTPUT FORMAT (STRICT):\n"
            "- Output ONLY action lines.\n"
            "- One action per line.\n"
            "- Each line MUST start with '(' and end with ')'.\n\n"
            "VALID ACTIONS:\n"
            "- (unmount_node X Y)\n"
            "- (mount_node X Y)\n"
            "- (engage_payload X)\n"
            "- (release_payload X)\n\n"
            "MECHANICS (STRICT):\n"
            "A) unmount_node X Y means X was on Y; after this you are HOLDING X.\n"
            "B) mount_node X Y requires you are holding X; after this hand becomes EMPTY.\n"
            "C) engage_payload X only for blocks ON THE TABLE; after this you are HOLDING X.\n"
            "D) release_payload X puts down held X; after this hand becomes EMPTY.\n\n"
            "STRICT OPTIMALITY RULES:\n"
            "1) If goal includes 'X on Y', the final stacking for that relation MUST be (mount_node X Y).\n"
            "2) If X is currently on Z and you need X on Y, the core move is:\n"
            "   (unmount_node X Z)\n"
            "   (mount_node X Y)\n"
            "   Do NOT insert release/engage between them.\n"
            "3) Use release/engage ONLY when you must switch which block you are holding.\n"
            "4) Never use engage_payload before an unmount_node.\n"
            "5) Build towers bottom-up.\n"
            "6) Use the SHORTEST valid plan.\n\n"
            + goal_hint +
            "Now solve. Output plan lines only.\n"
        )

        parts.extend(self._format_shots(shots, "\nEXAMPLES (copy the minimal patterns):"))

        parts.append("PROBLEM:\n" + problem)
        parts.append("\nPLAN:")
        return "\n".join(parts)

    # --------------------------
    # Solve
    # --------------------------

    def solve(self, scenario_context: str, llm_engine_func) -> list:
        """Produce a plan for the last unsolved statement of `scenario_context`.

        Args:
            scenario_context: Full scenario text; only the part from the last
                "[STATEMENT]" through the last "[PLAN]" is used as the problem.
            llm_engine_func: Callable accepting the keyword arguments prompt,
                system, temperature, do_sample, top_p, max_new_tokens,
                enable_thinking and stream, and returning the reply text.

        Returns:
            The reply split on newlines, stripped, with empty lines removed.
        """
        domain = _domain_of(scenario_context)
        problem = _extract_last_unsolved_statement(scenario_context)

        shots = self._select_shots(problem, domain)

        if domain == "blocks":
            system = self.system_blocks
            prompt = self._build_prompt_blocks(problem, shots)
        else:
            system = self.system_objects
            prompt = self._build_prompt_objects(problem, shots)

        # Deterministic (greedy) decoding settings.
        resp = llm_engine_func(
            prompt=prompt,
            system=system,
            temperature=0.0,
            do_sample=False,
            top_p=1.0,
            max_new_tokens=self.max_new_tokens,
            enable_thinking=False,
            stream=False
        )

        plan = [line.strip() for line in resp.split("\n") if line.strip()]

        # Debug-only (does not modify output)
        if self.debug_validate:
            issues = validate_plan(domain, plan)
            if issues:
                print("[VALIDATION WARNINGS]", issues[:12])

        return plan

# Dev_test

In [84]:
import json

# Development dataset: a JSON list of cases with 'assembly_task_id',
# 'complexity_level', 'scenario_context' and 'target_action_sequence'.
ARCHIVO_DESARROLLO = "Examples.json"

def main(n_casos=10):
    """Evaluate the agent on the first `n_casos` development cases and print the scores.

    Args:
        n_casos: Maximum number of cases to evaluate (capped at the dataset size).
    """
    print(f"Cargando dataset de desarrollo: {ARCHIVO_DESARROLLO}")
    # Explicit UTF-8, matching how AssemblyAgent reads its examples file.
    with open(ARCHIVO_DESARROLLO, 'r', encoding="utf-8") as f:
        casos = json.load(f)

    # NOTE(review): AssemblyAgent() loads its few-shot pool from its default path,
    # which is this same Examples.json, so every evaluated case is also a retrieval
    # candidate together with its reference plan. The average printed below is
    # therefore likely optimistic -- consider a held-out split.
    agente = AssemblyAgent()
    puntaje_total = 0.0
    casos_evaluados = max(0, min(n_casos, len(casos))) # Limit for quick test runs

    print("-" * 50)
    if casos_evaluados == 0:
        # Nothing to average; avoids dividing by zero below.
        print("No hay casos para evaluar.")
        return

    for caso in casos[:casos_evaluados]:
        print(f"Evaluando Tarea ID: {caso['assembly_task_id']} (Longitud optima: {caso['complexity_level']})")

        plan_generado = agente.solve(caso['scenario_context'], qwen)
        plan_optimo = caso['target_action_sequence']

        # Metric computation
        score = calcular_score_plan(plan_generado, plan_optimo)
        puntaje_total += score

        print(f"Plan Generado: {plan_generado}")
        print(f"Score obtenido: {score} / 10.0\n")

    promedio = puntaje_total / casos_evaluados
    print("-" * 50)
    print(f"Puntaje Promedio en Desarrollo: {round(promedio, 2)} / 10.0")

if __name__ == "__main__":
    main(10)

Cargando dataset de desarrollo: Examples.json
--------------------------------------------------
Evaluando Tarea ID: task_6a4ed4586d (Longitud optima: 4)
Plan Generado: ['(attack a)', '(overcome a c)', '(attack b)', '(overcome b a)']
Score obtenido: 10.0 / 10.0

Evaluando Tarea ID: task_b9a95877e2 (Longitud optima: 4)
Plan Generado: ['(feast a b)', '(succumb a)', '(feast b c)', '(overcome b a)']
Score obtenido: 10.0 / 10.0

Evaluando Tarea ID: task_0d236ad4c6 (Longitud optima: 6)
Plan Generado: ['(attack a)', '(overcome a c)', '(attack b)', '(overcome b c)']
Score obtenido: 0.0 / 10.0

Evaluando Tarea ID: task_372c431054 (Longitud optima: 6)
Plan Generado: ['(attack a)', '(overcome a b)', '(attack b)', '(overcome b c)']
Score obtenido: 0.0 / 10.0

Evaluando Tarea ID: task_79c0ae73f7 (Longitud optima: 2)
Plan Generado: ['(attack a)', '(overcome a c)']
Score obtenido: 10.0 / 10.0

Evaluando Tarea ID: task_5cc798432f (Longitud optima: 4)
Plan Generado: ['(attack c)', '(overcome c a)']
Sco

In [None]:
import json

# Input tasks to solve and the submission file that will be written.
ARCHIVO_EVALUACION = "Task.json"
ARCHIVO_SALIDA = "submission.json"

# NOTE: this cell defines `main` again; once it runs, it replaces the dev-test
# function of the same name defined in the previous cell.
def main():
    """Solve every task in ARCHIVO_EVALUACION and write the plans to ARCHIVO_SALIDA.

    Each output entry holds the task id and the generated plan. If any task
    raises, the error is printed and the function returns without writing the
    output file.
    """
    print(f"Iniciando ejecucion sobre: {ARCHIVO_EVALUACION}")
    # Explicit UTF-8 so reading does not depend on the platform default encoding.
    with open(ARCHIVO_EVALUACION, 'r', encoding="utf-8") as f:
        casos = json.load(f)

    agente = AssemblyAgent()
    resultados_entrega = []

    for i, caso in enumerate(casos):
        task_id = caso['assembly_task_id']
        print(f"Procesando caso {i+1}/{len(casos)} (ID: {task_id})...")

        try:
            plan_generado = agente.solve(caso['scenario_context'], qwen)

            resultados_entrega.append({
                "assembly_task_id": task_id,
                "target_action_sequence": plan_generado
            })

        except Exception as e:
            # Abort the whole run: a partial submission is deliberately not saved.
            print(f"ERROR critico en el caso {task_id}: {e}")
            print("Corrige tu codigo. https://www.youtube.com/watch?v=Y-U1calv6X8")
            return

    # Save the submission
    with open(ARCHIVO_SALIDA, 'w', encoding="utf-8") as f:
        json.dump(resultados_entrega, f, indent=4)

    print("-" * 50)
    print(f"Exito. Archivo '{ARCHIVO_SALIDA}' generado correctamente.")

if __name__ == "__main__":
    main()

Iniciando ejecucion sobre: Task.json
Procesando caso 1/50 (ID: task_f6c3f52f55)...
Procesando caso 2/50 (ID: task_07a18910c7)...
Procesando caso 3/50 (ID: task_cbe2649f6b)...
Procesando caso 4/50 (ID: task_4f181b1e7e)...
Procesando caso 5/50 (ID: task_9f39e7f413)...
Procesando caso 6/50 (ID: task_a51e02706c)...
Procesando caso 7/50 (ID: task_8a763f838b)...
Procesando caso 8/50 (ID: task_4d92dfa4d1)...
Procesando caso 9/50 (ID: task_19c005f4fa)...
Procesando caso 10/50 (ID: task_98c536f455)...
Procesando caso 11/50 (ID: task_b516b29d5b)...
