In [2]:
# main.py
from tree_of_thoughts import TreeOfThoughts
from inference import run_inference_pipeline
from CAMRE_EDU import combined_reasoning_score
import json
import os

# --------------------
# Helper conversion utilities (no try/except per user request)
# --------------------
def safe_get_text(item):
    """
    Flatten a loosely-typed pipeline value into a single display string.

    - str        -> returned unchanged
    - None       -> ""
    - dict       -> the first truthy value among the well-known text keys,
                    otherwise every string/list/tuple/dict value joined by spaces
    - list/tuple -> each element flattened recursively, joined by spaces
    - anything else -> str(item)
    """
    if isinstance(item, str):
        return item
    if item is None:
        return ""

    if isinstance(item, dict):
        preferred_keys = ("text", "input", "output", "content", "message", "prompt")
        for key in preferred_keys:
            value = item.get(key)
            if value:
                return safe_get_text(value)
        # No preferred key carried anything: concatenate every text-like value.
        flattened = (
            safe_get_text(value)
            for value in item.values()
            if isinstance(value, (str, list, tuple, dict))
        )
        return " ".join(piece for piece in flattened if piece)

    if isinstance(item, (list, tuple)):
        return " ".join(filter(None, map(safe_get_text, item)))

    return str(item)


def ensure_scores(d):
    """
    Return a new dict holding only the numeric entries of ``d``.

    Anything that is not a dict (including None) yields ``{}``.
    Booleans are dropped even though ``bool`` is a subclass of ``int``:
    a flag such as ``{"passed": True}`` is not a score and would otherwise
    be counted as 1 when scores are averaged or ranked.
    """
    if not isinstance(d, dict):
        return {}
    return {
        k: v
        for k, v in d.items()
        if isinstance(v, (int, float)) and not isinstance(v, bool)
    }


def pipeline_to_candidates(pipe_out):
    """
    Convert pipeline output into a canonical list of candidate dicts, each
    shaped as ``{"text", "role", "scores", "metadata"}``.

    Accepted shapes of ``pipe_out``:
    - str: one candidate with role "raw"
    - list/tuple: one "raw" candidate per element
    - dict: the known fields (final_subtasks, suggestions, outputs, thoughts,
      intermediate_steps) are ingested first, then every other
      str/list/tuple/dict field with role ``"field:<key>"``; if nothing was
      produced, a single "fallback" candidate is attempted
    - anything else: no candidates

    Candidates whose text is blank are skipped. When an item is a dict, its
    metadata is a shallow *copy* of that dict, so callers that later write
    into ``candidate["metadata"]`` do not modify the pipeline output itself.
    """
    candidates = []

    def add(text, role, scores, meta):
        # A candidate without text cannot be displayed or expanded further.
        if text.strip():
            candidates.append({"text": text, "role": role, "scores": scores, "metadata": meta})

    if not isinstance(pipe_out, dict):
        if isinstance(pipe_out, str):
            add(pipe_out, "raw", {}, {"orig": pipe_out})
        elif isinstance(pipe_out, (list, tuple)):
            for it in pipe_out:
                add(safe_get_text(it), "raw", {}, {"orig": it})
        return candidates

    def ingest_item(item, role_hint):
        if isinstance(item, dict):
            # Copy so the candidate's metadata is not the pipeline's own dict.
            add(safe_get_text(item), role_hint, ensure_scores(item.get("scores")), dict(item))
        else:
            add(safe_get_text(item), role_hint, {}, {"orig": item})

    def ingest_field(field_value, role_hint):
        if field_value is None:
            return
        if isinstance(field_value, (list, tuple)):
            for item in field_value:
                ingest_item(item, role_hint)
        else:
            ingest_item(field_value, role_hint)

    ingest_field(pipe_out.get("final_subtasks"), "final_subtask")
    ingest_field(pipe_out.get("suggestions"), "suggestion")
    ingest_field(pipe_out.get("outputs"), "output")
    ingest_field(pipe_out.get("thoughts"), "thought")
    ingest_field(pipe_out.get("intermediate_steps"), "thought")

    # Permissive pass over every remaining text-like top-level field.
    known_keys = {"final_subtasks", "suggestions", "outputs", "thoughts", "intermediate_steps"}
    for k, v in pipe_out.items():
        if k in known_keys:
            continue
        if isinstance(v, (str, list, tuple, dict)):
            ingest_field(v, role_hint=f"field:{k}")

    if not candidates:
        fallback_text = safe_get_text(pipe_out.get("text") or pipe_out.get("content") or pipe_out)
        if fallback_text:
            candidates.append({"text": fallback_text, "role": "fallback", "scores": {}, "metadata": {"orig": pipe_out}})

    return candidates


def expand_fn_using_pipeline(prompt_text, metadata=None):
    """
    Expansion function for the tree: run the inference pipeline on
    ``prompt_text`` (with ``auto_extend=False``) and return the normalized
    candidate list, each carrying a ``scores["combined_score"]``.

    Score resolution order per candidate:
    1. an existing numeric ``combined_score`` is kept;
    2. otherwise a number derived from ``combined_reasoning_score`` (either a
       bare number, or the first numeric value among the known keys);
    3. otherwise the average of the numeric values in ``metadata["scores"]``;
    4. otherwise 0.0.

    The raw reasoning result is stored under
    ``metadata["reasoning_snapshot"]``. ``metadata`` (the parameter) is
    accepted for interface compatibility and is not used.
    """
    out = run_inference_pipeline(prompt_text, auto_extend=False)
    cands = pipeline_to_candidates(out)

    reasoning = combined_reasoning_score(out, prompt_text)

    # Reduce the reasoning result to one number, once, for all candidates.
    shared_score = None
    if isinstance(reasoning, (int, float)):
        # The scorer may hand back a bare number rather than a dict.
        shared_score = float(reasoning)
    elif isinstance(reasoning, dict):
        # The original keys keep their priority; "reasoning_score" and
        # "combined_score" are the additional keys the later revisions of
        # this script read from the same scorer.
        for key in ("combined", "confidence", "score", "overall", "reasoning_score", "combined_score"):
            val = reasoning.get(key)
            if isinstance(val, (int, float)):
                shared_score = float(val)
                break

    for c in cands:
        scores = c.setdefault("scores", {})
        # Prefer an explicit numeric combined_score if present.
        if not isinstance(scores.get("combined_score"), (int, float)):
            if shared_score is not None:
                scores["combined_score"] = shared_score
        # Fall back to the average of the item's own numeric scores.
        if "combined_score" not in scores:
            meta = c.get("metadata", {})
            if isinstance(meta, dict) and isinstance(meta.get("scores"), dict):
                numeric_vals = [v for v in meta["scores"].values() if isinstance(v, (int, float))]
                if numeric_vals:
                    scores["combined_score"] = float(sum(numeric_vals) / len(numeric_vals))
        scores.setdefault("combined_score", 0.0)
        # Attach the reasoning snapshot to the candidate's metadata.
        c.setdefault("metadata", {})
        c["metadata"]["reasoning_snapshot"] = reasoning

    return cands


# --------------------
# Main integration that builds the TreeOfThoughts
# --------------------
def main(example_prompt="how can i learn french?", beam_size=2, top_k=5,
         outpath="tree_of_thoughts_example.json"):
    """
    Build a TreeOfThoughts for ``example_prompt``, expand it once, print a
    summary and save the tree as JSON.

    Parameters (all optional, defaults reproduce the original demo):
    - example_prompt: text of the root node and the pipeline prompt
    - beam_size: ``beam`` passed to ``tree.expand_with`` for each child
    - top_k: how many nodes to list in the "top nodes" printout
    - outpath: file the tree is saved to
    """
    # Create tree and root
    tree = TreeOfThoughts()
    root = tree.add_node(text=example_prompt, parent_id=None, role="root", metadata={"source": "user_prompt"})
    tree.set_root(root.id)

    # Run pipeline once with auto_extend True to get full initial output
    out = run_inference_pipeline(example_prompt, auto_extend=True)

    # Convert pipeline output into initial candidate nodes and attach as children
    for cand in pipeline_to_candidates(out):
        tree.add_node(
            text=cand["text"],
            parent_id=root.id,
            role=cand.get("role"),
            scores=cand.get("scores", {}),
            metadata=cand.get("metadata", {})
        )

    # Attach combined reasoning diagnostic to root metadata
    reasoning_result = combined_reasoning_score(out, example_prompt)
    tree.nodes[root.id].metadata["reasoning_diagnostic"] = reasoning_result

    # Also attach same diagnostic snapshot to top-level children metadata
    for child_id in tree.nodes[root.id].children:
        tree.nodes[child_id].metadata.setdefault("root_reasoning", reasoning_result)

    # Expand each top-level child once; iterate over a copy because
    # expansion is expected to add nodes to the tree.
    for child_id in list(tree.nodes[root.id].children):
        tree.expand_with(child_id, expand_fn=expand_fn_using_pipeline, beam=beam_size)

    # Example: print top nodes by combined_score
    top_nodes = tree.top_k_by_score("combined_score", k=top_k)
    print("\nTop nodes by combined_score:")
    for n in top_nodes:
        print(f"- id={n.id[:8]} score={n.scores.get('combined_score')} role={n.role} text={n.text[:200]}")

    # Example: compute best path by summed combined_score
    path_nodes, path_score = tree.best_path("combined_score", max_depth=6)
    print("\nBest path (by summed combined_score):", path_score)
    for n in path_nodes:
        print(f" * [{n.role}] {n.text[:200]} (score={n.scores.get('combined_score')})")

    # Save tree to disk
    tree.save_json(outpath)
    print(f"\nSaved tree to {outpath}")

    # Print formatted final subtasks.
    # The pipeline output is not guaranteed to be a dict (pipeline_to_candidates
    # accepts str/list too), and "final_subtasks" may be missing, None, or a
    # single value rather than a list; normalize before iterating so a string
    # is not enumerated character by character.
    print("\n=== FORMATTED FINAL SUBTASKS ===")
    final_subtasks = out.get("final_subtasks") if isinstance(out, dict) else None
    if final_subtasks is None:
        final_subtasks = []
    elif not isinstance(final_subtasks, (list, tuple)):
        final_subtasks = [final_subtasks]
    for i, s in enumerate(final_subtasks, start=1):
        # Prefer the "input" field of a dict subtask; fall back to the generic
        # extractor so dicts without "input" do not print as blank lines.
        text = safe_get_text(s.get("input")) if isinstance(s, dict) else ""
        if not text:
            text = safe_get_text(s)
        print(f"{i}. {text}")

    print("\n=== REASONING DIAGNOSTIC ===")
    print(json.dumps(reasoning_result, indent=2))

# Entry point: run the demo only when this module is the main program,
# not when it is imported.
if __name__ == "__main__":
    main()


Subtasks:
 1. Find online resources for learning French.
 2. Identify language learning apps suitable for beginners.
 3. Enroll in a French language course or find a tutor.
 4. Watch French TV shows and movies with English subtitles.
 5. Practice speaking with a native speaker or language exchange partner.
 6. Learn basic French phrases and vocabulary using flashcards or a dictionary.
 7. Immerse yourself in the French language by listening to French music and podcasts.
[DEBUG] run_decomposer returned type=<class 'list'>, preview=['Find online resources for learning French.', 'Identify language learning apps suitable for beginners.', 'Enroll in a French language course or find a tutor.', 'Watch French TV shows and movies with English subtitles.', 'Practice speaking with a native speaker or language exchange partner.', 'Learn
Subtasks:
 1. Open a search engine website (e.g., Google, Bing).
 2. Enter keywords "learn French" or similar phrases into the search bar.
 3. Filter results by ca

In [5]:
# main.py
# Builds on your existing pipeline, ensures `final_subtasks` are produced by running the decomposer when needed,
# normalizes mixed types (str/dict/list), prints formatted subtasks, builds a TreeOfThoughts, attaches reasoning scores,
# expands children once, and saves the tree.
#
# Requirements:
# - tree_of_thoughts.py (exists in same folder)
# - inference.run_inference_pipeline(prompt, auto_extend=...) available
# - inference.run_decomposer(prompt) available (your debug logs show this function)
# - CAMRE_EDU.combined_reasoning_score available
# - No try/except per user request

from tree_of_thoughts import TreeOfThoughts
from inference import run_inference_pipeline, run_decomposer
from CAMRE_EDU import combined_reasoning_score
import json

# ---------- Helpers ----------
def safe_get_text(item):
    """
    Return readable, stripped text from str/dict/list/tuple, else ``str(item)``.

    - None -> ""
    - str -> stripped
    - list/tuple -> non-empty element texts joined by single spaces
    - dict -> text of the first preferred key that yields *non-empty* text;
      if none does, all string/list/tuple/dict values joined by spaces

    A preferred key holding an empty or whitespace-only value no longer
    shadows the remaining keys (e.g. ``{"text": "", "input": "do X"}``
    yields ``"do X"`` rather than ``""``).
    """
    if item is None:
        return ""
    if isinstance(item, str):
        return item.strip()
    if isinstance(item, (list, tuple)):
        parts = [safe_get_text(x) for x in item]
        return " ".join([p for p in parts if p])
    if isinstance(item, dict):
        # Prefer common keys, but keep looking when one turns out to be blank.
        for k in ("text", "input", "output", "content", "message", "prompt", "title"):
            if item.get(k) is not None:
                text = safe_get_text(item[k])
                if text:
                    return text
        parts = [
            safe_get_text(v)
            for v in item.values()
            if isinstance(v, (str, list, tuple, dict))
        ]
        return " ".join([p for p in parts if p])
    return str(item)

def numeric_scores_from_reasoning(reasoning):
    """
    Collapse a reasoning result into one float, or None when impossible.

    A bare int/float is converted directly. For a dict, the first key (in
    priority order) whose value is an int/float wins. Every other input,
    including None, gives None.
    """
    numeric = (int, float)
    if isinstance(reasoning, numeric):
        return float(reasoning)
    if not isinstance(reasoning, dict):
        return None

    priority = ("reasoning_score", "combined", "combined_score", "confidence", "score", "overall")
    matches = (
        reasoning[name]
        for name in priority
        if name in reasoning and isinstance(reasoning[name], numeric)
    )
    first = next(matches, None)
    return None if first is None else float(first)

def ensure_numeric_scores(d):
    """
    Return a new dict with only the numeric (int/float) entries of ``d``.

    Non-dict input yields ``{}``. Booleans are excluded: ``bool`` subclasses
    ``int``, so a flag like ``{"verified": True}`` would otherwise be
    treated as a score of 1.
    """
    if not isinstance(d, dict):
        return {}
    return {
        k: v
        for k, v in d.items()
        if isinstance(v, (int, float)) and not isinstance(v, bool)
    }

def pipeline_to_candidates(pipe_out):
    """
    Normalize pipeline output into a list of canonical candidate dicts:
      {"text","role","scores","metadata"}
    Accepts mixed types (str/dict/list); None or any other type yields [].

    For a dict, the known fields (final_subtasks, suggestions, outputs,
    thoughts, intermediate_steps) are ingested first, then every other
    str/list/tuple/dict field with role "field:<key>". If that produces
    nothing, a single "fallback" candidate is attempted.

    Candidates with blank text are skipped, so a caller's "no candidates"
    fallback is not defeated by empty entries. Dict items are shallow-copied
    into ``metadata`` so that later writes to a candidate's metadata do not
    modify the pipeline output.
    """
    candidates = []
    if pipe_out is None:
        return candidates

    def add(text, role, scores, meta):
        if text.strip():
            candidates.append({"text": text, "role": role, "scores": scores, "metadata": meta})

    if not isinstance(pipe_out, dict):
        if isinstance(pipe_out, str):
            add(pipe_out, "raw", {}, {"orig": pipe_out})
        elif isinstance(pipe_out, (list, tuple)):
            for it in pipe_out:
                add(safe_get_text(it), "raw", {}, {"orig": it})
        return candidates

    def ingest_item(item, role_hint):
        if isinstance(item, dict):
            # Copy: the candidate must not share its metadata dict with the pipeline output.
            add(safe_get_text(item), role_hint, ensure_numeric_scores(item.get("scores")), dict(item))
        else:
            add(safe_get_text(item), role_hint, {}, {"orig": item})

    def ingest(field_value, role_hint):
        if field_value is None:
            return
        if isinstance(field_value, (list, tuple)):
            for item in field_value:
                ingest_item(item, role_hint)
        else:
            ingest_item(field_value, role_hint)

    ingest(pipe_out.get("final_subtasks"), "final_subtask")
    ingest(pipe_out.get("suggestions"), "suggestion")
    ingest(pipe_out.get("outputs"), "output")
    ingest(pipe_out.get("thoughts"), "thought")
    ingest(pipe_out.get("intermediate_steps"), "thought")

    known = {"final_subtasks", "suggestions", "outputs", "thoughts", "intermediate_steps"}
    for k, v in pipe_out.items():
        if k in known:
            continue
        if isinstance(v, (str, list, tuple, dict)):
            ingest(v, role_hint=f"field:{k}")

    if not candidates:
        fallback = safe_get_text(pipe_out.get("text") or pipe_out.get("content") or pipe_out)
        if fallback:
            candidates.append({"text": fallback, "role": "fallback", "scores": {}, "metadata": {"orig": pipe_out}})

    return candidates

def ensure_final_subtasks(out, prompt):
    """
    Guarantee a list of final subtasks, each item being a dict or string.

    If ``out["final_subtasks"]`` is a list/tuple with at least one item that
    has text, it is returned as-is; a non-blank string is wrapped in a list.
    Otherwise ``run_decomposer(prompt)`` is called and its result normalized:
    - dict: the first list/tuple found under "final_subtasks", "subtasks",
      "steps" or "decomposition"; else the dict itself as a single subtask
    - list/tuple: returned as-is
    - anything else: wrapped in a list, unless it carries no text (None,
      blank string), in which case an empty list is returned so callers do
      not print or enqueue an empty subtask.
    """
    if isinstance(out, dict) and "final_subtasks" in out:
        fs = out["final_subtasks"]
        # if it's a non-empty list with useful text, return as-is
        if isinstance(fs, (list, tuple)) and any(safe_get_text(x) for x in fs):
            return fs
        if isinstance(fs, str) and fs.strip():
            return [fs]
    # Fallback: run the decomposer explicitly to get subtasks
    decomposed = run_decomposer(prompt)
    if isinstance(decomposed, dict):
        for key in ("final_subtasks", "subtasks", "steps", "decomposition"):
            if key in decomposed and isinstance(decomposed[key], (list, tuple)):
                return decomposed[key]
        # if dict itself represents a single subtask
        return [decomposed]
    if isinstance(decomposed, (list, tuple)):
        return decomposed
    # Scalar result: keep it only when it actually contains text.
    return [decomposed] if safe_get_text(decomposed) else []

def expand_fn_using_pipeline(prompt_text, metadata=None):
    """
    Tree expansion callback: run the pipeline on ``prompt_text`` without
    auto-extension and return its candidates, each guaranteed to carry a
    ``combined_score`` and a ``reasoning_snapshot`` in its metadata.

    A candidate's own ``combined_score`` is kept; otherwise the number
    derived from the reasoning result is used; otherwise the mean of the
    numeric values in ``metadata["scores"]``; otherwise 0.0.
    """
    pipeline_output = run_inference_pipeline(prompt_text, auto_extend=False)
    candidates = pipeline_to_candidates(pipeline_output)
    snapshot = combined_reasoning_score(pipeline_output, prompt_text)
    shared_score = numeric_scores_from_reasoning(snapshot)

    for candidate in candidates:
        score_map = candidate.setdefault("scores", {})
        if "combined_score" not in score_map:
            if shared_score is not None:
                score_map["combined_score"] = shared_score
            else:
                # No usable reasoning number: average the item's own scores.
                meta = candidate.get("metadata", {})
                nested = meta.get("scores") if isinstance(meta, dict) else None
                if isinstance(nested, dict):
                    numbers = [v for v in nested.values() if isinstance(v, (int, float))]
                    if numbers:
                        score_map["combined_score"] = float(sum(numbers) / len(numbers))
            score_map.setdefault("combined_score", 0.0)
        candidate.setdefault("metadata", {})["reasoning_snapshot"] = snapshot

    return candidates

# ---------- Main ----------
def main(example_prompt="how can i learn french?", beam_size=2, top_k=6,
         outpath="tree_of_thoughts_example.json"):
    """
    Run the pipeline for ``example_prompt``, print the final subtasks, build
    and expand a TreeOfThoughts, print a summary and save the tree as JSON.

    Parameters (all optional; the defaults reproduce the original demo):
    - example_prompt: root node text and pipeline prompt
    - beam_size: ``beam`` passed to ``tree.expand_with`` for each child
    - top_k: number of nodes listed in the "top nodes" printout
    - outpath: file the tree is saved to
    """
    # 1) Run pipeline with auto_extend True to let it produce its outputs
    out = run_inference_pipeline(example_prompt, auto_extend=True)

    # 2) Ensure we have well-formed final_subtasks (use decomposer if necessary)
    normalized_final_subtasks = ensure_final_subtasks(out, example_prompt)

    # 3) Print FORMATTED FINAL SUBTASKS (robust)
    print("\n=== FORMATTED FINAL SUBTASKS ===")
    if isinstance(normalized_final_subtasks, (list, tuple)):
        for i, s in enumerate(normalized_final_subtasks, start=1):
            print(f"{i}. {safe_get_text(s)}")
    else:
        print("1.", safe_get_text(normalized_final_subtasks))

    # 4) Build a TreeOfThoughts with the prompt as root
    tree = TreeOfThoughts()
    root = tree.add_node(text=example_prompt, parent_id=None, role="root", metadata={"source": "user_prompt"})
    tree.set_root(root.id)

    # 5) Convert pipeline output into candidates and attach to root
    initial_candidates = pipeline_to_candidates(out)
    # If pipeline didn't produce candidates, create from normalized_final_subtasks
    if not initial_candidates:
        for s in normalized_final_subtasks:
            initial_candidates.append({"text": safe_get_text(s), "role": "final_subtask", "scores": {}, "metadata": {"orig": s}})

    # 6) Compute reasoning diagnostic and attach numeric combined score to nodes
    reasoning_result = combined_reasoning_score(out, example_prompt)
    root_combined = numeric_scores_from_reasoning(reasoning_result)
    tree.nodes[root.id].metadata["reasoning_diagnostic"] = reasoning_result

    for cand in initial_candidates:
        scores = cand.get("scores", {})
        if "combined_score" not in scores:
            # Children without their own score inherit the root-level one.
            scores["combined_score"] = root_combined if root_combined is not None else 0.0
        tree.add_node(
            text=cand.get("text", ""),
            parent_id=root.id,
            role=cand.get("role", "final_subtask"),
            scores=scores,
            metadata=cand.get("metadata", {})
        )

    # 7) Expand each top-level child once; iterate over a copy because
    #    expansion is expected to add nodes to the tree.
    for child_id in list(tree.nodes[root.id].children):
        tree.expand_with(child_id, expand_fn=expand_fn_using_pipeline, beam=beam_size)

    # 8) Print top nodes by combined_score
    top_nodes = tree.top_k_by_score("combined_score", k=top_k)
    print("\nTop nodes by combined_score:")
    for n in top_nodes:
        print(f"- id={n.id[:8]} score={n.scores.get('combined_score')} role={n.role} text={n.text[:240]}")

    # 9) Print best path by summed combined_score
    path_nodes, path_score = tree.best_path("combined_score", max_depth=6)
    print("\nBest path (by summed combined_score):", path_score)
    for n in path_nodes:
        print(f" * [{n.role}] {n.text[:240]} (score={n.scores.get('combined_score')})")

    # 10) Save tree
    tree.save_json(outpath)
    print(f"\nSaved tree to {outpath}")

    # 11) Print full reasoning diagnostic JSON
    print("\n=== REASONING DIAGNOSTIC ===")
    print(json.dumps(reasoning_result, indent=2))

# Entry point: run the demo only when this module is the main program,
# not when it is imported.
if __name__ == "__main__":
    main()


Subtasks:
 1. Research and find reliable French language learning resources online.
 2. Determine the level of proficiency desired (beginner, intermediate, advanced).
 3. Choose a learning method (language learning app, textbook, tutor, self-study materials).
 4. Set up a schedule for regular practice sessions.
 5. Enroll in a French course or find a language exchange partner.
[DEBUG] run_decomposer returned type=<class 'list'>, preview=['Research and find reliable French language learning resources online.', 'Determine the level of proficiency desired (beginner, intermediate, advanced).', 'Choose a learning method (language learning app, textbook, tutor, self-study materials).', 'Set up a schedule for regular practice sessions.', 
Subtasks:
 1. Research French language learning resources online.
 2. Download or subscribe to a French language learning app.
 3. Find a local French language exchange partner or tutor.
 4. Start with basic French grammar rules and vocabulary.
 5. Practice 

In [None]:
# main.py
import contextlib
import io
import json
from typing import Any, List, Tuple

from CAMRE_EDU import combined_reasoning_score
from inference import run_inference_pipeline, run_decomposer
from tree_of_thoughts import TreeOfThoughts

# ---------- Utilities ----------
def safe_get_text(item: Any) -> str:
    """
    Return readable text from str/dict/list/tuple with whitespace collapsed.

    - None -> ""
    - str -> runs of whitespace collapsed to single spaces, ends trimmed
    - list/tuple -> non-empty element texts joined by single spaces
    - dict -> text of the first preferred key that yields *non-empty* text;
      if none does, all string/list/tuple/dict values joined by spaces
    - anything else -> ``str(item)``

    A preferred key whose value is empty or whitespace-only does not shadow
    the remaining keys (``{"text": " ", "step": "Do it"}`` gives "Do it").
    """
    if item is None:
        return ""
    if isinstance(item, str):
        return " ".join(item.split())
    if isinstance(item, (list, tuple)):
        parts = [safe_get_text(x) for x in item]
        return " ".join([p for p in parts if p])
    if isinstance(item, dict):
        # Prefer keys likely to hold user-visible text, skipping blank ones.
        for k in ("text", "input", "output", "content", "message", "prompt", "title", "step"):
            if item.get(k) is not None:
                text = safe_get_text(item[k])
                if text:
                    return text
        # Otherwise join every string-like value.
        parts = [
            safe_get_text(v)
            for v in item.values()
            if isinstance(v, (str, list, tuple, dict))
        ]
        return " ".join([p for p in parts if p])
    return str(item)

def is_debug_or_control_text(text: str) -> bool:
    """
    Heuristically decide whether ``text`` is debug/control noise rather than
    substantive content. Returns True (i.e. "filter it out") when the value:

    - is not a string,
    - has fewer than 4 characters after stripping (this covers empty text),
    - starts, case-insensitively, with a known internal-note prefix,
    - contains "forwarding inference" or "subtasks sufficient",
    - has no alphanumeric character at all, or
    - consists only of digits and whitespace.
    """
    if not isinstance(text, str):
        return True

    stripped = text.strip()
    if len(stripped) < 4:
        return True

    lowered = stripped.lower()
    noise_prefixes = ("subtasks", "debug", "warning", "verifier", "fallback", "note:", "error", "pipeline")
    if lowered.startswith(noise_prefixes):
        return True

    # One-off control sentences observed in pipeline output.
    control_phrases = ("forwarding inference", "subtasks sufficient")
    if any(phrase in lowered for phrase in control_phrases):
        return True

    # Punctuation/whitespace only.
    if not any(ch.isalnum() for ch in lowered):
        return True

    # Digits (and spaces) only.
    return all(ch.isdigit() or ch.isspace() for ch in lowered)

def numeric_scores_from_reasoning(reasoning: Any):
    """
    Reduce a reasoning result to a single float.

    For a dict, return the first int/float value found under the known score
    keys (checked in priority order). For a bare int/float, return it as a
    float. Return None for None and for every other input.
    """
    if isinstance(reasoning, dict):
        for name in ("reasoning_score", "combined", "combined_score", "confidence", "score", "overall"):
            candidate = reasoning.get(name)
            if isinstance(candidate, (int, float)):
                return float(candidate)
        return None
    return float(reasoning) if isinstance(reasoning, (int, float)) else None

def ensure_numeric_scores(d: Any):
    """
    Return a new dict containing only the numeric (int/float) entries of ``d``.

    Non-dict input yields ``{}``. Booleans are excluded even though ``bool``
    subclasses ``int``, so flags such as ``{"verified": True}`` are not
    mistaken for a score of 1.
    """
    if not isinstance(d, dict):
        return {}
    return {
        k: v
        for k, v in d.items()
        if isinstance(v, (int, float)) and not isinstance(v, bool)
    }

def pipeline_to_candidates(pipe_out: Any) -> List[dict]:
    """
    Normalize pipeline output into a list of canonical candidate dicts:
    {"text","role","scores","metadata"}.

    Every candidate text is passed through ``is_debug_or_control_text`` and
    dropped when it looks like debug/control noise. None, and any type other
    than str/list/tuple/dict, yields an empty list.

    For dict items the metadata is a shallow *copy* of the item, so later
    writes into a candidate's metadata do not modify the pipeline output.
    """
    candidates: List[dict] = []
    if pipe_out is None:
        return candidates

    if not isinstance(pipe_out, dict):
        if isinstance(pipe_out, str):
            raw_items = [pipe_out]
        elif isinstance(pipe_out, (list, tuple)):
            raw_items = pipe_out
        else:
            raw_items = []
        for it in raw_items:
            text = safe_get_text(it)
            if not is_debug_or_control_text(text):
                candidates.append({"text": text, "role": "raw", "scores": {}, "metadata": {"orig": it}})
        return candidates

    def ingest_item(item: Any, role_hint: str):
        text = safe_get_text(item)
        if is_debug_or_control_text(text):
            return
        if isinstance(item, dict):
            # Copy: do not share the metadata dict with the pipeline output.
            meta = dict(item)
            scores = ensure_numeric_scores(item.get("scores"))
        else:
            meta = {"orig": item}
            scores = {}
        candidates.append({"text": text, "role": role_hint, "scores": scores, "metadata": meta})

    # Helper to ingest a field (single value or list) while skipping debug-like entries
    def ingest(field_value: Any, role_hint: str):
        if field_value is None:
            return
        if isinstance(field_value, (list, tuple)):
            for item in field_value:
                ingest_item(item, role_hint)
        else:
            ingest_item(field_value, role_hint)

    # ingest canonical fields (prioritized)
    ingest(pipe_out.get("final_subtasks"), "final_subtask")
    ingest(pipe_out.get("suggestions"), "suggestion")
    ingest(pipe_out.get("outputs"), "output")
    ingest(pipe_out.get("thoughts"), "thought")
    ingest(pipe_out.get("intermediate_steps"), "thought")

    # permissive ingestion of other top-level fields
    known = {"final_subtasks", "suggestions", "outputs", "thoughts", "intermediate_steps"}
    for k, v in pipe_out.items():
        if k in known:
            continue
        # Fields that are likely debug-only by name are added only when the
        # field as a whole looks substantive. str(k) keeps non-string keys
        # from raising on .lower().
        if str(k).lower() in ("message", "debug", "warning", "verifier", "log"):
            text = safe_get_text(v)
            if not is_debug_or_control_text(text):
                ingest(v, role_hint=f"field:{k}")
            continue
        if isinstance(v, (str, list, tuple, dict)):
            ingest(v, role_hint=f"field:{k}")

    # fallback to top-level text/content only if no candidates were found
    if not candidates:
        fallback = safe_get_text(pipe_out.get("text") or pipe_out.get("content") or pipe_out)
        if fallback and not is_debug_or_control_text(fallback):
            candidates.append({"text": fallback, "role": "fallback", "scores": {}, "metadata": {"orig": pipe_out}})

    return candidates

def ensure_final_subtasks(out: Any, prompt: str) -> Tuple[List[Any], str]:
    """
    Guarantee a list of substantive final subtasks.

    Returns ``(final_subtasks_list, decomposer_debug_output)``:
    - If ``out["final_subtasks"]`` is a list/tuple with at least one
      non-debug item, the non-debug items are returned with ``""`` as debug
      text; a substantive string is wrapped in a one-element list.
    - Otherwise ``run_decomposer(prompt)`` is called with stdout captured.
      Its result (dict with a known list key, list/tuple, or single value)
      is filtered through ``is_debug_or_control_text`` and returned together
      with whatever the decomposer printed.
    """
    def substantive(items):
        # Keep only entries whose text does not look like debug/control noise.
        return [x for x in items if not is_debug_or_control_text(safe_get_text(x))]

    if isinstance(out, dict) and "final_subtasks" in out:
        fs = out["final_subtasks"]
        if isinstance(fs, (list, tuple)):
            kept = substantive(fs)
            if kept:
                return kept, ""
        elif isinstance(fs, str) and fs.strip() and not is_debug_or_control_text(fs):
            return [fs], ""

    # Fall back to running the decomposer, capturing its printed debug output.
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        decomposed = run_decomposer(prompt)
    debug = buf.getvalue()

    if isinstance(decomposed, dict):
        for key in ("final_subtasks", "subtasks", "steps", "decomposition"):
            if key in decomposed and isinstance(decomposed[key], (list, tuple)):
                return substantive(decomposed[key]), debug
        # The dict itself counts as a single item unless it is clearly debug.
        if not is_debug_or_control_text(safe_get_text(decomposed)):
            return [decomposed], debug
        return [], debug

    if isinstance(decomposed, (list, tuple)):
        return substantive(decomposed), debug

    # Single string or other value.
    text = safe_get_text(decomposed)
    if text and not is_debug_or_control_text(text):
        return [decomposed], debug
    return [], debug

def expand_fn_using_pipeline(prompt_text: str, metadata=None):
    """
    Expand a node: run the pipeline on ``prompt_text`` (auto_extend=False)
    with its stdout captured, and return ``(candidates, debug_text)``.

    Each candidate ends up with ``scores["combined_score"]`` (its own value
    if present, else the number derived from the reasoning result, else the
    mean of numeric ``metadata["scores"]`` values, else 0.0) and
    ``metadata["reasoning_snapshot"]``.

    NOTE(review): this returns a tuple rather than a bare candidate list;
    confirm that the consumer of this callback expects that shape.
    """
    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        pipeline_output = run_inference_pipeline(prompt_text, auto_extend=False)
    debug_text = captured.getvalue()

    candidates = pipeline_to_candidates(pipeline_output)
    snapshot = combined_reasoning_score(pipeline_output, prompt_text)
    shared = numeric_scores_from_reasoning(snapshot)

    def own_average(candidate):
        # Mean of the numeric values under metadata["scores"], or None.
        meta = candidate.get("metadata", {})
        if not isinstance(meta, dict):
            return None
        nested = meta.get("scores")
        if not isinstance(nested, dict):
            return None
        numbers = [v for v in nested.values() if isinstance(v, (int, float))]
        return float(sum(numbers) / len(numbers)) if numbers else None

    for candidate in candidates:
        scores = candidate.setdefault("scores", {})
        if "combined_score" not in scores:
            value = shared if shared is not None else own_average(candidate)
            scores["combined_score"] = 0.0 if value is None else value
        candidate.setdefault("metadata", {})["reasoning_snapshot"] = snapshot

    return candidates, debug_text

# ---------- Main ----------
def main():
    """
    End-to-end demo for a fixed prompt.

    Runs the inference pipeline (stdout captured), derives final subtasks, builds a
    TreeOfThoughts rooted at the prompt, expands each first-level child once keeping
    the best ``beam`` candidates, prints the top nodes and the best path, then saves
    the tree as JSON and writes the captured debug output to ``pipeline_debug.log``.
    """
    user_prompt = "how can i learn french?"

    # 1) Pipeline run with auto_extend enabled; anything it prints is captured.
    pipeline_stdout = io.StringIO()
    with contextlib.redirect_stdout(pipeline_stdout):
        pipeline_out = run_inference_pipeline(user_prompt, auto_extend=True)
    pipeline_debug = pipeline_stdout.getvalue()

    # 2) Final subtasks (plus whatever debug text that step captured).
    final_subtasks, decomposer_debug = ensure_final_subtasks(pipeline_out, user_prompt)

    # 3) Numbered listing of the final subtasks.
    print("\n=== FORMATTED FINAL SUBTASKS ===")
    if not final_subtasks:
        print("No substantive final_subtasks produced.")
    else:
        for position, subtask in enumerate(final_subtasks, start=1):
            print(f"{position}. {safe_get_text(subtask)}")

    # 4) Tree with the prompt as root; the reasoning diagnostic is stored on the root.
    tree = TreeOfThoughts()
    root = tree.add_node(
        text=user_prompt,
        parent_id=None,
        role="root",
        metadata={"source": "user_prompt"},
    )
    tree.set_root(root.id)

    reasoning_result = combined_reasoning_score(pipeline_out, user_prompt)
    root_combined = numeric_scores_from_reasoning(reasoning_result)
    tree.nodes[root.id].metadata["reasoning_diagnostic"] = reasoning_result
    # Score given to candidates that do not carry their own combined_score.
    default_score = 0.0 if root_combined is None else root_combined

    # 5) Candidates from the pipeline output; final_subtasks are the fallback.
    initial_candidates = pipeline_to_candidates(pipeline_out)
    if not initial_candidates and final_subtasks:
        for subtask in final_subtasks:
            subtask_text = safe_get_text(subtask)
            if is_debug_or_control_text(subtask_text):
                continue
            initial_candidates.append(
                {
                    "text": subtask_text,
                    "role": "final_subtask",
                    "scores": {},
                    "metadata": {"orig": subtask},
                }
            )

    # 6) Attach the surviving candidates as children of the root.
    for cand in initial_candidates:
        cand_text = safe_get_text(cand.get("text", ""))
        if is_debug_or_control_text(cand_text):
            continue
        cand_scores = cand.get("scores", {})
        cand_scores.setdefault("combined_score", default_score)
        tree.add_node(
            text=cand_text,
            parent_id=root.id,
            role=cand.get("role", "final_subtask"),
            scores=cand_scores,
            metadata=cand.get("metadata", {}),
        )

    # 7) One expansion round per child, keeping the `beam` best candidates.
    def combined_score_of(candidate):
        return candidate.get("scores", {}).get("combined_score", 0.0)

    expansion_debugs = []
    beam = 2
    for child_id in list(tree.nodes[root.id].children):
        child = tree.nodes[child_id]
        expansions, expansion_log = expand_fn_using_pipeline(child.text, child.metadata)
        expansion_debugs.append(expansion_log)

        substantive = [
            c for c in expansions
            if not is_debug_or_control_text(safe_get_text(c.get("text", "")))
        ]
        ranked = sorted(substantive, key=combined_score_of, reverse=True)
        for cand in ranked[:beam]:
            cand_text = safe_get_text(cand.get("text", ""))
            if is_debug_or_control_text(cand_text):
                continue
            tree.add_node(
                text=cand_text,
                parent_id=child_id,
                role=cand.get("role"),
                scores=cand.get("scores", {}),
                metadata=cand.get("metadata", {}),
            )

    # 8) Highest-scoring nodes, with debug/control text filtered out.
    print("\nTop nodes by combined_score:")
    for node in tree.top_k_by_score("combined_score", k=8):
        if is_debug_or_control_text(safe_get_text(node.text)):
            continue
        print(f"- id={node.id[:8]} score={node.scores.get('combined_score')} role={node.role} text={node.text}")

    # 9) Best path by summed combined_score; debug nodes are hidden from the display only.
    path_nodes, path_score = tree.best_path("combined_score", max_depth=6)
    visible_path = [
        node for node in path_nodes
        if not is_debug_or_control_text(safe_get_text(node.text))
    ]
    print("\nBest path (by summed combined_score):", path_score)
    for node in visible_path:
        print(f" * [{node.role}] {node.text} (score={node.scores.get('combined_score')})")

    # 10) Persist the tree, then the captured debug output.
    outpath = "tree_of_thoughts_example.json"
    tree.save_json(outpath)
    print(f"\nSaved tree to {outpath}")

    log_sections = (
        "=== PIPELINE DEBUG ===\n\n",
        pipeline_debug,
        "\n\n=== DECOMPOSER DEBUG ===\n\n",
        decomposer_debug,
        "\n\n=== EXPANSION DEBUG (concatenated) ===\n\n",
        "\n---\n".join(expansion_debugs),
    )
    with open("pipeline_debug.log", "w", encoding="utf-8") as log_file:
        for section in log_sections:
            log_file.write(section)

    # 11) Reasoning diagnostic as pretty-printed JSON.
    print("\n=== REASONING DIAGNOSTIC ===")
    print(json.dumps(reasoning_result, indent=2))

# Entry point: run the demo only when this code is executed as the main module.
if __name__ == "__main__":
    main()


  from .autonotebook import tqdm as notebook_tqdm
  llm = Ollama(model=MODEL)


Loading embedding model: all-MiniLM-L6-v2
Embedding model loaded.


  out = llm(prompt)                 # callable



=== FORMATTED FINAL SUBTASKS ===
1. Look up reputable online resources for learning French.
2. Find a language learning app or website that offers French lessons.
3. Consult with a language exchange partner or tutor who speaks French.
4. Purchase a textbook or audio materials to supplement learning.
5. Set aside time each week to practice speaking and listening in French.


In [7]:
import json
# Load the tree that was saved as JSON.
p = "../../tree_of_thoughts_example.json"
with open(p, "r", encoding="utf-8") as f:
    data = json.load(f)

# Top-level structure of the saved file.
print(data.keys())

# Report nodes whose text is blank but which still carry metadata.
for nid, n in data["nodes"].items():
    meta = n.get("metadata", {})
    text = n.get("text", "")
    if text.strip() or not meta:
        continue
    print("node", nid, "has metadata keys:", list(meta.keys()))
    print("  text:", text)
    # Each metadata value is stringified and cut to 200 characters.
    print("  metadata preview:", {k: str(v)[:200] for k, v in meta.items()})


dict_keys(['tree_id', 'root_id', 'nodes'])


In [1]:
# pipeline_tot_simple.py
# Bounded Tree-of-Thoughts: 3–4 LLM calls max

from tree_of_thoughts import TreeOfThoughts
from inference import run_decomposer, run_inference_pipeline
import time, io, contextlib
from typing import List

# ---------------- CONFIG ----------------
# Cap on the number of items clean() returns (it slices its result to this length).
MAX_SUBTASKS = 4
# Number of subtask nodes main() passes to run_inference_pipeline.
MAX_EXPANSIONS = 2   # how many subtasks to expand

# ---------------- helpers ----------------
def normalize(x):
    """
    Flatten ``x`` into a single whitespace-normalized string.

    - ``None`` -> ``""``
    - ``str`` -> the string with every run of whitespace collapsed to one space
    - ``list`` / ``tuple`` -> non-empty normalized elements joined by a space
    - ``dict`` -> the normalized value of the first key present among
      ``"text"``, ``"content"``, ``"message"``, ``"step"`` (even if that value
      normalizes to an empty string); ``""`` when none of them is present
    - anything else -> ``""``
    """
    if x is None:
        return ""
    if isinstance(x, str):
        return " ".join(x.split())
    if isinstance(x, (list, tuple)):
        # Normalize each element exactly once. Calling normalize() in both the
        # filter and the value position doubles the work at every nesting level.
        parts = (normalize(item) for item in x)
        return " ".join(part for part in parts if part)
    if isinstance(x, dict):
        for key in ("text", "content", "message", "step"):
            if key in x:
                return normalize(x[key])
    return ""

def clean(items: List[str]) -> List[str]:
    """
    Normalize ``items``, drop the ones that come out empty, remove
    case-insensitive duplicates (the first occurrence wins and keeps its
    casing), and return at most ``MAX_SUBTASKS`` entries in original order.
    """
    # Insertion-ordered dict: lower-cased text -> first spelling seen.
    first_seen = {}
    for raw in items:
        text = normalize(raw)
        if text:
            first_seen.setdefault(text.lower(), text)
    return list(first_seen.values())[:MAX_SUBTASKS]

# ---------------- main ----------------
def main(prompt: str, json_path: str = "tree_of_thoughts_example.json"):
    """
    Build a small, bounded Tree-of-Thoughts for ``prompt`` and save it as JSON.

    Shape of a run:
    - one ``run_decomposer`` call (its stdout is captured and discarded)
    - at most ``MAX_EXPANSIONS`` ``run_inference_pipeline`` calls
    - no recursion: the tree is root -> subtasks -> thoughts

    Args:
        prompt: Text stored on the root node and passed to the decomposer.
        json_path: Where the tree is written. Defaults to the path that was
            previously hard-coded.

    Returns:
        A dict with ``"tree"`` (the TreeOfThoughts instance) and
        ``"llm_calls_used"`` (1 for the decomposer plus one per expanded subtask).
    """
    tree = TreeOfThoughts()

    # ---- root ----
    root = tree.add_node(
        text=prompt,
        parent_id=None,
        role="root",
        metadata={"ts": time.time()},
    )
    tree.set_root(root.id)

    # ---- 1) Decompose ONCE ----
    # The decomposer's prints are swallowed; the buffer is never read.
    with contextlib.redirect_stdout(io.StringIO()):
        subtasks = run_decomposer(prompt)

    if not isinstance(subtasks, list):
        subtasks = [subtasks]
    subtasks = clean(subtasks)

    # ---- add subtasks as children ----
    subtask_nodes = [
        tree.add_node(
            text=s,
            parent_id=root.id,
            role="subtask",
            metadata={"source": "decomposer"},
        )
        for s in subtasks
    ]

    # ---- 2) Expand only the first MAX_EXPANSIONS subtasks ONCE ----
    expanded = subtask_nodes[:MAX_EXPANSIONS]
    for n in expanded:
        out = run_inference_pipeline(n.text, auto_extend=False)

        expansions = []
        if isinstance(out, dict):
            for key in ("thoughts", "suggestions", "steps", "outputs"):
                if key in out:
                    vals = out[key]
                    expansions.extend(vals if isinstance(vals, list) else [vals])

        # very small fan-out: at most two thoughts per subtask
        for e in clean(expansions)[:2]:
            tree.add_node(
                text=e,
                parent_id=n.id,
                role="thought",
                metadata={"source": "inference"},
            )

    # ---- print for visibility ----
    print("\n=== TREE (SIMPLE ToT) ===")
    print(prompt)
    for c in tree.nodes[root.id].children:
        print(" └─", tree.nodes[c].text)
        for gc in tree.nodes[c].children:
            print("    └─", tree.nodes[gc].text)

    # ---- save tree (JSON) once ----
    tree.save_json(json_path)
    print(f"[INFO] Tree saved to {json_path}")

    return {
        "tree": tree,
        "llm_calls_used": 1 + len(expanded),
    }


  from .autonotebook import tqdm as notebook_tqdm
  llm = Ollama(model=MODEL)


In [2]:
# main.py

# from pipeline_subtasks_only import main

# Demo prompt handed to main() below.
# NOTE(review): "trignometric" is misspelled; it is left as-is here because the
# string is passed to main() verbatim and changing it would change the run.
PROMPT = (
    "what are trignometric identities and how can we solve "
    "(51^2 - 49^2) efficiently using them?"
)

# Build the tree for PROMPT; in a notebook the returned dict is shown as the cell result.
main(PROMPT)


  out = llm(prompt)                 # callable


Subtasks:
 1. Browse online mathematical resources for trigonometry.
 2. Search for "trigonometric identities" on math websites.
 3. Evaluate search results to determine relevance.
 4. Categorize identified identities by type (e.g., sum/difference, double angle).
 5. Record and organize key identities in a searchable database or note-taking system.
[DEBUG] run_decomposer returned type=<class 'list'>, preview=['Browse online mathematical resources for trigonometry.', 'Search for "trigonometric identities" on math websites.', 'Evaluate search results to determine relevance.', 'Categorize identified identities by type (e.g., sum/difference, double angle).', 'Record and organize key identities in a searchab
Subtasks:
 1. Calculate the value of 51^2.
 2. Calculate the value of 49^2.
 3. Subtract 49^2 from 51^2.
 4. Identify identities that can be applied to simplify the expression from step 3.
[DEBUG] run_decomposer returned type=<class 'list'>, preview=['Calculate the value of 51^2.', 'Cal

{'tree': <tree_of_thoughts.TreeOfThoughts at 0x26674994b60>,
 'llm_calls_used': 3}