# Lesson 6: Improve Agent's GPA

In this lesson, you'll make two targeted changes to the agent:

1. Adjust the planning prompt to include explicit goals, pre-conditions, and post-conditions for each step. This helps the executor understand the sub-goals it needs to reach.

2. You will add inline evals so the agent receives feedback on when to do additional research. This provides the executor feedback on whether it's reaching its sub-goals.

```+---------------------------------------------------------------------------------------------------------+
|                                    ARCHITECTURE DU SYSTEME MULTI-AGENTS                                 |
+---------------------------------------------------------------------------------------------------------+
                                                |
                                          [User Query]
                                                |
                                                v
+---------------------------------------------------------------------------------------------------------+
|  1. PLANNER NODE (Le Cerveau)                                                                           |
|  -----------------------------------------------------------------------------------------------------  |
|  * Fonction : `planner_node(state)`                                                                     |
|  * Prompt   : `patched_plan_prompt` (qui utilise RECURSION_LIMIT pour g√©rer le budget)                  |
|  * Sortie   : G√©n√®re un plan JSON (Step 1, Step 2, ...) ajout√© au `state['plan']`                       |
+---------------------------------------------------------------------------------------------------------+
                                                |
                                                v
          +---------------------------------------------------------------------------+
          |                                                                           |
          |   2. EXECUTOR NODE (Le Chef d'Orchestre)    <-------------------------+   |
          |   -----------------------------------------------------------------   |   |
          |   * Fonction : `executor_node(state)`                                 |   |
          |   * Logique  : Lit le `state['plan']`, v√©rifie `current_step`.        |   |
          |                D√©cide quel agent appeler (Routing).                   |   |
          |                G√®re les drapeaux `replan` et `previous_step_failed`.  |   |
          |                                                                           |
          +----------------------+----------------------+-------------------------+---+
                                 |                      |                         |
        +------------------------+                      |                         |
        | (Route: "cortex_researcher")                  | (Route: "web_...")      | (Route: "chart_...")
        v                                               v                         v
+------------------------------------+    +-----------------------------+    +-----------------------------+
| 3a. CORTEX RESEARCH NODE           |    | 3b. WEB RESEARCH NODE       |    | 3c. CHART GENERATOR NODE    |
| ---------------------------------- |    | --------------------------- |    | --------------------------- |
| * Func: `cortex_agents_research_...|    | * Func: `web_research_node` |    | * Func: `chart_node`        |
| * Outils :                         |    | * Agent : ReAct Agent       |    | * Agent : ReAct Agent       |
|   - `wikipedia_rag_tool`           |    | * Outil : `tavily_tool`     |    | * Outil : `PythonREPL`      |
|   - `wikidata_sparql_tool`         |    |                             |    |                             |
| * Note: Utilise `_cortex_llm_...`  |    |                             |    |                             |
+-----------------+------------------+    +--------------+--------------+    +--------------+--------------+
                  |                                      |                                  |
                  |                                      |                                  |
                  +-------------------+------------------+----------------------------------+
                                      |
                                      | (R√©sultat de l'√©tape / Demande de Replanification)
                                      | Retour vers Executor
                                      |
+---------------------------------------------------------------------------------------------------------+
|                                     CONDITION DE FIN                                                    |
| Si (toutes les √©tapes finies) OU (replanification impossible) OU (remaining_steps <= 0)                 |
+---------------------------------------------------------------------------------------------------------+
                                                |
                                                v
+---------------------------------------------------------------------------------------------------------+
|  4. SYNTHESIZER NODE (Le R√©dacteur)                                                                     |
|  -----------------------------------------------------------------------------------------------------  |
|  * Fonction : `synthesizer_node(state)`                                                                 |
|  * Prompt   : `final_answer_prompt`                                                                     |
|  * Entr√©e   : Prend tout l'historique des messages et des r√©sultats d'outils.                           |
|  * Sortie   : R√©ponse finale structur√©e pour l'utilisateur.                                             |
+---------------------------------------------------------------------------------------------------------+
                                                |
                                                v
                                          [Final Answer]

In [None]:
# @title 1. Install Dependencies & Setup (will kill 1st time then, re-launch)
import os, sys, time
if os.path.exists(".lib_installed"):
    print("Dependencies installed.")
else:
  !pip install -q \
      langchain \
      langchain-core \
      langchain-community \
      langchain-openai \
      langchain-experimental \
      langchain-tavily \
      langgraph \
      trulens-core trulens-providers-openai trulens-apps-langgraph trulens-dashboard \
      opentelemetry-sdk nest-asyncio2 openinference-instrumentation-langchain arize-phoenix uvicorn \
      python-dotenv \
      wikipedia \
      SPARQLWrapper

  with open(".lib_installed", "w") as f: f.write("Installation OK")

  # # Si on est dans Colab, on tue le processus pour forcer le rechargement des nouvelles librairies
  # if "google.colab" in sys.modules:
  #     print("üîÑ Red√©marrage automatique de la session pour appliquer les mises √† jour... ‚ö†Ô∏è (Vous verrez peut-√™tre une notification 'Session √©cras√©e', c'est normal !)")
  #     time.sleep(1)
  #     os.kill(os.getpid(), 9)

import nest_asyncio2 as nest_asyncio
nest_asyncio.apply()


Dependencies installed.


In [None]:
# @title 2. Central Configuration & Secrets
import os
groq = True

os.environ["TAVILY_API_KEY"] = "XXX"  # PASTE TAVILY KEY

os.environ["OPENAI_BASE_URL"] = "https://api.groq.com/openai/v1" # OPENROUTER: "https://openrouter.ai/api/v1", GROQ: "https://api.groq.com/openai/v1"
os.environ["OPENAI_API_KEY"] = "XXX" # OPENROUTER: "sk-XXX", GROQ:
# --- 2. MODELS DEFINITION
if "groq" in os.environ["OPENAI_BASE_URL"]:
    os.environ["MODEL_EXECUTOR"] = "llama-3.1-8b-instant" # "llama-3.3-70b-versatile" # Llama 3.3 70B est le plus polyvalent ("Versatile") pour la r√©daction et la synth√®se.
    os.environ["MODEL_REASONING"] = "llama-3.1-8b-instant" # "llama-3.3-70b-versatile" # On utilise DeepSeek R1 (version distill√©e sur Llama 70B) - C'est ACTUELLEMENT le meilleur mod√®le de raisonnement gratuit sur Groq ("Thinking Model").
    os.environ["MODEL_EVAL"] = "llama-3.1-8b-instant" # "llama-3.3-70b-versatile" # On r√©utilise Llama 3.3 pour avoir une critique de qualit√©. - Si vous avez trop d'erreurs 429 (quota), remplacez celui-ci par "llama-3.1-8b-instant"
else:
    os.environ["MODEL_EXECUTOR"] = "openai/gpt-5-nano" # "google/gemini-2.0-flash-lite-001" # "openai/gpt-5-nano"
    os.environ["MODEL_REASONING"] = "openai/gpt-5-nano" # "openai/gpt-oss-120b" # or "openai/o3-mini"
    os.environ["MODEL_EVAL"] = "openai/gpt-5-nano" # "google/gemini-2.0-flash-lite-001" # "deepseek/deepseek-r1-distill-qwen-14b" # "openai/gpt-5-nano"

# --- 3. TRULENS SETUP ---
os.environ["TRULENS_OTEL_TRACING"] = "1"

# Setup a rate limiter to ensure to enjoy free Groq API :-)

In [None]:
import time
import openai
from collections import deque

# --- CONFIGURATION (Safe Mode) ---
RPM_LIMIT = 20        # On vise 20 pour rester sous les 30 (marge de s√©cu)
MAX_RETRIES = 5       # Nombre d'essais en cas d'erreur 429
BASE_SLEEP = 2        # Temps d'attente initial (backoff exponentiel)

# Stockage des timestamps pour le Rate Limiter
_timestamps = deque(maxlen=RPM_LIMIT)

# On patch le niveau le plus bas : la m√©thode `request` du client HTTP interne
# Cela couvre TOUT : LangChain, TruLens, appels directs, nouveaux imports.
if not hasattr(openai._base_client.SyncHttpxClientWrapper, "_original_request"):
    openai._base_client.SyncHttpxClientWrapper._original_request = openai._base_client.SyncHttpxClientWrapper.request

def protected_request(self, *args, **kwargs):
    # 1. Rate Limiter Pr√©ventif (Sliding Window)
    if len(_timestamps) == RPM_LIMIT:
        elapsed = time.time() - _timestamps[0]
        if elapsed < 60:
            time.sleep(60 - elapsed + 0.5)

    # 2. Retry Logic pour erreur 429 (Tokens/TPM)
    for attempt in range(MAX_RETRIES):
        try:
            # Appel r√©el
            response = self._original_request(*args, **kwargs)
            _timestamps.append(time.time()) # Succ√®s -> on note l'heure
            return response
        except Exception as e:
            # On d√©tecte l'erreur 429 (Too Many Requests)
            if "429" in str(e) and attempt < MAX_RETRIES - 1:
                wait = BASE_SLEEP * (2 ** attempt) # 2s, 4s, 8s, 16s...
                print(f"‚ö†Ô∏è Quota Groq atteint (429). Pause de {wait}s...")
                time.sleep(wait)
            else:
                raise e # Autre erreur ou max retries -> on plante

# Application du patch
openai._base_client.SyncHttpxClientWrapper.request = protected_request
print(f"üõ°Ô∏è Groq Armor activ√© : {RPM_LIMIT} RPM + Auto-Retry sur 429 (LangChain & TruLens prot√©g√©s)")

üõ°Ô∏è Groq Armor activ√© : 20 RPM + Auto-Retry sur 429 (LangChain & TruLens prot√©g√©s)


In [None]:
# @title Initiate üöÄ Phoenix monitoring of Langchain / LangGraph
import phoenix as px
from openinference.instrumentation.langchain import LangChainInstrumentor
import os, time
from google.colab import output

os.environ["PHOENIX_PORT"] = "6002" # Petite s√©curit√© pour √©viter le conflit de ports si vous relancez plusieurs fois
os.environ["PHOENIX_COLLECTOR_ENDPOINT"] = "http://localhost:" + os.environ["PHOENIX_PORT"] + "/v1/traces"
os.environ["PHOENIX_PROJECT_NAME"] = "langgraph-data-toulon"

try:
    phoenix_session = px.launch_app() # 1. Lancer l'UI locale
    time.sleep(5) # Give it time to spin up
    print(f"üöÄ Phoenix UI is ready at: {phoenix_session.url}")
    # try: # L'instumentation est d√©plac√©e apr√®s l'initialisation de TruGraph pour se brancher dessus pour permettre de partager le flux OTEL
    #     LangChainInstrumentor().instrument() # 2. Activer l'instrumentation
    #     print("‚úÖ Instrumentation activ√©e.") # LangChainInstrumentor capture aussi les noeuds LangGraph de base
    # except Exception as e:
    #     print(f"‚ö†Ô∏è Erreur d'instrumentation (peut-√™tre d√©j√† active): {e}")
    output.serve_kernel_port_as_iframe(os.environ["PHOENIX_PORT"], height=1000) # Cela ouvre une fen√™tre directement dans le notebook
except Exception as e:
    print(f"Erreur au lancement: {e}")



WARNI [phoenix.session.session] Existing running Phoenix instance detected! Shutting it down and starting a new instance...
ERROR [opentelemetry.sdk._shared_internal] Exception while exporting Span.
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/urllib3/connection.py", line 198, in _new_conn
    sock = connection.create_connection(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/urllib3/util/connection.py", line 85, in create_connection
    raise err
  File "/usr/local/lib/python3.12/dist-packages/urllib3/util/connection.py", line 73, in create_connection
    sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/urllib3/connectionpool.py", line 787, in urlopen
    response = self._make_request(
               ^^^^^^^^^^^^^^^^^^^
  File "/usr

üåç To view the Phoenix app in your browser, visit https://nelnk6aryaa9-496ff2e9c6d22116-6002-colab.googleusercontent.com/
üìñ For more information on how to use Phoenix, check out https://arize.com/docs/phoenix
üöÄ Phoenix UI is ready at: https://nelnk6aryaa9-496ff2e9c6d22116-6002-colab.googleusercontent.com/


<IPython.core.display.Javascript object>

Cr√©ation de prompts.py, helper.py

In [None]:
# @title
#%%writefile prompts.py
from typing import Dict, Any, List
#from langchain.schema import HumanMessage  # type: ignore[import-not-found]
from langchain_core.messages import HumanMessage
import json
from typing import Optional
from langgraph.graph import MessagesState
from langgraph.types import Command
from typing import Literal, Optional, List, Dict, Any, Type

MAX_REPLANS = 2

# Custom State class with specific keys
class State(MessagesState):
    enabled_agents: Optional[List[str]]
    # Current plan only: mapping from step number (as string) to step definition
    plan: Optional[Dict[str, Dict[str, Any]]]
    user_query: Optional[str]
    current_step: int
    replan_flag: Optional[bool]
    last_reason: Optional[str]
    # Replan attempts tracked per step number
    replan_attempts: Optional[Dict[int, int]]
    agent_query: Optional[str]

def get_agent_descriptions() -> Dict[str, Dict[str, Any]]:
    """
    Return structured agent descriptions with capabilities and guidelines.
    Edit this function to change how the planner/executor reason about agents.
    """
    return {
        "web_researcher": {
            "name": "Web Researcher",
            "capability": "Fetch public data via Tavily web search",
            "use_when": "Public information, news, current events, or external facts are needed",
            "limitations": "Cannot access private/internal company data",
            "output_format": "Raw research data and findings from public sources",
        },
        "cortex_researcher": {
            "name": "Cortex Researcher",
            # "capability": "Query private/company data in Snowflake, including structured deal records (company name, deal value, sales rep, close date, deal status, product line) and unstructured sales meeting notes, via Snowflake Cortex Agents.",
            # "use_when": "Internal documents, company databases, or private data access is required",
            # "limitations": "Cannot access public web data",
            # "output_format": "For structured requests, return the exact fields and include SQL when applicable; for unstructured, return concise relevant excerpts with citations.",
            "capability": "Query general knowledge. Use Wikidata for structured facts (dates, lists, counts) and Wikipedia for unstructured summaries.",
            "use_when": "Questions about real-world entities, history, or factual lists.",
            "limitations": "Cannot access private company data.",
            "output_format": "Structured tables (Wikidata) or text summaries (Wikipedia).",
            },
        "chart_generator": {
            "name": "Chart Generator",
            "capability": "Build visualizations from structured data",
            "use_when": "User explicitly requests charts, graphs, plots, visualizations (keywords: chart, graph, plot, visualise, bar-chart, line-chart, histogram, etc.)",
            "limitations": "Requires structured data input from previous steps",
            "output_format": "Visual charts and graphs",
            "position_requirement": "Must be used as final step after data gathering is complete",
        },
        "chart_summarizer": {
            "name": "Chart Summarizer",
            "capability": "Summarize and explain chart visualizations",
            "use_when": "After chart_generator has created a visualization",
            "limitations": "Requires a chart as input",
            "output_format": "Written summary and analysis of chart content",
        },
        "synthesizer": {
            "name": "Synthesizer",
            "capability": "Write comprehensive prose summaries of findings",
            "use_when": "Final step when no visualization is requested - combines all previous research",
            "limitations": "Requires research data from previous steps",
            "output_format": "Coherent written summary incorporating all findings",
            "position_requirement": "Should be used as final step when no chart is needed",
        },
    }

def _get_enabled_agents(state: State | None = None) -> List[str]:
    """Return enabled agents; if absent, use baseline/default.

    Supports both dict-style and attribute-style state objects.
    """
    baseline = ["web_researcher", "chart_generator", "chart_summarizer", "synthesizer"]
    if not state:
        return baseline
    val = state.get("enabled_agents") if hasattr(state, "get") else getattr(state, "enabled_agents", None)

    if isinstance(val, list) and val:
        allowed = {"web_researcher", "cortex_researcher", "chart_generator", "chart_summarizer", "synthesizer"}
        filtered = [a for a in val if a in allowed]
        return filtered
    return baseline

def format_agent_list_for_planning(state: State | None = None) -> str:
    """
    Format agent descriptions for the planning prompt.
    """
    descriptions = get_agent_descriptions()
    enabled_list = _get_enabled_agents(state)
    agent_list = []

    for agent_key, details in descriptions.items():
        if agent_key not in enabled_list:
            continue
        agent_list.append(f"  ‚Ä¢ `{agent_key}` ‚Äì {details['capability']}")

    return "\n".join(agent_list)

def format_agent_guidelines_for_planning(state: State | None = None) -> str:
    """
    Format agent usage guidelines for the planning prompt.
    """
    descriptions = get_agent_descriptions()
    enabled = set(_get_enabled_agents(state))
    guidelines = []

    # Cortex vs Web researcher (only include guidance for enabled agents)
    if "cortex_researcher" in enabled:
        guidelines.append(f"- Use `cortex_researcher` when {descriptions['cortex_researcher']['use_when'].lower()}.")
    if "web_researcher" in enabled:
        guidelines.append(f"- Use `web_researcher` for {descriptions['web_researcher']['use_when'].lower()}.")

    # Chart generator specific rules
    if "chart_generator" in enabled:
        chart_desc = descriptions['chart_generator']
        cs_hint = " A `chart_summarizer` should be used to summarize the chart." if "chart_summarizer" in enabled else ""
        guidelines.append(f"- **Include `chart_generator` _only_ if {chart_desc['use_when'].lower()}**. Do NOT use it for text summaries, news articles, or lists of topics. If included, `chart_generator` must be {chart_desc['position_requirement'].lower()}.")

    # Synthesizer default
    if "synthesizer" in enabled:
        synth_desc = descriptions['synthesizer']
        guidelines.append(f"  ‚Äì Otherwise use `synthesizer` as {synth_desc['position_requirement'].lower()}, and be sure to include all of the data from the previous steps.")

    return "\n".join(guidelines)

def format_agent_guidelines_for_executor(state: State | None = None) -> str:
    """
    Format agent usage guidelines for the executor prompt.
    """
    descriptions = get_agent_descriptions()
    enabled = _get_enabled_agents(state)
    guidelines = []

    if "web_researcher" in enabled:
        web_desc = descriptions['web_researcher']
        guidelines.append(f"- Use `\"web_researcher\"` when {web_desc['use_when'].lower()}.")
    if "cortex_researcher" in enabled:
        cortex_desc = descriptions['cortex_researcher']
        guidelines.append(f"- Use `\"cortex_researcher\"` for {cortex_desc['use_when'].lower()}.")

    return "\n".join(guidelines)

def plan_prompt(state: State) -> HumanMessage:
    """
    Build the prompt that instructs the LLM to return a high‚Äëlevel plan.
    """
    replan_flag   = state.get("replan_flag", False)
    user_query    = state.get("user_query", state["messages"][0].content)
    prior_plan    = state.get("plan") or {}
    replan_reason = state.get("last_reason", "")

    # Get agent descriptions dynamically

    agent_list = format_agent_list_for_planning(state)
    agent_guidelines = format_agent_guidelines_for_planning(state)

    enabled_list = _get_enabled_agents(state)

    # Build planner agent enum based on enabled agents
    enabled_for_planner = [
        a for a in enabled_list
        if a in ("web_researcher", "cortex_researcher", "chart_generator", "synthesizer")
    ]
    planner_agent_enum = " | ".join(enabled_for_planner) or "web_researcher | chart_generator | synthesizer"

    prompt = f"""
        You are the **Planner** in a multi‚Äëagent system.  Break the user's request
        into a sequence of numbered steps (1,‚ÄØ2,‚ÄØ3, ‚Ä¶).  **There is no hard limit on
        step count** as long as the plan is concise and each step has a clear goal.

        You may decompose the user's query into sub-queries, but **prioritize grouping related information retrieval**.
        Avoid creating unnecessary granular steps in order to save execution budget while maintaining quality.

        For example, "Find the top 5 cities AND their populations" should be a SINGLE step, not two.

        However, if the user's query is "What were the key
        action items in the last quarter, and what was a recent news story for
        each of them?", you may break it into steps:

        1. Fetch the key action items in the last quarter.
        2. Fetch a recent news story for the first action item.
        3. Fetch a recent news story for the second action item.
        4. Fetch a recent news story for the last action item

        Here is a list of available agents you can call upon to execute the tasks in your plan. You may call only one agent per step.

        {agent_list}

        Return **ONLY** valid JSON (no markdown, no explanations) in this form:

        {{
        "1": {{
            "agent": "{planner_agent_enum}",
            "action": "string",
        }},
        "2": {{ ... }},
        "3": {{ ... }}
        }}

        Guidelines:
        {agent_guidelines}
        """

    if replan_flag:
        prompt += f"""
        The current plan needs revision because: {replan_reason}

        Current plan:
        {json.dumps(prior_plan, indent=2)}

        When replanning:
        - Focus on UNBLOCKING the workflow rather than perfecting it.
        - Only modify steps that are truly preventing progress.
        - Prefer simpler, more achievable alternatives over complex rewrites.
        """

    else:
        prompt += "\nGenerate a new plan from scratch."

    prompt += f'\nUser query: "{user_query}"'

    return HumanMessage(content=prompt)

#@instrument(attributes=lambda ret, exception, *args, **kwargs: {"retrieved_execution": ret.update.get("messages", [HumanMessage(content="")])[-1].content})
def executor_prompt(state: State) -> HumanMessage:
    """
    Build the single‚Äëturn JSON prompt that drives the executor LLM.
    """
    step = int(state.get("current_step", 0))
    latest_plan: Dict[str, Any] = state.get("plan") or {}
    plan_block: Dict[str, Any] = latest_plan.get(str(step), {})
    max_replans    = MAX_REPLANS
    attempts       = (state.get("replan_attempts", {}) or {}).get(step, 0)

    # Get agent guidelines dynamically
    executor_guidelines = format_agent_guidelines_for_executor(state)
    plan_agent = plan_block.get("agent", "web_researcher")

    messages_tail = (state.get("messages") or [])[-4:]

    executor_prompt = f"""
        **IMPORTANT:** Respond **ONLY** with a valid JSON object. Do NOT include any additional text, explanation, or conversational phrases, such as "FINAL ANSWER".

        {{
        "replan": <true|false>,
        "goto": "<{ '|'.join([a for a in _get_enabled_agents(state) if a in ['web_researcher','cortex_researcher','chart_generator','chart_summarizer','synthesizer']] + ['planner']) }>",
        "reason": "<1 sentence>",
        "query": "<text>"
        }}

        You are the **executor** in a multi‚Äëagent system with these agents:
        `{ '`, `'.join(sorted(set([a for a in _get_enabled_agents(state) if a in ['web_researcher','cortex_researcher','chart_generator','chart_summarizer','synthesizer']] + ['planner']))) }`.

        **Tasks**
        1. Decide if the current plan needs revision.  ‚Üí `"replan_flag": true|false`
        2. Decide which agent to run next.             ‚Üí `"goto": "<agent_name>"`
        3. Give one‚Äësentence justification.            ‚Üí `"reason": "<text>"`
        4. Write the exact question that the chosen agent should answer
                                                    ‚Üí "query": "<text>"

        **Guidelines**
        {executor_guidelines}
        - After **{MAX_REPLANS}** failed replans for the same step, move on.
        - If you *just replanned* (replan_flag is true) let the assigned agent try before
        requesting another replan.

        **PRIORITIZE FORWARD PROGRESS:** Only replan if the current step is completely blocked.
        1. If any reasonable data was obtained that addresses the step's core goal, set `"replan": false` and proceed.
        2. Set `"replan": true` **only if** ALL of these conditions are met:
        ‚Ä¢ The step has produced zero useful information
        ‚Ä¢ The missing information cannot be approximated or obtained by remaining steps
        ‚Ä¢ `attempts < {max_replans}`
        3. When `attempts == {max_replans}`, always move forward (`"replan": false`).

        ### Decide `"goto"`
        - If `"replan": true` ‚Üí `"goto": "planner"`.
        - If current step has made reasonable progress ‚Üí move to next step's agent.
        - Otherwise execute the current step's assigned agent (`{plan_agent}`).

        ### Build `"query"`
        Write a clear, standalone instruction for the chosen agent. If the chosen agent
        is `web_researcher` or `cortex_researcher`, the query should be a standalone question,
        written in plain english, and answerable by the agent.

        Ensure that the query uses consistent language as the user's query.

        Context you can rely on
        - User query ..............: {state.get("user_query")}
        - Current step index ......: {step}
        - Current plan step .......: {plan_block}
        - Just‚Äëreplanned flag .....: {state.get("replan_flag")}
        - Previous messages .......: {messages_tail}
        """

    return HumanMessage(
        content=executor_prompt
    )

def agent_system_prompt(suffix: str) -> str:
    return (
        "You are a helpful AI assistant, collaborating with other assistants."
        " Use the provided tools to progress towards answering the question."
        " If you are unable to fully answer, that's OK, another assistant with different tools "
        " will help where you left off. Execute what you can to make progress."
        " If you or any of the other assistants have the final answer or deliverable,"
        " prefix your response with FINAL ANSWER so the team knows to stop."
        f"\n{suffix}"
    )


In [None]:
# @title
#%%writefile helper.py
from __future__ import annotations
# pyright: reportMissingImports=false, reportMissingTypeStubs=false, reportIncompatibleMethodOverride=false
import warnings

warnings.filterwarnings("ignore", message=r"Valid config keys have changed in V2", category=UserWarning)
warnings.filterwarnings("ignore", message=r"WARNING! response_format is not default parameter", category=UserWarning)
warnings.filterwarnings("ignore", message=r"pkg_resources is deprecated as an API.*", category=UserWarning, module=r"^munch$")

import os
import json
import re
import wikipedia # Addition
from SPARQLWrapper import SPARQLWrapper, JSON
from dotenv import load_dotenv
#from snowflake.snowpark import Session
from langchain_core.tools import tool
from langchain_experimental.utilities import PythonREPL
from typing import Annotated, Literal, Optional, List, Dict, Any, Type
from trulens.otel.semconv.trace import SpanAttributes
from trulens.core.otel.instrument import instrument
#from snowflake.core import Root
#from snowflake.core.cortex.lite_agent_service import AgentRunRequest
from pydantic import BaseModel, PrivateAttr
from langchain_openai import ChatOpenAI
from langchain_tavily import TavilySearch
#from langchain.schema import HumanMessage
from langchain_core.messages import HumanMessage
from langgraph.graph import MessagesState, START, StateGraph, END
from langgraph.types import Command
from langgraph.prebuilt import create_react_agent
from trulens.core import Feedback, Select
from trulens.core.feedback.selector import Selector
#from trulens.core.feedback.selector import Selector
from trulens.providers.openai import OpenAI
import numpy as np
#from prompts import plan_prompt, executor_prompt, agent_system_prompt

from langgraph.managed.is_last_step import RemainingSteps

# load full dotenv
load_dotenv()

# --- HELPERS POUR SELECTION JSON (MODE SANS OTEL) ---
def select_context(output):
    return [m.content for m in output.get('messages', []) if getattr(m, 'name', '') in ['web_researcher', 'cortex_researcher']]

def select_plan_text(output):
    for m in output.get('messages', []):
        if getattr(m, 'name', '') in ['initial_plan', 'replan']: return m.content
    return ""

def select_user_query(output):
    return output.get("user_query", "")

def select_final_answer(output):
    return output.get("final_answer", "")

def select_all(data):
    return data

# Custom State class with specific keys
class State(MessagesState):
    enabled_agents: Optional[List[str]]
    # Current plan only: mapping from step number (as string) to step definition
    plan: Optional[Dict[str, Dict[str, Any]]]
    user_query: Optional[str]
    current_step: int
    replan_flag: Optional[bool]
    last_reason: Optional[str]
    # Replan attempts tracked per step number
    replan_attempts: Optional[Dict[int, int]]
    agent_query: Optional[str]
    remaining_steps: RemainingSteps

MAX_REPLANS = 2

# # Create a Snowflake session
# snowflake_connection_parameters = {
#     "account": os.getenv("SNOWFLAKE_ACCOUNT"),
#     "user": os.getenv("SNOWFLAKE_USER"),
#     "password": os.getenv("SNOWFLAKE_PAT"),
#     "database": os.getenv("SNOWFLAKE_DATABASE"),
#     "schema": os.getenv("SNOWFLAKE_SCHEMA"),
#     "role": os.getenv("SNOWFLAKE_ROLE"),
#     "warehouse": os.getenv("SNOWFLAKE_WAREHOUSE"),
# }

# snowpark_session = Session.builder.configs(
#     snowflake_connection_parameters
# ).create()

# create a python repl tool for importing in the lessons
repl = PythonREPL()

@tool
def python_repl_tool(
    code: Annotated[str, "The python code to execute to generate your chart."],
):
    """Use this to execute python code. You will be used to execute python code
    that generates charts. Only print the chart once.
    This is visible to the user."""
    try:
        result = repl.run(code)
    except BaseException as e:
        return f"Failed to execute. Error: {repr(e)}"
    result_str = (
        f"Successfully executed:\n```python\n{code}\n```\nStdout: {result}"
    )
    return (
        result_str
        + "\n\nIf you have completed all tasks, respond with FINAL ANSWER."
    )

reasoning_llm = ChatOpenAI(
    model=os.environ["MODEL_REASONING"],
    model_kwargs={"response_format": {"type": "json_object"}},
)

@instrument(attributes=lambda ret, exception, *args, **kwargs: {
        "retrieved_plan": json.dumps(ret.update.get("plan", {})), # 1. On capture le Plan (Output)
        "retrieved_query": args[0].get("user_query") or args[0].get("messages", [HumanMessage(content="")])[0].content}) # 2. On capture la Query User (Input) depuis l'√©tat (args[0] = state) # On essaie de lire 'user_query', sinon on prend le premier message
def planner_node(state: State) \
        -> "Command[Literal['executor']]":
    """
    Runs the planning LLM and stores the resulting plan in state.
    """
    # 1. Invoke LLM with the planner prompt
    llm_reply = reasoning_llm.invoke([plan_prompt(state)])

    # 2. Validate JSON
    try:
        content_str = llm_reply.content if isinstance(llm_reply.content, str) else str(llm_reply.content)
        parsed_plan = json.loads(content_str)
    except json.JSONDecodeError:
        raise ValueError(f"Planner returned invalid JSON:\n{llm_reply.content}")

    # 3. Store as current plan only
    replan         = state.get("replan_flag", False)
    updated_plan: Dict[str, Any] = parsed_plan

    return Command(
        update={
            "plan":         updated_plan,
            "messages":     [HumanMessage(
                                content=llm_reply.content,
                                name="replan" if replan else "initial_plan"
                             )],
            "user_query":   state.get("user_query",
                                      state["messages"][0].content),
           "current_step": 1 if not replan else state["current_step"],
           # Preserve replan flag so executor runs planned agent once before reconsidering
           "replan_flag":  state.get("replan_flag", False),
           "last_reason":  "",
           "enabled_agents": state.get("enabled_agents"),
        },
        goto="executor",
    )


# ## Create executor
# ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ
@instrument(attributes=lambda ret, exception, *args, **kwargs: {
        "retrieved_execution": ret.update.get("messages", [HumanMessage(content="")])[-1].content}) # On capture la r√©ponse de l'executor depuis l'objet Command
def executor_node(
    state: State,
) -> Command[Literal["web_researcher", "cortex_researcher", "chart_generator", "synthesizer", "planner"]]:

    plan: Dict[str, Any] = state.get("plan", {})
    step: int = state.get("current_step", 1)

    # 0) If we *just* replanned, run the planned agent once before reconsidering.
    if state.get("replan_flag"):
        planned_agent = plan.get(str(step), {}).get("agent")
        return Command(
            update={
                "replan_flag": False,
                "current_step": step + 1,  # advance because we executed the planned agent
            },
            goto=planned_agent,
        )

    # 1) Build prompt & call LLM
    llm_reply = reasoning_llm.invoke([executor_prompt(state)])
    try:
        content_str = llm_reply.content if isinstance(llm_reply.content, str) else str(llm_reply.content)
        parsed = json.loads(content_str)
        replan: bool = parsed["replan"]
        goto: str   = parsed["goto"]
        reason: str = parsed["reason"]
        query: str  = parsed["query"]
    except Exception as exc:
        raise ValueError(f"Invalid executor JSON:\n{llm_reply.content}") from exc

    # Upodate the state
    updates: Dict[str, Any] = {
        "messages": [HumanMessage(content=llm_reply.content, name="executor")],
        "last_reason": reason,
        "agent_query": query,
    }

    # Replan accounting
    replans: Dict[int, int] = state.get("replan_attempts", {}) or {}
    step_replans = replans.get(step, 0)

    # 2) Replan decision
    if replan:
        if step_replans < MAX_REPLANS:
            replans[step] = step_replans + 1
            updates.update({
                "replan_attempts": replans,
                "replan_flag": True,     # ensure next turn executes the planned agent once
                "current_step": step,    # stay on same step for the new plan
            })
            return Command(update=updates, goto="planner")
        else:
            # Cap hit: skip this step; let next step (or synthesizer) handle termination
            next_agent = plan.get(str(step + 1), {}).get("agent", "synthesizer")
            updates["current_step"] = step + 1
            return Command(update=updates, goto=next_agent)

    # 3) Happy path: run chosen agent; advance only if following the plan
    planned_agent = plan.get(str(step), {}).get("agent")
    updates["current_step"] = step + 1 if goto == planned_agent else step
    updates["replan_flag"] = False
    return Command(update=updates, goto=goto)

# Set semantic model file (for analyst) and search service name
# SEMANTIC_MODEL_FILE = "@sales_intelligence.data.models/sales_metrics_model.yaml"
# CORTEX_SEARCH_SERVICE = "sales_intelligence.data.sales_conversation_search"

# ---- Agent Setup ----
# class CortexAgentArgs(BaseModel):
#     query: str

# class CortexAgentTool:
    # ....

    # def __init__(self, session: Session):
    # ....

    # def _consume_stream(self, stream):
    # ....

    # def run(self, query: str, **kwargs):
    # ....

# cortex_agent_tool = CortexAgentTool(session=snowpark_session)

# NEW ------------------------------------
# 1. Wikipedia Tool (Replaces Cortex Search - Unstructured)
@tool
def wikipedia_rag_tool(query: str):
    """
    Retrieves unstructured information from Wikipedia to answer general knowledge questions.
    Use this for definitions, history, summaries, or non-tabular data.
    """
    try:
        search_results = wikipedia.search(query, results=1)
        if not search_results:
            return "No relevant Wikipedia pages found."

        page = wikipedia.page(search_results[0], auto_suggest=False)
        summary = page.content[:2000]
        return f"Source: {page.title}\nURL: {page.url}\n\nContent:\n{summary}"
    except Exception as e:
        return f"Wikipedia Error: {e}"

# 2. Wikidata SPARQL Tool (Replaces Cortex Analyst - Structured)
@tool
def wikidata_sparql_tool(query: str):
    """
    Retrieves structured data (lists, counts, dates, facts) from Wikidata.
    The input must be a natural language question. The tool will generate and execute SPARQL.
    Use this when you need tables, specific data points, or relationships.
    """
    sparql = SPARQLWrapper("https://query.wikidata.org/sparql")
    sparql.setReturnFormat(JSON)

    # Internal helper to translate Natural Language -> SPARQL
    # We use a small inline LLM call for this translation
    translator_llm = ChatOpenAI(model=os.environ["MODEL_EXECUTOR"], temperature=0)

    prompt = f"""
    Translate this question into a valid SPARQL query for Wikidata.
    Question: {query}

    Return ONLY the SPARQL code inside ```sparql ... ``` blocks.
    Ensure prefixes like wdt: and wd: are correct.
    Limit results to 10 unless specified.
    """

    try:
        response = translator_llm.invoke(prompt)
        content = response.content

        # Extract SPARQL code block
        if "```sparql" in content:
            query_code = content.split("```sparql")[1].split("```")[0].strip()
        elif "```" in content:
            query_code = content.split("```")[1].split("```")[0].strip()
        else:
            query_code = content.strip()

        # Execute
        sparql.setQuery(query_code)
        results = sparql.query().convert()

        # Parse JSON results into a string table
        bindings = results["results"]["bindings"]
        if not bindings:
            return "No results found in Wikidata."

        output_lines = []
        for item in bindings:
            row = []
            for key in item:
                row.append(f"{key}: {item[key]['value']}")
            output_lines.append(", ".join(row))

        return f"SPARQL Query Executed:\n{query_code}\n\nResults:\n" + "\n".join(output_lines)

    except Exception as e:
        return f"SPARQL Error: {e}"

from langgraph.prebuilt import create_react_agent
#from prompts import agent_system_prompt

llm = ChatOpenAI(model=os.environ["MODEL_EXECUTOR"])

_cortex_llm_with_tools = llm.bind_tools([wikipedia_rag_tool, wikidata_sparql_tool])
# cortex_agent = create_react_agent(llm, tools=[cortex_agent_tool.run], prompt=agent_system_prompt(f"""
#         You are the Researcher. You can answer questions
#         using customer deal data along with meeting notes.
#         Do not take any further action.
#     """))
# cortex_agent = create_react_agent(
#     llm,
#     tools=[wikipedia_rag_tool, wikidata_sparql_tool],
#     max_iterations=3,
#     prompt=agent_system_prompt(f"""
#         You are the Cortex Researcher replacement.
#         You have two tools:
#         1. `wikidata_sparql_tool`: For STRUCTURED questions (lists, stats, facts).
#         2. `wikipedia_rag_tool`: For UNSTRUCTURED questions (summaries, history).

#         Choose the right tool based on the user's request.
#      """))

@instrument(
    span_type=SpanAttributes.SpanType.RETRIEVAL,
    attributes=lambda ret, exception, *args, **kwargs: {
        SpanAttributes.RETRIEVAL.QUERY_TEXT: args[0].get("agent_query") if args[0].get("agent_query") else None,
        SpanAttributes.RETRIEVAL.RETRIEVED_CONTEXTS: [
            ret.update["messages"][-1].content
        ] if hasattr(ret, "update") else "No tool call",
    },
)
def cortex_agents_research_node(
    state: State,
) -> Command[Literal["executor"]]:
    """
    Cortex researcher using simple tool-calling (NO ReAct loop = NO recursion issue).

    Flow:
    1. LLM decides which tool to call
    2. Execute that tool once
    3. Return result
    """
    query = state.get("agent_query", state.get("user_query", ""))

    # Prompt that guides tool selection
    prompt = f"""You are a research assistant. Use one of your available tools to answer this query.

Available tools:
- wikipedia_rag_tool: For general knowledge, summaries, descriptions, history
- wikidata_sparql_tool: For structured data like lists, rankings, statistics, counts

Query: {query}

Call the most appropriate tool to answer this query."""

    try:
        # Single LLM call - it will decide which tool to use
        response = _cortex_llm_with_tools.invoke([HumanMessage(content=prompt)])

        # Check if LLM made tool calls
        if hasattr(response, 'tool_calls') and response.tool_calls:
            results = []
            for tool_call in response.tool_calls:
                tool_name = tool_call.get("name", "")
                tool_args = tool_call.get("args", {})

                # Get the query argument (tools expect 'query' parameter)
                tool_query = tool_args.get("query", query)

                # Execute the tool
                try:
                    if tool_name == "wikipedia_rag_tool":
                        result = wikipedia_rag_tool.invoke({"query": tool_query})
                    elif tool_name == "wikidata_sparql_tool":
                        result = wikidata_sparql_tool.invoke({"query": tool_query})
                    else:
                        result = f"Unknown tool: {tool_name}"
                except Exception as tool_error:
                    result = f"Tool {tool_name} failed: {str(tool_error)}"

                results.append(f"=== {tool_name} ===\n{result}")

            final_content = "\n\n".join(results)
        else:
            # LLM didn't call a tool - use its direct response or fallback
            final_content = response.content if response.content else f"No tool was called. Query: {query}"

    except Exception as e:
        final_content = f"Research failed: {str(e)}"

    new_message = HumanMessage(content=final_content, name="cortex_researcher")

    return Command(
        update={"messages": [new_message]},
        goto="executor",
    )

# ## Create Web Search Agent

tavily_tool = TavilySearch(max_results=5)

llm = ChatOpenAI(model=os.environ["MODEL_EXECUTOR"])

# Research agent and node
web_search_agent = create_react_agent(
    llm,
    tools=[tavily_tool],
    prompt=agent_system_prompt(f"""
        You are the Researcher. You can ONLY perform research by using the provided search tool (tavily_tool).
        When you have found the necessary information, end your output.
        Do NOT attempt to take further actions.
    """),
)

@instrument(
    span_type=SpanAttributes.SpanType.RETRIEVAL,
    attributes=lambda ret, exception, *args, **kwargs: {
        SpanAttributes.RETRIEVAL.QUERY_TEXT: args[0].get("agent_query", ""),
        SpanAttributes.RETRIEVAL.RETRIEVED_CONTEXTS: [
            ret.update["messages"][-1].content
        ] if hasattr(ret, "update") else "No tool call",
    },
)
def web_research_node(
    state: State,
) -> Command[Literal["executor"]]:
    agent_query = state.get("agent_query")
    result = web_search_agent.invoke({"messages":agent_query}, config={"recursion_limit": 5})
    messages = [HumanMessage(content=agent_query)] if isinstance(agent_query, str) else agent_query
    result = web_search_agent.invoke({"messages": messages})
    goto = "executor"
    # wrap in a human message, as not all providers allow
    # AI message at the last position of the input messages list
    result["messages"][-1] = HumanMessage(
        content=result["messages"][-1].content, name="web_researcher"
    )
    return Command(
        update={
            # share internal message history of research agent with other agents
            "messages": result["messages"],
        },
        goto=goto,
    )

# ## Create Charting Agent

# Chart generator agent and node
# NOTE: THIS PERFORMS ARBITRARY CODE EXECUTION, WHICH CAN BE UNSAFE WHEN NOT SANDBOXED
chart_agent = create_react_agent(
    llm,
    [python_repl_tool],
    prompt=agent_system_prompt(
        "You can only generate charts. You are working with a researcher colleague. Print the chart first. Then, save the chart to a file in the current working directory and provide the path to the chart_summarizer."
    ),
)

def chart_node(state: State) -> Command[Literal["chart_summarizer"]]:
    result = chart_agent.invoke(state)
    # wrap in a human message, as not all providers allow
    # AI message at the last position of the input messages list
    result["messages"][-1] = HumanMessage(
        content=result["messages"][-1].content, name="chart_generator"
    )
    goto="chart_summarizer"
    return Command(
        update={
            # share internal message history of chart agent with other agents
            "messages": result["messages"],
        },
        goto=goto,
    )


# ## Create Chart Summary Agent

chart_summary_agent = create_react_agent(
    llm,
    tools=[],  # Add image processing tools if available/needed.
    prompt=agent_system_prompt(
        "You can only summarize the chart that was generated by the chart generator to answer the user's question. You are working with a researcher colleague and a chart generator colleague. "
        + "Your task is to generate a standalone, concise summary for the provided chart image saved at a local PATH, where the PATH should be and only be provided by your chart generator colleague. The summary should be no more than 3 sentences and should not mention the chart itself."
    ),
)

def chart_summary_node(
    state: State,
) -> Command[Literal[END]]:
    result = chart_summary_agent.invoke(state)
    print(f"Chart summarizer answer: {result['messages'][-1].content}")
    # Ensure the summary message is attributed to chart_summarizer for downstream use
    result["messages"][-1] = HumanMessage(
        content=result["messages"][-1].content, name="chart_summarizer"
    )
    # Send to the end node
    goto = END
    return Command(
        update={
            # share internal message history of chart agent with other agents
            "messages": result["messages"],
            "final_answer": result["messages"][-1].content,
        },
        goto=goto,
    )


# ## Create a Synthesizer Agent
def synthesizer_node(state: State) -> Command[Literal[END]]:
    """
    Creates a concise, human‚Äëreadable summary of the entire interaction,
    **purely in prose**.

    It ignores structured tables or chart IDs and instead rewrites the
    relevant agent messages (research results, chart commentary, etc.)
    into a short final answer.
    """
    # Gather informative messages for final synthesis
    relevant_msgs = []
    for m in state.get("messages", []):
        if getattr(m, "name", None) in ("web_researcher", "cortex_researcher", "chart_generator", "chart_summarizer"):
            # FIX: Robustly handle content types and TRUNCATE huge outputs to avoid Token Limit Errors
            raw_content = m.content
            if isinstance(raw_content, list):
                # Handle multimodal content (list of dicts) by flattening to string
                text_content = " ".join([str(item) for item in raw_content])
            else:
                text_content = str(raw_content) if raw_content else ""

            # Truncate to ~15k chars per message to be safe (keeps context manageable)
            if len(text_content) > 15000:
                text_content = text_content[:15000] + "... [TRUNCATED DUE TO LENGTH]"

            relevant_msgs.append(text_content)

    # Fallback for user query extraction
    messages_list = state.get("messages", [])
    if messages_list and hasattr(messages_list[0], "content"):
        default_query = messages_list[0].content
    else:
        default_query = ""

    user_question = state.get("user_query", default_query)

    synthesis_instructions = (
            "You are the Synthesizer. Use the context below to directly answer the user's question. " # UPDATED THIS LINE
            "Perform any lightweight calculations, comparisons, or inferences required. " # ADDED THIS LINE
            "Do not invent facts not supported by the context. If data is missing, say what's missing and, if helpful, " # UPDATED THIS LINE
            "offer a clearly labeled best-effort estimate with assumptions.\n\n" # ADDED THIS LINE
            "Produce a concise response that fully answers the question, with the following guidance:\n" # UPDATED THIS LINE
            "- Start with the direct answer (one short paragraph or a tight bullet list).\n"
            "- Include key figures from any 'Results:' tables (e.g., totals, top items).\n"
            "- If any message contains citations, include them as a brief 'Citations: [...]' line.\n"
            "- Keep the output crisp; avoid meta commentary or tool instructions."
        )

    summary_prompt = [
        HumanMessage(content=(
            f"User question: {user_question}\n\n"
            f"{synthesis_instructions}\n\n"
            f"Context:\n\n" + "\n\n---\n\n".join(relevant_msgs)
        ))
    ]
    llm_reply = llm.invoke(summary_prompt)

    reply_content = llm_reply.content
    if isinstance(reply_content, list):
        reply_text = "".join([c if isinstance(c, str) else str(c) for c in reply_content])
    else:
        reply_text = str(reply_content)
    answer = reply_text.strip()
    print(f"Synthesizer answer: {answer}")

    return Command(
        update={
            "final_answer": answer,
            "messages": [HumanMessage(content=answer, name="synthesizer")],
        },
        goto=END,           # hand off to the END node
    )

##############################
# Eval RAG Triad Evaluations #
##############################
provider = OpenAI(model_engine=os.environ["MODEL_EVAL"])

# Groundedness: retrieved contexts (RETRIEVAL spans) vs final answer (main output)
f_groundedness = (Feedback(provider.groundedness_measure_with_cot_reasons, name="Groundedness")
    .on({"source": Selector(span_type=SpanAttributes.SpanType.RETRIEVAL, span_attribute=SpanAttributes.RETRIEVAL.RETRIEVED_CONTEXTS, collect_list=True,)})
    .on_output())   # maps "statement" to the app main output in OTEL mode

# Question/answer relevance: main input vs main output
f_answer_relevance = (Feedback(provider.relevance_with_cot_reasons, name="Answer Relevance")
    .on_input()    # maps "prompt" (or input) from app main input
    .on_output())   # maps "response" (or output) from app main output

# Context relevance: main input vs each retrieved context chunk
f_context_relevance = (Feedback(provider.context_relevance_with_cot_reasons, name="Context Relevance")
    .on_input()
    .on({"context": Selector(span_type=SpanAttributes.SpanType.RETRIEVAL, span_attribute=SpanAttributes.RETRIEVAL.RETRIEVED_CONTEXTS, collect_list=False,)})
    .aggregate(np.mean))

######################
# Eval Goal-Plan-Act #
######################
gpa_eval_provider = OpenAI(model_engine=os.environ["MODEL_EVAL"])

f_logical_consistency = (Feedback(gpa_eval_provider.logical_consistency_with_cot_reasons, name="Logical Consistency")
    .on({"trace": Selector(trace_level=True)}))

f_execution_efficiency = (Feedback(gpa_eval_provider.execution_efficiency_with_cot_reasons, name="Execution Efficiency")
    .on({"trace": Selector(trace_level=True)}))

f_plan_adherence = (Feedback(gpa_eval_provider.relevance_with_cot_reasons, name="Plan Adherence")
    .on({
            "prompt": Selector(span_attribute="retrieved_plan"),      # On lit l'√©tiquette du Planner
            "response": Selector(span_attribute="retrieved_execution") # On lit l'√©tiquette de l'Executor
        }))

f_plan_quality = (
    Feedback(gpa_eval_provider.relevance_with_cot_reasons, name="Plan Quality")
    .on({
            # CORRECTION : On utilise le Selector sur l'attribut qu'on vient de cr√©er
            "prompt": Selector(span_attribute="retrieved_query"),

            # Le plan (inchang√©)
            "response": Selector(span_attribute="retrieved_plan")
        }))

from IPython.display import HTML, display

def display_eval_reason(text, width=800):
    # Strip any trailing "Score: X" from the end of the text
    raw_text = str(text).rstrip()
    cleaned_text = re.sub(r"\s*Score:\s*-?\d+(?:\.\d+)?\s*$", "", raw_text, flags=re.IGNORECASE)
    # Convert newlines to HTML line breaks, then wrap
    html_text = cleaned_text.replace('\n', '<br><br>')
    display(HTML(f'<div style="font-size: 15px; word-wrap: break-word; width: {width}px;">{html_text}</div>'))


In [None]:
import os
from dotenv import load_dotenv
import warnings

load_dotenv(override=True)
warnings.filterwarnings("ignore")


<div style="background-color:#fff6ff; padding:13px; border-width:3px; border-color:#efe6ef; border-style:solid; border-radius:6px">
<p> üíª &nbsp; <b>To access <code>requirements.txt</code>, <code>env.template</code>, <code>prompts.py</code>, and <code>helper.py</code> files:</b> 1) click on the <em>"File"</em> option on the top menu of the notebook 2) click on <em>"Open"</em>.

<p> ‚¨á &nbsp; <b>Download Notebooks:</b> 1) click on the <em>"File"</em> option on the top menu of the notebook and then 2) click on <em>"Download as"</em> and select <em>"Notebook (.ipynb)"</em>.</p>

</div>

## 6.1 Add inline evaluations (skipped, already set in helpers)

## 6.2 Update the planning prompt

Add pre-conditions, post-conditions, and goals to each step in the agent's plan.

Adding this explicit detail helps the executor understand the goal of each step, which improves tool calling and agent decisions.

In [None]:
#import helper
#import prompts
#from langchain.schema import HumanMessage
from langchain_core.messages import HumanMessage

RECURSION_LIMIT = 15

original_plan_prompt_fn = plan_prompt

def patched_plan_prompt(state):
    # FIX: Call the saved original function, NOT the global 'plan_prompt'
    base = original_plan_prompt_fn(state).content
    insertion = '"action": "string",\n            "pre_conditions": ["string", ...],\n            "post_conditions": ["string", ...],\n            "goal": "string",'
    base = base.replace('"action": "string",', insertion)

    current_step = state.get("current_step", 1)
    used = max(0, int(current_step) - 1)
    remaining = max(0, RECURSION_LIMIT - used)
    base += (f"\n\n<budget> Actions Budget Used: {used}, Max Budget Remaining: {remaining}.  ## IMPORTANT: Make the best use of the available resources. </budget>")

    return HumanMessage(content=base)

plan_prompt = patched_plan_prompt


## 6.3 Build the graph

In [None]:
from langgraph.graph import START, StateGraph
#from helper import State, planner_node, executor_node, chart_node, chart_summary_node, synthesizer_node, web_research_node, cortex_agents_research_node

workflow = StateGraph(State)
workflow.add_node("planner", planner_node)
workflow.add_node("executor", executor_node)
workflow.add_node("web_researcher", web_research_node)
workflow.add_node("cortex_researcher", cortex_agents_research_node)
workflow.add_node("chart_generator", chart_node)
workflow.add_node("chart_summarizer", chart_summary_node)
workflow.add_node("synthesizer", synthesizer_node)

workflow.add_edge(START, "planner")

graph = workflow.compile()

# Preconfigure recursion_limit once (avoid passing it on every invoke).
try: graph = graph.with_config({"recursion_limit": RECURSION_LIMIT})
except Exception: pass


## 6.4 Create a TruLens session for logging

In [None]:
from trulens.core.session import TruSession
from trulens.core.database.connector.default import DefaultDBConnector

# Initialize connector with SQLite database one folder back
connector = DefaultDBConnector(database_url="sqlite:///default.sqlite")

# Create TruSession with the custom connector
session = TruSession(connector=connector)

ü¶ë Initialized with db url sqlite:///default.sqlite .
üõë Secret keys may be written to the database. See the `database_redact_keys` option of `TruSession` to prevent this.


## 6.5 Register the new version of the agent

<div style="background-color:#f7fff8; padding:15px; border-width:3px; border-color:#e0f0e0; border-style:solid; border-radius:6px">
    <p>üö® &nbsp; In this notebook, you are directly provided with the results obtained during filming. This is to help eliminate waiting time, and to prevent potential rate limit errors that might occur in this learning environment (this learning environment is constrained, and the GPA evaluation metrics consume a significant number of tokens).
</div>

In [None]:
from trulens.apps.langgraph import TruGraph
from trulens.core.schema.feedback import FeedbackMode

#from helper import f_answer_relevance, f_context_relevance, f_groundedness, f_logical_consistency, f_execution_efficiency, f_plan_adherence, f_plan_quality

selected_feedbacks = [f_answer_relevance, f_context_relevance, f_groundedness, f_logical_consistency, f_execution_efficiency, f_plan_adherence, f_plan_quality]

tru_recorder = TruGraph(
    graph,
    app_name="Research Data Agent",
    app_version="L6: Inline evals + sub-goals in planning prompt",
    feedbacks=selected_feedbacks,
    feedback_mode=FeedbackMode.WITH_APP_THREAD,
    selector_check_warning=True
    # selector_nocheck=True # selector_check_warning=False, # selector_nocheck=True
)

instrumenting <class 'langgraph.graph.state.StateGraph'> for base <class 'langgraph.graph.state.StateGraph'>
instrumenting <class 'langgraph.graph.state.CompiledStateGraph'> for base <class 'langgraph.graph.state.CompiledStateGraph'>
	instrumenting invoke
	instrumenting ainvoke
	instrumenting stream
	instrumenting astream
	instrumenting astream_events
	instrumenting stream
	instrumenting astream
	instrumenting astream_events
	instrumenting invoke
	instrumenting ainvoke
	instrumenting stream
	instrumenting astream
	instrumenting stream_mode
instrumenting <class 'langgraph.graph.state.CompiledStateGraph'> for base <class 'langgraph.pregel.main.Pregel'>
	instrumenting invoke
	instrumenting ainvoke
	instrumenting stream
	instrumenting astream
	instrumenting astream_events
	instrumenting stream
	instrumenting astream
	instrumenting astream_events
	instrumenting invoke
	instrumenting ainvoke
	instrumenting stream
	instrumenting astream
	instrumenting stream_mode


In [None]:
#@title Faire un seul TracerProvider global + export Phoenix + instrumentation LangChain ---
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider as SDKTracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

tp = trace.get_tracer_provider()
if not isinstance(tp, SDKTracerProvider):
    print("‚ö†Ô∏è TracerProvider OTEL inattendu. Assurez-vous que TRULENS_OTEL_TRACING=1 et que TruGraph est initialis√© avant ce bloc.")

# Ajouter un exporter Phoenix (OTLP HTTP) AU provider global (au lieu de laisser Phoenix/TruLens se battre)
_exporter = None
try:
    from phoenix.otel import HTTPSpanExporter  # type: ignore
    _exporter = HTTPSpanExporter(endpoint=os.environ["PHOENIX_COLLECTOR_ENDPOINT"])
except Exception:
    try:
        from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter  # type: ignore
        _exporter = OTLPSpanExporter(endpoint=os.environ["PHOENIX_COLLECTOR_ENDPOINT"])
    except Exception as e:
        print(f"‚ö†Ô∏è Phoenix exporter non initialis√©: {e}")

if _exporter is not None:
    try:
        tp.add_span_processor(BatchSpanProcessor(_exporter))
    except Exception as e:
        print(f"‚ö†Ô∏è Impossible d'ajouter le span processor Phoenix: {e}")

# Instrumentation OpenInference (spans LLM/tools) branch√©e sur le provider global TruLens
try:
    LangChainInstrumentor().instrument(tracer_provider=tp)
except Exception as e:
    print(f"‚ö†Ô∏è Instrumentation LangChain d√©j√† active ou erreur: {e}")

WARNI [opentelemetry.instrumentation.instrumentor] Attempting to instrument while already instrumented


## 6.6 Re-test the agent

<div style="background-color:#f7fff8; padding:15px; border-width:3px; border-color:#e0f0e0; border-style:solid; border-radius:6px">
    <p>üö® &nbsp;<b>Run Results:</b> In this notebook, you are directly provided with the results obtained during filming. This is to help eliminate waiting time, and to prevent potential rate limit errors that might occur in this learning environment (this learning environment is constrained, and the GPA evaluation metrics consume a significant number of tokens).
</div>

**Query 1**

In [None]:
from langchain_core.messages import HumanMessage

with tru_recorder as recording:
    query = "What are the top 5 largest cities in France by population ? Chart the population value for each."
    print(f"Query: {query}")
    state = {
                "messages": [HumanMessage(content=query)],
                "user_query": query,
                "enabled_agents": ["cortex_researcher", "web_researcher", "chart_generator", "chart_summarizer", "synthesizer"],
            }
    graph.invoke(state, print_mode=["tasks","updates","debug"])

    print("--------------------------------")

Query: What are the top 5 largest cities in France by population ? Chart the population value for each.
[1m[tasks][0m {'id': '48d6764c-f377-62fb-5ba8-0b900ca7d53a', 'name': 'planner', 'input': {'messages': [HumanMessage(content='What are the top 5 largest cities in France by population ? Chart the population value for each.', additional_kwargs={}, response_metadata={}, id='8c01f243-f24e-44bc-8530-e171942df065')], 'enabled_agents': ['cortex_researcher', 'web_researcher', 'chart_generator', 'chart_summarizer', 'synthesizer'], 'user_query': 'What are the top 5 largest cities in France by population ? Chart the population value for each.', 'remaining_steps': 14}, 'triggers': ('branch:to:planner',)}
[1m[debug][0m {'step': 1, 'timestamp': '2026-01-08T07:33:04.885667+00:00', 'type': 'task', 'payload': {'id': '48d6764c-f377-62fb-5ba8-0b900ca7d53a', 'name': 'planner', 'input': {'messages': [HumanMessage(content='What are the top 5 largest cities in France by population ? Chart the populatio

ERROR [trulens.core.otel.instrument] Error setting attributes: 'NoneType' object has no attribute 'update'


RateLimitError: Error code: 429 - {'error': {'message': 'Rate limit reached for model `llama-3.1-8b-instant` in organization `org_01j6q74hnze5sb1qc9pg0xfvpk` service tier `on_demand` on tokens per minute (TPM): Limit 6000, Used 5604, Requested 751. Please try again in 3.55s. Need more tokens? Upgrade to Dev Tier today at https://console.groq.com/settings/billing', 'type': 'tokens', 'code': 'rate_limit_exceeded'}}

In [None]:
records, feedback = session.get_records_and_feedback()
if not records.empty:
    print(f"Query: {records.iloc[-1]['input']}\n")
    print(f"Output: {records.iloc[-1]['output']}\n")
else:
    print("‚ùå No records found. Check for errors in the output above.")

**Query 2**

In [None]:
with tru_recorder as recording:
    query = "Identify our pending deals, research if they may be experiencing regulatory changes, and using the meeting notes for each customer, provide a new value proposition for each given the regulatory changes."
    print(f"Query: {query}")
    state = {
                "messages": [HumanMessage(content=query)],
                "user_query": query,
                "enabled_agents": ["cortex_researcher", "web_researcher", "chart_generator", "chart_summarizer", "synthesizer"],
            }
    graph.invoke(state, print_mode=["tasks","updates","debug"])

    print("--------------------------------")

In [None]:
records, feedback = session.get_records_and_feedback()
if not records.empty:
    print(f"Query: {records.iloc[-1]['input']}\n")
    print(f"Output: {records.iloc[-1]['output']}\n")
else:
    print("‚ùå No records found. Check for errors in the output above.")

**Query 3**

In [None]:
with tru_recorder as recording:
    query = "Identify the largest laboratories studying and developping LLM, then find major topics of those companies in 2026, and find news article about top topics."
    print(f"Query: {query}")
    state = {
                "messages": [HumanMessage(content=query)],
                "user_query": query,
                "enabled_agents": ["cortex_researcher", "web_researcher", "chart_generator", "chart_summarizer", "synthesizer"],
            }
    graph.invoke(state, print_mode=["tasks","updates","debug"])

    print("--------------------------------")

In [None]:
records, feedback = session.get_records_and_feedback()
if not records.empty:
    print(f"Query: {records.iloc[-1]['input']}\n")
    print(f"Output: {records.iloc[-1]['output']}\n")
else:
    print("‚ùå No records found. Check for errors in the output above.")

## 6.7 Launch TruLens dashboard

By comparing to the previous version, we can validate the changes.

**Note:** Make sure to click on the second link (not the localhost) to open the TruLens dashboard.

In [None]:
# @title üöÄ Launch Dashboard (Force Port 8502)
!pip install -q trulens-dashboard
from google.colab import output
from trulens.core import TruSession
import time

session = TruSession()

# Stop any existing dashboards
try:
    from trulens.dashboard import stop_dashboard
    stop_dashboard(force=True)
except:
    pass

print("‚è≥ Starting Dashboard on port 8502...")
session.start_dashboard(port=8502, force=True)
time.sleep(5) # Give it time to spin up

print("‚úÖ Dashboard ready.")
#output.serve_kernel_port_as_iframe(8502, height=1000)
output.serve_kernel_port_as_window(8502)

In [None]:
# @title Alternative Analysis üìä > View Leaderboard as DataFrame
from trulens.core import TruSession
import pandas as pd

max_records = 10
session = TruSession()

# Get the leaderboard (aggregates metrics by App ID)
print("üìä Leaderboard:")
display(session.get_leaderboard())

# OPTIONAL: Get all raw records to debug specific failures
print(f"\nüìù Last {max_records} Raw Records:")
records, feedback = session.get_records_and_feedback()
if not records.empty:
    # Show relevant columns only
    cols = ['input', 'output', 'latency', 'total_cost'] + [c for c in records.columns if 'Groundedness' in c or 'Relevance' in c]
    # Filter columns that actually exist
    valid_cols = [c for c in cols if c in records.columns]
    display(records[valid_cols].tail(max_records))
else:
    print("No records found yet.")

**What other improvements could be also done?**
- In this course, we focused on evaluating the end-to-end agent behavior. We could have also tested the behavior of each specialized agent separately to optimize their prompt and design.
- We could have added other metrics for inline-evaluations.
- We could also updated the prompt of the executor.

# Ajout des modules RAG et *SQL*

# Vers l'optimisation

> TODO: v√©rifications en cours: https://chatgpt.com/c/69583b8d-16b4-8327-9cc0-3b5baff84b01



In [None]:
#@title Instalation de l'optimiseur g√©n√©ratif Trace avec un m√©canisme de log externe
!pip install "git+https://github.com/doxav/NewTrace.git@json-logs-and-traces-IO"

R√©alisez dans la cellule ci-dessous deux exemples d'optimisations avec Trace avec succ√®s depuis un exemple existant:
- https://github.com/AgentOpt/OpenTrace/tree/main/examples
- https://agentopt.github.io/OpenTrace/#code-examples (attention la doc a √©t√© g√©n√©r√©e par IA g√©n√©rative, il peut y avoir des incoh√©rences)

In [None]:
# @title Cr√©ation de trace_optimize_runtime.py (Attention le code de cette section du TP va √©voluer)
%%writefile trace_optimize_runtime.py
"""
trace_optimize_runtime.py

Pont minimal et **non-intrusif** entre :

- des ex√©cutions LangGraph instrument√©es par TruLens (au format *records JSON* TruLens et/ou spans OpenTelemetry),
- des feedbacks TruLens (RAG triad + GPA, ou toute autre m√©trique),
- et l'optimiseur de la lib Trace/OptoPrime (fichiers `JSON_OTEL_trace_optim_demo_*.py`).

Objectif : permettre une boucle "run ‚Üí trace ‚Üí feedback ‚Üí optimise ‚Üí patch" **sans modifier**
le code du graphe LangGraph (n≈ìuds/agents) d√©j√† existant.

Principes cl√©s
-------------
1) **Pr√©server le graphe causal** : on ne "aplatit" pas la trace. Les param√®tres `param.*`
   sont attach√©s aux spans qui repr√©sentent *r√©ellement* les √©tapes (planner/executor/‚Ä¶),
   et une span `evaluator` est ajout√©e uniquement pour porter `eval.*`.
2) **Compatibilit√© double** :
   - si vous avez une trace OTEL/OTLP (ex: TruLens OTEL activ√©), on l'utilise directement ;
   - sinon, on reconstruit une trace OTLP minimale depuis un *Record* TruLens (JSON standard).
3) **Optimisation de code** (pas seulement du prompt tuning) :
   on expose du code comme param√®tre trainable via `param.__code_<key>` et on applique
   les patches via compilation + hotpatch (in-place si possible, ou remplacement symbolique).

Cette impl√©mentation vise une approche g√©n√©rique :
- pas de fonctions nomm√©es "for_l6" ;
- tout est pilot√© par des *configurations* (matchers, specs, targets).

Pr√©-requis au runtime
---------------------
- TruLens : utilis√© pour capturer les records et produire les feedbacks.
- (Optionnel) OpenTelemetry : si TruLens exporte des spans OTEL, on peut les "flusher".
- Trace/Opto (repo Trace/opto) : utilis√© pour `otlp_traces_to_trace_json`, `ingest_tgj`,
  et l'optimiseur OptoPrimeV2.

Remarque : ce fichier ne d√©pend pas de LangGraph ni TruLens √† l'import.
Il se contente de manipuler des JSON/dicts, et d'appliquer des patches Python.
"""

from __future__ import annotations

import copy
import dataclasses
import datetime as _dt
import inspect
import json
import os
import random
import re
import textwrap
import time
import types
import uuid
from dataclasses import dataclass, field
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    Iterator,
    List,
    Mapping,
    MutableMapping,
    Optional,
    Sequence,
    Tuple,
    Union,
)

# ---------------------------------------------------------------------------
# Types simples
# ---------------------------------------------------------------------------

JSONDict = Dict[str, Any]
SpanDict = Dict[str, Any]


# ---------------------------------------------------------------------------
# Utilitaires JSON / texte
# ---------------------------------------------------------------------------

def safe_json_dumps(obj: Any, *, max_len: int = 4000) -> str:
    """
    S√©rialise `obj` en JSON de mani√®re robuste (fallback str), puis tronque.

    Args:
        obj: objet √† s√©rialiser.
        max_len: longueur max en caract√®res (au-del√†, on tronque).

    Returns:
        str JSON (ou string fallback), tronqu√©e si n√©cessaire.
    """
    try:
        s = json.dumps(obj, ensure_ascii=False, default=str)
    except Exception:
        s = str(obj)
    if max_len and len(s) > max_len:
        return s[: max_len - 3] + "..."
    return s


def normalize_whitespace(s: str) -> str:
    """
    Normalise l√©g√®rement un texte (espaces, lignes vides) pour stabiliser des diffs.

    Args:
        s: texte.

    Returns:
        texte normalis√©.
    """
    s2 = s.replace("\r\n", "\n").replace("\r", "\n")
    # √©vite de d√©truire la mise en forme: on enl√®ve juste les trailing spaces
    s2 = "\n".join(line.rstrip() for line in s2.splitlines())
    return s2.strip() + ("\n" if s2.endswith("\n") else "")


# ---------------------------------------------------------------------------
# OTLP helpers (structure JSON)
# ---------------------------------------------------------------------------

def _otlp_attr_value(value: Any) -> Dict[str, Any]:
    """
    Encode une valeur Python en valeur OTLP JSON (stringValue/doubleValue/intValue/boolValue).

    Note:
        Pour rester simple et compatible, on privil√©gie stringValue.
        Les nombres sont encod√©s en doubleValue si possibles.

    Returns:
        dict au format OTLP "AnyValue".
    """
    if isinstance(value, bool):
        return {"boolValue": bool(value)}
    if isinstance(value, int) and not isinstance(value, bool):
        # OTLP accepte intValue sous forme de cha√Æne ou int selon l'impl; on met int.
        return {"intValue": int(value)}
    if isinstance(value, float):
        return {"doubleValue": float(value)}
    # fallback string
    return {"stringValue": str(value)}


def _otlp_kv(key: str, value: Any) -> Dict[str, Any]:
    """Construit un attribut OTLP (key/value)."""
    return {"key": key, "value": _otlp_attr_value(value)}


def otlp_is_payload(obj: Any) -> bool:
    """
    D√©tecte si `obj` ressemble √† un payload OTLP traces JSON.

    Args:
        obj: objet quelconque.

    Returns:
        True si la structure contient `resourceSpans`.
    """
    return isinstance(obj, dict) and "resourceSpans" in obj


def otlp_iter_spans(otlp: JSONDict) -> Iterator[SpanDict]:
    """
    It√®re sur tous les spans d'un payload OTLP.

    Args:
        otlp: payload OTLP (dict).

    Yields:
        chaque span (dict) *mutable*.
    """
    for rs in otlp.get("resourceSpans", []) or []:
        for ss in rs.get("scopeSpans", []) or []:
            for sp in ss.get("spans", []) or []:
                yield sp


def otlp_span_attrs_to_dict(span: SpanDict) -> Dict[str, Any]:
    """
    Convertit la liste `span["attributes"]` en dict {key: python_value}.

    Args:
        span: dict OTLP span.

    Returns:
        dict (valeurs simplifi√©es).
    """
    out: Dict[str, Any] = {}
    for kv in span.get("attributes", []) or []:
        k = kv.get("key")
        v = kv.get("value", {})
        if not k:
            continue
        # choisir un champ OTLP
        if "stringValue" in v:
            out[k] = v["stringValue"]
        elif "doubleValue" in v:
            out[k] = float(v["doubleValue"])
        elif "intValue" in v:
            out[k] = int(v["intValue"])
        elif "boolValue" in v:
            out[k] = bool(v["boolValue"])
        else:
            out[k] = v
    return out


def otlp_set_span_attribute(span: SpanDict, key: str, value: Any) -> None:
    """
    Ajoute ou remplace un attribut OTLP sur un span.

    Args:
        span: span OTLP mutable.
        key: cl√© d'attribut.
        value: valeur (sera encod√©e).
    """
    attrs = span.get("attributes")
    if attrs is None:
        attrs = []
        span["attributes"] = attrs

    # replace if exists
    for kv in attrs:
        if kv.get("key") == key:
            kv["value"] = _otlp_attr_value(value)
            return

    attrs.append(_otlp_kv(key, value))


def otlp_get_trace_id(otlp: JSONDict) -> Optional[str]:
    """
    Renvoie un traceId (hex) du payload OTLP si pr√©sent.

    Args:
        otlp: payload OTLP.

    Returns:
        traceId (32 hex chars) ou None.
    """
    for sp in otlp_iter_spans(otlp):
        tid = sp.get("traceId")
        if tid:
            return tid
    return None


def _new_trace_id_hex() -> str:
    """G√©n√®re un traceId OTLP (32 hex chars)."""
    return uuid.uuid4().hex  # 32 hex


def _new_span_id_hex() -> str:
    """G√©n√®re un spanId OTLP (16 hex chars)."""
    return f"{random.getrandbits(64):016x}"


def ensure_otlp_shell(
    *,
    service_name: str = "app",
    scope_name: str = "trace_opt",
) -> JSONDict:
    """
    Construit un "shell" OTLP vide compatible avec `otlp_traces_to_trace_json`.

    Args:
        service_name: nom de ressource OTEL.
        scope_name: nom du scope.

    Returns:
        dict OTLP avec `resourceSpans/scopeSpans/spans`.
    """
    return {
        "resourceSpans": [
            {
                "resource": {
                    "attributes": [
                        _otlp_kv("service.name", service_name),
                    ]
                },
                "scopeSpans": [
                    {
                        "scope": {"name": scope_name, "version": ""},
                        "spans": [],
                    }
                ],
            }
        ]
    }


def otlp_append_span(otlp: JSONDict, span: SpanDict) -> None:
    """
    Ajoute un span √† la premi√®re scopeSpan du payload.

    Args:
        otlp: payload OTLP.
        span: span dict.
    """
    rs_list = otlp.setdefault("resourceSpans", [])
    if not rs_list:
        otlp.update(ensure_otlp_shell())
        rs_list = otlp["resourceSpans"]
    rs0 = rs_list[0]
    ss_list = rs0.setdefault("scopeSpans", [])
    if not ss_list:
        ss_list.append({"scope": {"name": "trace_opt", "version": ""}, "spans": []})
    ss0 = ss_list[0]
    spans = ss0.setdefault("spans", [])
    spans.append(span)


# ---------------------------------------------------------------------------


# ---------------------------------------------------------------------------
# Capture OTEL -> OTLP (optionnel)
# ---------------------------------------------------------------------------

def try_attach_inmemory_span_exporter() -> Tuple[Optional[Any], Optional[Any], str]:
    """
    Tente d'attacher un `InMemorySpanExporter` au TracerProvider global OpenTelemetry.

    Pourquoi:
        TruLens peut exporter des spans OTEL (OpenTelemetry). Si on peut accrocher un
        exporter en m√©moire, on peut r√©cup√©rer la trace OTLP **sans** modifier le graphe.

    Returns:
        (exporter, processor, status)

        - exporter: instance InMemorySpanExporter ou None
        - processor: SimpleSpanProcessor ou None
        - status: message (ok / warning / error)
    """
    try:
        from opentelemetry import trace as otel_trace  # type: ignore
        from opentelemetry.sdk.trace.export import InMemorySpanExporter, SimpleSpanProcessor  # type: ignore
    except Exception as e:
        return None, None, f"OpenTelemetry SDK indisponible: {e}"

    provider = otel_trace.get_tracer_provider()
    if not hasattr(provider, "add_span_processor"):
        return None, None, "TracerProvider global n'a pas add_span_processor (provider non-SDK ?)"

    try:
        exporter = InMemorySpanExporter()
        processor = SimpleSpanProcessor(exporter)
        provider.add_span_processor(processor)  # type: ignore[attr-defined]
        return exporter, processor, "ok"
    except Exception as e:
        return None, None, f"Erreur lors de l'attachement de l'exporter: {e}"


def flush_inmemory_exporter_to_otlp(
    exporter: Any,
    *,
    service_name: str = "app",
    scope_name: str = "inmemory",
    clear: bool = True,
) -> JSONDict:
    """
    Convertit les spans collect√©s par `InMemorySpanExporter` en payload OTLP JSON.

    Args:
        exporter: instance InMemorySpanExporter.
        service_name: resource.service.name.
        scope_name: scopeSpans.scope.name.
        clear: si True, vider l'exporter apr√®s lecture.

    Returns:
        OTLP payload dict.
    """
    otlp = ensure_otlp_shell(service_name=service_name, scope_name=scope_name)

    spans = list(getattr(exporter, "get_finished_spans")() or [])
    if clear and hasattr(exporter, "clear"):
        exporter.clear()

    for sp in spans:
        try:
            ctx = sp.get_span_context()
            trace_id = f"{ctx.trace_id:032x}"
            span_id = f"{ctx.span_id:016x}"
        except Exception:
            # fallback (rare)
            trace_id = _new_trace_id_hex()
            span_id = _new_span_id_hex()

        parent_span_id = ""
        try:
            parent = getattr(sp, "parent", None)
            if parent is not None:
                parent_span_id = f"{parent.span_id:016x}"
        except Exception:
            parent_span_id = ""

        name = getattr(sp, "name", "span")
        start_ns = int(getattr(sp, "start_time", time.time_ns()))
        end_ns = int(getattr(sp, "end_time", start_ns + 1_000_000))

        attrs_list: List[Dict[str, Any]] = []
        attrs = getattr(sp, "attributes", {}) or {}
        if isinstance(attrs, dict):
            for k, v in attrs.items():
                # Pour rester robuste, on encode en string (Trace/otel_adapter sait parser stringValue).
                attrs_list.append(_otlp_kv(str(k), safe_json_dumps(v, max_len=8000)))

        otlp_append_span(
            otlp,
            {
                "traceId": trace_id,
                "spanId": span_id,
                "parentSpanId": parent_span_id,
                "name": str(name),
                "kind": "INTERNAL",
                "startTimeUnixNano": start_ns,
                "endTimeUnixNano": end_ns,
                "attributes": attrs_list,
            },
        )

    return otlp


# TruLens record JSON -> OTLP (fallback si pas de spans OTEL disponibles)
# ---------------------------------------------------------------------------

def trulens_is_record(obj: Any) -> bool:
    """
    Heuristique: d√©tecte si `obj` ressemble √† un Record TruLens (JSON standard).

    Un Record TruLens (voir doc) contient typiquement `record_id` et `calls`.

    Args:
        obj: objet.

    Returns:
        True si on d√©tecte des champs "record_id" ou "calls".
    """
    return isinstance(obj, dict) and ("calls" in obj or "record_id" in obj or "main_input" in obj)


def _parse_dt_to_ns(value: Any) -> Optional[int]:
    """
    Tente de parser des timestamps TruLens (perf.start_time / perf.end_time) vers ns Unix.

    Formats accept√©s (best-effort):
      - int / float : suppos√© √™tre des secondes (float) ou ns (int tr√®s grand).
      - str ISO 8601 : ex "2025-01-02T12:34:56.123Z"
      - datetime.

    Returns:
        int nanosecondes, ou None si impossible.
    """
    if value is None:
        return None

    if isinstance(value, int):
        # Heuristique: si tr√®s grand, c'est d√©j√† du ns
        if value > 10_000_000_000_000:  # > ~1970 + 4h en ns
            return value
        # sinon secondes
        return int(value * 1_000_000_000)

    if isinstance(value, float):
        return int(value * 1_000_000_000)

    if isinstance(value, _dt.datetime):
        if value.tzinfo is None:
            value = value.replace(tzinfo=_dt.timezone.utc)
        return int(value.timestamp() * 1_000_000_000)

    if isinstance(value, str):
        s = value.strip()
        # Z -> +00:00
        if s.endswith("Z"):
            s = s[:-1] + "+00:00"
        try:
            dt = _dt.datetime.fromisoformat(s)
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=_dt.timezone.utc)
            return int(dt.timestamp() * 1_000_000_000)
        except Exception:
            return None

    return None


def _trulens_call_name(call: JSONDict) -> str:
    """
    Produit un nom de span "lisible" pour un call TruLens.

    TruLens record appelle ces objets `RecordAppCall` avec un champ `stack` contenant
    des √©l√©ments `RecordAppCallMethod` incluant `path` et `method`.

    Strat√©gie:
      - si on a un `path`, on prend son dernier segment (souvent proche du nom de n≈ìud)
      - sinon, on prend `method.name`
      - sinon fallback "call"

    Returns:
        str
    """
    stack = call.get("stack") or []
    top = stack[-1] if isinstance(stack, list) and stack else {}
    method = (top.get("method") or {}) if isinstance(top, dict) else {}
    path = top.get("path") if isinstance(top, dict) else None

    # path est souvent un Lens (liste de segments)
    last_seg: Optional[str] = None
    if isinstance(path, (list, tuple)) and path:
        last = path[-1]
        if isinstance(last, str):
            last_seg = last
        else:
            last_seg = str(last)
    elif isinstance(path, str) and path:
        # ex: "nodes/planner"
        parts = re.split(r"[\\/]+", path)
        last_seg = parts[-1] if parts else path

    mname = None
    if isinstance(method, dict):
        mname = method.get("name") or method.get("method_name") or method.get("function_name")

    if last_seg and last_seg not in {"__call__", "invoke", "run"}:
        return str(last_seg)
    if mname:
        return str(mname)
    return "call"


@dataclass
class _CallSpan:
    """Structure interne pour reconstruire une hi√©rarchie approximative."""
    call_idx: int
    name: str
    call_id: str
    start_ns: Optional[int]
    end_ns: Optional[int]
    stack_sig: Tuple[str, ...]
    args: Any = None
    rets: Any = None
    error: Optional[str] = None
    parent_idx: Optional[int] = None
    span_id: str = field(default_factory=_new_span_id_hex)


def _call_stack_signature(call: JSONDict) -> Tuple[str, ...]:
    """
    Construit une signature (tuple) √† partir de `call.stack` pour aider √† inf√©rer la hi√©rarchie.

    Returns:
        tuple de strings.
    """
    sig: List[str] = []
    stack = call.get("stack") or []
    if not isinstance(stack, list):
        return tuple()

    for frame in stack:
        if not isinstance(frame, dict):
            continue
        path = frame.get("path")
        method = frame.get("method") or {}
        # path normalis√©
        if isinstance(path, (list, tuple)):
            p = "/".join(str(x) for x in path)
        else:
            p = str(path) if path is not None else ""
        m = ""
        if isinstance(method, dict):
            m = str(method.get("name") or method.get("method_name") or method.get("function_name") or "")
        sig.append(f"{p}::{m}".strip(":"))
    return tuple(sig)


def trulens_record_to_otlp(
    record: JSONDict,
    *,
    service_name: str = "trulens",
    scope_name: str = "trulens_record",
    trace_id: Optional[str] = None,
    include_root_span: bool = True,
    max_io_chars: int = 4000,
) -> JSONDict:
    """
    Convertit un Record TruLens (JSON) en payload OTLP minimal.

    Cette conversion est un *fallback* quand vous n'avez pas de spans OTEL disponibles.
    Elle reconstruit une hi√©rarchie de spans √† partir des `perf` timestamps (si pr√©sents),
    sinon √† partir de signatures de stack (heuristique).

    Args:
        record: dict JSON TruLens (record).
        service_name: service.name OTEL.
        scope_name: scope OTEL.
        trace_id: si fourni, utilis√© comme traceId.
        include_root_span: ajoute un span racine "record" (recommand√©).
        max_io_chars: taille max pour input.value / output.value.

    Returns:
        OTLP payload dict.
    """
    if not trulens_is_record(record):
        raise ValueError("L'objet fourni ne ressemble pas √† un Record TruLens JSON.")

    trace_id = trace_id or _new_trace_id_hex()
    otlp = ensure_otlp_shell(service_name=service_name, scope_name=scope_name)

    calls = record.get("calls") or []
    if not isinstance(calls, list):
        calls = []

    spans: List[_CallSpan] = []
    for idx, call in enumerate(calls):
        if not isinstance(call, dict):
            continue
        call_id = str(call.get("call_id") or call.get("callId") or f"call-{idx}")
        name = _trulens_call_name(call)
        perf = call.get("perf") or {}
        start_ns = _parse_dt_to_ns(perf.get("start_time") or perf.get("startTime") or perf.get("start"))
        end_ns = _parse_dt_to_ns(perf.get("end_time") or perf.get("endTime") or perf.get("end"))
        stack_sig = _call_stack_signature(call)

        spans.append(
            _CallSpan(
                call_idx=idx,
                name=name,
                call_id=call_id,
                start_ns=start_ns,
                end_ns=end_ns,
                stack_sig=stack_sig,
                args=call.get("args"),
                rets=call.get("rets"),
                error=call.get("error"),
            )
        )

    # Heuristique de hi√©rarchie:
    # 1) si des timestamps existent pour la majorit√©, on utilise l'inclusion d'intervalles
    # 2) sinon, on utilise la relation "stack prefix" la plus r√©cente
    have_times = sum(1 for s in spans if s.start_ns is not None and s.end_ns is not None)
    use_interval = have_times >= max(2, int(0.6 * len(spans))) if spans else False

    if use_interval:
        # Ordre: start asc, end desc (pour bien g√©rer les enveloppes)
        spans_sorted = sorted(
            spans,
            key=lambda s: (
                s.start_ns if s.start_ns is not None else 0,
                -(s.end_ns if s.end_ns is not None else 0),
            ),
        )
        stack: List[_CallSpan] = []
        for s in spans_sorted:
            s_start = s.start_ns if s.start_ns is not None else 0
            # pop les spans qui se terminent avant le start courant
            while stack and (stack[-1].end_ns is not None) and s_start >= (stack[-1].end_ns or 0):
                stack.pop()
            if stack:
                s.parent_idx = stack[-1].call_idx
            stack.append(s)
        # spans_sorted contient des objets de la m√™me liste => parent_idx est appliqu√©
    else:
        # stack prefix: on mappe signature -> dernier idx
        last_by_sig: Dict[Tuple[str, ...], int] = {}
        for s in spans:
            parent_sig = s.stack_sig[:-1] if s.stack_sig else tuple()
            if parent_sig in last_by_sig:
                s.parent_idx = last_by_sig[parent_sig]
            # enregistrer ce call comme dernier pour sa signature
            last_by_sig[s.stack_sig] = s.call_idx

    # Root span optionnel
    root_span_id = _new_span_id_hex()
    root_end = max((s.end_ns or 0) for s in spans) if spans else time.time_ns()
    root_start = min((s.start_ns or root_end) for s in spans) if spans else root_end - 1_000_000

    if include_root_span:
        root_span: SpanDict = {
            "traceId": trace_id,
            "spanId": root_span_id,
            "parentSpanId": "",
            "name": "record",
            "kind": "INTERNAL",
            "startTimeUnixNano": int(root_start),
            "endTimeUnixNano": int(root_end),
            "attributes": [
                _otlp_kv("trulens.record_id", record.get("record_id") or record.get("recordId") or ""),
                _otlp_kv("input.value", safe_json_dumps(record.get("main_input"), max_len=max_io_chars)),
                _otlp_kv("output.value", safe_json_dumps(record.get("main_output"), max_len=max_io_chars)),
            ],
        }
        otlp_append_span(otlp, root_span)

    # Convert calls to spans
    now_ns = time.time_ns()
    for s in spans:
        start = s.start_ns or (now_ns + s.call_idx * 1_000_000)
        end = s.end_ns or (start + 500_000)

        parent_span_id = ""
        if s.parent_idx is not None:
            # retrouver le parent span_id
            parent = next((p for p in spans if p.call_idx == s.parent_idx), None)
            if parent is not None:
                parent_span_id = parent.span_id
        elif include_root_span:
            parent_span_id = root_span_id

        span: SpanDict = {
            "traceId": trace_id,
            "spanId": s.span_id,
            "parentSpanId": parent_span_id,
            "name": s.name,
            "kind": "INTERNAL",
            "startTimeUnixNano": int(start),
            "endTimeUnixNano": int(end),
            "attributes": [
                _otlp_kv("trulens.call_id", s.call_id),
                _otlp_kv("input.value", safe_json_dumps(s.args, max_len=max_io_chars)),
                _otlp_kv("output.value", safe_json_dumps(s.rets, max_len=max_io_chars)),
            ],
        }
        if s.error:
            span["attributes"].append(_otlp_kv("error.value", s.error))
        otlp_append_span(otlp, span)

    return otlp


# ---------------------------------------------------------------------------
# S√©lection de spans / injection de param√®tres
# ---------------------------------------------------------------------------

@dataclass(frozen=True)
class SpanMatcher:
    """
    S√©lecteur de spans (OTLP) bas√© sur des heuristiques simples.

    Vous pouvez matcher par :
    - substring(s) sur le nom (`name_contains`)
    - regex(s) sur le nom (`name_regex`)
    - pr√©sence de certaines cl√©s d'attributs (`has_attrs`)
    - substring(s) sur la valeur d'un attribut (`attr_contains`)

    C'est volontairement simple pour rester g√©n√©rique et portable.
    """
    name_contains: Tuple[str, ...] = ()
    name_regex: Tuple[str, ...] = ()
    has_attrs: Tuple[str, ...] = ()
    attr_contains: Mapping[str, Tuple[str, ...]] = dataclasses.field(default_factory=dict)

    def matches(self, span: SpanDict) -> bool:
        """Retourne True si `span` satisfait ce matcher."""
        name = str(span.get("name") or "")
        lname = name.lower()

        if self.name_contains:
            if not any(sub.lower() in lname for sub in self.name_contains):
                return False

        if self.name_regex:
            ok = False
            for pat in self.name_regex:
                try:
                    if re.search(pat, name):
                        ok = True
                        break
                except re.error:
                    continue
            if not ok:
                return False

        if self.has_attrs or self.attr_contains:
            attrs = otlp_span_attrs_to_dict(span)
            if self.has_attrs:
                if not all(k in attrs for k in self.has_attrs):
                    return False
            for k, subs in self.attr_contains.items():
                v = str(attrs.get(k, ""))
                lv = v.lower()
                if not any(s.lower() in lv for s in subs):
                    return False

        return True


def select_spans(otlp: JSONDict, matcher: SpanMatcher) -> List[SpanDict]:
    """
    Retourne la liste des spans matching `matcher`.

    Args:
        otlp: payload OTLP.
        matcher: SpanMatcher.

    Returns:
        liste de spans (dicts mutables).
    """
    return [sp for sp in otlp_iter_spans(otlp) if matcher.matches(sp)]


def select_one_span(otlp: JSONDict, matcher: SpanMatcher) -> Optional[SpanDict]:
    """
    Retourne le premier span matching (ou None).

    Astuce:
        Pratique pour choisir le parent d'un span evaluator, etc.

    Returns:
        span dict ou None.
    """
    for sp in otlp_iter_spans(otlp):
        if matcher.matches(sp):
            return sp
    return None


# ---------------------------------------------------------------------------
# Sp√©cifications de param√®tres entra√Ænables
# ---------------------------------------------------------------------------

@dataclass
class ParamSpec:
    """
    D√©crit un param√®tre √† :
      1) exposer dans la trace (OTLP) via `param.<name>`
      2) √©ventuellement appliquer au runtime lors d'une update.

    Attributes:
        name: nom logique (ex: "planner_addendum" ou "__code_planner_node").
        get_value: fonction 0-arg retournant la valeur courante (str conseill√©).
        apply_update: fonction (new_value:str) -> None, appliquant une update.
        attach_to: SpanMatcher indiquant sur quel(s) span(s) √©crire l'attribut param.*.
        trainable: si False, l'optimiseur ne doit pas toucher ce param.
        description: description courte inject√©e c√¥t√© optimiseur (conseill√©e pour code).
        normalize: optionnel, transforme la valeur avant injection (ex: normaliser espaces).
    """
    name: str
    get_value: Callable[[], Any]
    apply_update: Optional[Callable[[str], None]] = None
    attach_to: Optional[SpanMatcher] = None
    trainable: bool = True
    description: str = ""
    normalize: Optional[Callable[[str], str]] = normalize_whitespace

    def value_as_str(self) -> str:
        """Renvoie la valeur courante en string (avec normalisation si configur√©e)."""
        v = self.get_value()
        s = v if isinstance(v, str) else safe_json_dumps(v, max_len=8000)
        if self.normalize:
            try:
                s = self.normalize(s)
            except Exception:
                pass
        return s



# ---------------------------------------------------------------------------
# Prompt tuning g√©n√©rique (addendum non-intrusif)
# ---------------------------------------------------------------------------

class TextOverrideStore:
    """
    Store simple (en m√©moire) pour des overrides textuels.

    Usage typique:
        store = TextOverrideStore()
        store.set("planner_addendum", "...")

    On peut l'utiliser avec `wrap_prompt_builder_with_addendum` pour modifier
    une fonction qui retourne un prompt (str ou BaseMessage LangChain).
    """

    def __init__(self):
        self._values: Dict[str, str] = {}

    def get(self, key: str, default: str = "") -> str:
        return str(self._values.get(key, default))

    def set(self, key: str, value: str) -> None:
        self._values[str(key)] = str(value)

    def as_param_spec(
        self,
        *,
        name: str,
        attach_to: Optional[SpanMatcher],
        trainable: bool = True,
        description: str = "",
    ) -> ParamSpec:
        """
        Construit un ParamSpec "texte" connect√© √† ce store.

        Args:
            name: nom du param (cl√© dans le store).
            attach_to: o√π accrocher dans la trace.
            trainable: bool.
            description: aide l'optimiseur.

        Returns:
            ParamSpec.
        """
        return ParamSpec(
            name=name,
            get_value=lambda: self.get(name, ""),
            apply_update=lambda v: self.set(name, v),
            attach_to=attach_to,
            trainable=trainable,
            description=description,
            normalize=normalize_whitespace,
        )


def _clone_langchain_message_with_content(msg: Any, new_content: str) -> Any:
    """
    Clone un message LangChain (BaseMessage) en rempla√ßant `content`, best-effort.

    On √©vite d'importer LangChain √† l'import du module; l'import est fait ici si possible.

    Args:
        msg: objet message.
        new_content: contenu final.

    Returns:
        nouveau message (ou fallback str si impossible).
    """
    # cas simple: string
    if isinstance(msg, str):
        return new_content

    # Tentatives LangChain (pydantic/dataclass)
    try:
        from langchain_core.messages import BaseMessage  # type: ignore
        if isinstance(msg, BaseMessage):
            # Pydantic v2
            if hasattr(msg, "model_copy"):
                return msg.model_copy(update={"content": new_content})
            # Pydantic v1
            if hasattr(msg, "copy"):
                try:
                    return msg.copy(update={"content": new_content})
                except TypeError:
                    return msg.copy()
    except Exception:
        pass

    # Generic: tenter reconstruction via __class__(**fields)
    try:
        if hasattr(msg, "model_dump"):
            d = msg.model_dump()
        elif hasattr(msg, "dict"):
            d = msg.dict()
        elif dataclasses.is_dataclass(msg):
            d = dataclasses.asdict(msg)
        else:
            d = dict(getattr(msg, "__dict__", {}))
        d["content"] = new_content
        cls = msg.__class__
        return cls(**d)
    except Exception:
        # fallback string
        return new_content


def wrap_prompt_builder_with_addendum(
    prompt_fn: Callable[..., Any],
    *,
    store: TextOverrideStore,
    addendum_key: str,
    header: str = "\n\n# Addendum\n",
) -> Callable[..., Any]:
    """
    Wrap une fonction de prompt pour lui ajouter un "addendum" contr√¥l√© par `store`.

    - Si `store.get(addendum_key)` est vide ‚Üí comportement inchang√©.
    - Sinon ‚Üí on concat√®ne `original_content + header + addendum`.

    Compatibilit√©:
        - si la fonction renvoie un `str`, on renvoie un `str`
        - si elle renvoie un message LangChain, on renvoie un message du m√™me type (best-effort)

    Args:
        prompt_fn: fonction originale (ex: prompts.plan_prompt).
        store: TextOverrideStore.
        addendum_key: nom du param d'override.
        header: s√©parateur ajout√© avant l'addendum.

    Returns:
        fonction wrapper.
    """
    def _wrapped(*args, **kwargs):
        out = prompt_fn(*args, **kwargs)
        add = store.get(addendum_key, "").strip()
        if not add:
            return out

        # Extraire le contenu initial
        if isinstance(out, str):
            base = out
        else:
            base = str(getattr(out, "content", out))

        new_content = base + header + add
        return _clone_langchain_message_with_content(out, new_content)

    # garder un minimum de metadata
    try:
        _wrapped.__name__ = getattr(prompt_fn, "__name__", "prompt_wrapper")
        _wrapped.__doc__ = getattr(prompt_fn, "__doc__", None)
    except Exception:
        pass

    return _wrapped


def inject_params_into_otlp(
    otlp: JSONDict,
    param_specs: Sequence[ParamSpec],
    *,
    default_attach_to: Optional[SpanMatcher] = None,
) -> JSONDict:
    """
    Ajoute des attributs `param.<name>` aux spans OTLP, selon les ParamSpec.

    Args:
        otlp: payload OTLP (sera copi√©).
        param_specs: liste des ParamSpec √† exposer.
        default_attach_to: matcher fallback si ParamSpec.attach_to est None.

    Returns:
        copie modifi√©e du payload OTLP.
    """
    otlp2 = copy.deepcopy(otlp)
    for spec in param_specs:
        matcher = spec.attach_to or default_attach_to
        if matcher is None:
            # pas d'endroit o√π accrocher => skip
            continue
        val = spec.value_as_str()
        for sp in select_spans(otlp2, matcher):
            otlp_set_span_attribute(sp, f"param.{spec.name}", val)
            otlp_set_span_attribute(sp, f"param.{spec.name}.trainable", "true" if spec.trainable else "false")
    return otlp2


def add_evaluator_span(
    otlp: JSONDict,
    *,
    score: float,
    metrics: Mapping[str, float],
    reasons: str = "",
    parent_matcher: Optional[SpanMatcher] = None,
    evaluator_span_name: str = "evaluator",
) -> JSONDict:
    """
    Ajoute un span OTLP `evaluator` portant `eval.*` (score, m√©triques, raisons).

    Important:
        On n'attache PAS les `param.*` sur ce span (sauf si vous le d√©cidez),
        pour √©viter l'optimisation bo√Æte noire.

    Args:
        otlp: payload OTLP (copi√©).
        score: score global (0..1).
        metrics: dict m√©triques (0..1).
        reasons: texte explicatif.
        parent_matcher: o√π accrocher l'evaluator (typiquement le span "synthesizer").
        evaluator_span_name: nom de span.

    Returns:
        payload OTLP modifi√©.
    """
    otlp2 = copy.deepcopy(otlp)
    trace_id = otlp_get_trace_id(otlp2) or _new_trace_id_hex()

    # choisir le parent span id
    parent_span_id = ""
    if parent_matcher is not None:
        parent = select_one_span(otlp2, parent_matcher)
        if parent is not None:
            parent_span_id = str(parent.get("spanId") or "")

    # fallback: dernier span par endTimeUnixNano
    if not parent_span_id:
        spans = list(otlp_iter_spans(otlp2))
        if spans:
            spans_sorted = sorted(spans, key=lambda s: int(s.get("endTimeUnixNano") or 0))
            parent_span_id = str(spans_sorted[-1].get("spanId") or "")

    now_ns = time.time_ns()
    span: SpanDict = {
        "traceId": trace_id,
        "spanId": _new_span_id_hex(),
        "parentSpanId": parent_span_id,
        "name": evaluator_span_name,
        "kind": "INTERNAL",
        "startTimeUnixNano": int(now_ns),
        "endTimeUnixNano": int(now_ns + 500_000),
        "attributes": [
            _otlp_kv("eval.score", str(float(score))),
            _otlp_kv("eval.reasons", reasons or ""),
        ],
    }
    for k, v in metrics.items():
        span["attributes"].append(_otlp_kv(f"eval.{k}", str(float(v))))
    # Optionnel: input/output.value pour donner de la "mati√®re" au graphe
    span["attributes"].append(_otlp_kv("input.value", "TruLens feedback"))
    span["attributes"].append(_otlp_kv("output.value", reasons or ""))

    otlp_append_span(otlp2, span)
    return otlp2


def coerce_to_otlp(
    trace_or_record: Any,
    *,
    service_name: str = "app",
    scope_name: str = "trace_opt",
) -> JSONDict:
    """
    Convertit une entr√©e "trace-like" en OTLP.

    Supporte:
      - payload OTLP natif (dict avec `resourceSpans`)
      - Record TruLens JSON (dict avec `calls` / `record_id`) -> OTLP minimal

    Args:
        trace_or_record: OTLP ou Record TruLens.
        service_name: utilis√© si conversion TruLens -> OTLP.
        scope_name: utilis√© si conversion TruLens -> OTLP.

    Returns:
        payload OTLP.
    """
    if otlp_is_payload(trace_or_record):
        return trace_or_record  # type: ignore[return-value]
    if trulens_is_record(trace_or_record):
        return trulens_record_to_otlp(trace_or_record, service_name=service_name, scope_name=scope_name)  # type: ignore[arg-type]
    raise ValueError("Entr√©e non reconnue: attendu OTLP ou Record TruLens JSON.")


def param_descriptions_from_specs(param_specs: Sequence[ParamSpec]) -> Dict[str, str]:
    """
    Construit un mapping semantic_name -> description √† partir des ParamSpec.

    Astuce:
        `semantic_name` correspond au nom de param tel qu'il appara√Æt dans Trace
        (sans pr√©fixe runX:).

    Args:
        param_specs: specs.

    Returns:
        dict.
    """
    out: Dict[str, str] = {}
    for s in param_specs:
        if s.description:
            out[s.name] = s.description
    return out


def prepare_otlp_for_optimizer(
    trace_or_record: Any,
    *,
    param_specs: Sequence[ParamSpec],
    score: float,
    metrics: Mapping[str, float],
    reasons: str = "",
    default_param_attach_to: Optional[SpanMatcher] = None,
    evaluator_parent_matcher: Optional[SpanMatcher] = None,
    service_name: str = "app",
    scope_name: str = "trace_opt",
) -> JSONDict:
    """
    Pipeline "one-shot" : (trace|record) -> OTLP -> inject params -> add evaluator.

    Args:
        trace_or_record: OTLP ou Record TruLens.
        param_specs: param√®tres trainables √† exposer.
        score: score global.
        metrics: dict m√©triques.
        reasons: texte explicatif.
        default_param_attach_to: fallback pour ParamSpec.attach_to.
        evaluator_parent_matcher: span parent pour l'evaluator.
        service_name: service.name si conversion TruLens -> OTLP.
        scope_name: scope.name si conversion TruLens -> OTLP.

    Returns:
        payload OTLP pr√™t √† √™tre ing√©r√© dans Trace.
    """
    otlp0 = coerce_to_otlp(trace_or_record, service_name=service_name, scope_name=scope_name)
    otlp1 = inject_params_into_otlp(otlp0, param_specs, default_attach_to=default_param_attach_to)
    otlp2 = add_evaluator_span(
        otlp1,
        score=score,
        metrics=metrics,
        reasons=reasons,
        parent_matcher=evaluator_parent_matcher,
    )
    return otlp2


# ---------------------------------------------------------------------------
# Extraction m√©triques TruLens (depuis DataFrame row OU JSON)
# ---------------------------------------------------------------------------

def extract_metrics_from_mapping(
    obj: Mapping[str, Any],
    *,
    metric_keys: Sequence[str],
    default_metric: float = 0.5,
) -> Dict[str, float]:
    """
    Extrait des m√©triques depuis un mapping (dict-like) via des cl√©s.

    Args:
        obj: mapping (ex: row.to_dict()).
        metric_keys: noms de colonnes / champs.
        default_metric: fallback si manquant.

    Returns:
        dict m√©trique -> float.
    """
    out: Dict[str, float] = {}
    for k in metric_keys:
        val = obj.get(k, default_metric)
        try:
            out[k] = float(val)
        except Exception:
            out[k] = float(default_metric)
    return out


def compute_score(
    metrics: Mapping[str, float],
    *,
    weights: Optional[Mapping[str, float]] = None,
    clamp_0_1: bool = True,
) -> float:
    """
    Calcule un score scalaire √† partir d'un dict de m√©triques.

    Args:
        metrics: dict m√©trique -> float.
        weights: dict m√©trique -> poids (sinon moyenne uniforme).
        clamp_0_1: clamp le r√©sultat entre [0, 1].

    Returns:
        float score.
    """
    if not metrics:
        return 0.5
    if weights:
        num = 0.0
        den = 0.0
        for k, v in metrics.items():
            w = float(weights.get(k, 0.0))
            num += w * float(v)
            den += w
        score = num / den if den > 0 else sum(float(v) for v in metrics.values()) / len(metrics)
    else:
        score = sum(float(v) for v in metrics.values()) / len(metrics)
    if clamp_0_1:
        score = max(0.0, min(1.0, score))
    return score



def select_latest_item(container: Any) -> Any:
    """
    S√©lectionne "le dernier √©l√©ment" d'un container.

    Supporte:
      - pandas.DataFrame / pandas.Series via `.iloc[-1]`
      - list/tuple via `[-1]`
      - dict: renvoie tel quel (consid√©r√© d√©j√† comme 1 record)

    Args:
        container: objet.

    Returns:
        dernier √©l√©ment ou l'objet lui-m√™me (dict).

    Raises:
        ValueError si vide/incompatible.
    """
    if container is None:
        raise ValueError("container is None")

    if isinstance(container, dict):
        return container

    # pandas DataFrame/Series
    if hasattr(container, "iloc"):
        try:
            if getattr(container, "shape", (0,))[0] == 0:
                raise ValueError("container is empty")
            return container.iloc[-1]
        except Exception:
            pass

    if isinstance(container, (list, tuple)):
        if not container:
            raise ValueError("container is empty")
        return container[-1]

    raise ValueError(f"Type non support√© pour select_latest_item: {type(container)}")


def extract_mapping(obj: Any) -> Mapping[str, Any]:
    """
    Convertit best-effort un objet en mapping (dict-like).

    Supporte:
      - dict: renvoie tel quel
      - pandas.Series: `.to_dict()`
      - objets avec `model_dump()` (pydantic v2) ou `dict()` (pydantic v1)

    Args:
        obj: objet.

    Returns:
        mapping.
    """
    if isinstance(obj, dict):
        return obj

    if hasattr(obj, "to_dict"):
        try:
            return obj.to_dict()
        except Exception:
            pass

    if hasattr(obj, "model_dump"):
        try:
            return obj.model_dump()
        except Exception:
            pass

    if hasattr(obj, "dict"):
        try:
            return obj.dict()
        except Exception:
            pass

    # fallback
    return {"value": obj}


def extract_trulens_record_json(obj: Any) -> Optional[JSONDict]:
    """
    Extrait un Record TruLens JSON depuis diff√©rents conteneurs.

    Cas g√©r√©s:
      - si `obj` est d√©j√† un record dict (trulens_is_record) -> renvoie obj
      - si `obj` est une row (Series/dict) contenant un champ `record_json` ou `record`
        (dict ou JSON str) -> parse et renvoie.
      - sinon None

    Args:
        obj: record-like.

    Returns:
        dict record ou None.
    """
    if obj is None:
        return None

    if isinstance(obj, dict) and trulens_is_record(obj):
        return obj

    m = extract_mapping(obj)

    for key in ("record_json", "record", "record_jsons", "record_json_str"):
        if key in m:
            raw = m.get(key)
            if isinstance(raw, dict) and trulens_is_record(raw):
                return raw
            if isinstance(raw, str):
                try:
                    parsed = json.loads(raw)
                    if isinstance(parsed, dict) and trulens_is_record(parsed):
                        return parsed
                except Exception:
                    pass

    # parfois le record est stock√© sous une cl√© "calls" + "record_id" etc.
    if isinstance(m, dict) and trulens_is_record(m):
        return dict(m)

    return None


def render_feedback_text(
    *,
    score: float,
    metrics: Mapping[str, float],
    reasons: str = "",
    extra: Optional[Mapping[str, Any]] = None,
) -> str:
    """
    Rend un texte de feedback (√† passer √† OptoPrime) √† partir du score/m√©triques.

    Args:
        score: score global.
        metrics: dict m√©triques.
        reasons: texte explicatif (si dispo).
        extra: infos additionnelles (ex: query, output, etc).

    Returns:
        str.
    """
    parts = [f"score={score:.3f}"]
    if metrics:
        parts.append("metrics=" + ", ".join(f"{k}={v:.3f}" for k, v in metrics.items()))
    if reasons:
        parts.append("reasons=" + reasons.strip())
    if extra:
        for k, v in extra.items():
            parts.append(f"{k}={safe_json_dumps(v, max_len=600)}")
    return "\n".join(parts)


# ---------------------------------------------------------------------------
# Code targets / patching (optimisation de code)
# ---------------------------------------------------------------------------

@dataclass
class CodeTarget:
    """
    Cible de patching pour l'optimisation de code.

    Un CodeTarget est associ√© √† un param√®tre trainable :
        param.__code_<key>

    Attributes:
        key: identifiant stable (ex: "planner_node").
        get_callable: fonction retournant l'objet callable courant √† patcher.
        set_callable: optionnel, pour remplacer le symbole (module.attr = new_fn).
        attach_to: SpanMatcher o√π accrocher le param√®tre code dans la trace.
        trainable: bool.
        description: aide l'optimiseur (ex: signature / r√¥le).
    """
    key: str
    get_callable: Callable[[], Callable[..., Any]]
    set_callable: Optional[Callable[[Callable[..., Any]], None]] = None
    attach_to: Optional[SpanMatcher] = None
    trainable: bool = True
    description: str = ""

    @property
    def param_name(self) -> str:
        """Nom du param√®tre expos√© dans la trace."""
        return f"__code_{self.key}"

    def get_source(self) -> str:
        """
        Extrait le code source de la fonction cible via inspect.getsource.

        Returns:
            str code python.
        """
        fn = self.get_callable()
        try:
            return inspect.getsource(fn)
        except OSError:
            # ex: fonctions d√©finies dans un notebook sans source dispo
            return f"# Source indisponible pour {getattr(fn, '__name__', self.key)}\n"

    def infer_description(self) -> str:
        """
        D√©duit une description courte si `description` n'est pas fourni.

        Returns:
            str.
        """
        if self.description:
            return self.description
        fn = self.get_callable()
        try:
            sig = str(inspect.signature(fn))
        except Exception:
            sig = "(...)"
        return f"{getattr(fn, '__name__', self.key)}{sig}"


def hotpatch_function_in_place(target_fn: Callable[..., Any], new_fn: Callable[..., Any]) -> None:
    """
    Hotpatch "in-place" : remplace le bytecode (`__code__`) de `target_fn` par celui de `new_fn`.

    Avantage:
        Si LangGraph a captur√© une *r√©f√©rence* vers `target_fn`, le patch est effectif
        sans recompiler le graphe.

    Limites:
        Ne marche pas si la fonction utilise des closures incompatibles.

    Args:
        target_fn: fonction originale (objet) utilis√©e par le graphe.
        new_fn: fonction compil√©e √† partir d'un nouveau source.

    Raises:
        TypeError si pas patchable.
    """
    if not (isinstance(target_fn, types.FunctionType) and isinstance(new_fn, types.FunctionType)):
        raise TypeError("hotpatch_function_in_place ne supporte que des functions Python.")
    target_fn.__code__ = new_fn.__code__
    target_fn.__defaults__ = new_fn.__defaults__
    target_fn.__kwdefaults__ = new_fn.__kwdefaults__
    target_fn.__annotations__ = getattr(new_fn, "__annotations__", {})
    target_fn.__doc__ = getattr(new_fn, "__doc__", None)


def compile_function_from_source(source: str, fn_name: str, *, glb: Optional[Dict[str, Any]] = None) -> Callable[..., Any]:
    """
    Compile un source python contenant une d√©finition `def <fn_name>(...)` et renvoie cette fonction.

    Args:
        source: code python (doit d√©finir fn_name).
        fn_name: nom de la fonction √† extraire.
        glb: globals √† utiliser (permet d'acc√©der aux imports existants).

    Returns:
        function object.

    Raises:
        ValueError si la fonction n'existe pas apr√®s exec.
    """
    glb = glb or {}
    loc: Dict[str, Any] = {}
    compiled = compile(source, "<optimized>", "exec")
    exec(compiled, glb, loc)
    fn = loc.get(fn_name) or glb.get(fn_name)
    if not callable(fn):
        raise ValueError(f"Le source ne d√©finit pas la fonction attendue: {fn_name}")
    return fn  # type: ignore


def apply_code_update(
    *,
    update_source: str,
    target: CodeTarget,
    patch_mode: str = "in_place_or_replace",
    global_ns: Optional[Dict[str, Any]] = None,
) -> None:
    """
    Applique un patch de code produit par l'optimiseur √† une cible.

    Modes:
      - "in_place": hotpatch sur l'objet callable actuel uniquement.
      - "replace": remplace le symbole via target.set_callable (ou error si absent).
      - "in_place_or_replace": tente in_place, sinon fallback replace.
      - "replace_and_in_place": fait replace puis hotpatch (utile si le graphe a captur√© l'ancien objet).

    Args:
        update_source: code python complet (def ...).
        target: CodeTarget.
        patch_mode: strat√©gie.
        global_ns: dict globals pour exec (souvent globals()).

    Raises:
        Exception si impossible.
    """
    fn0 = target.get_callable()
    fn_name = getattr(fn0, "__name__", None) or target.key
    global_ns = global_ns or {}

    new_fn = compile_function_from_source(update_source, fn_name, glb=global_ns)

    if patch_mode == "in_place":
        hotpatch_function_in_place(fn0, new_fn)
        return

    if patch_mode == "replace":
        if target.set_callable is None:
            raise ValueError(f"target.set_callable manquant pour {target.key}")
        target.set_callable(new_fn)
        return

    if patch_mode == "replace_and_in_place":
        if target.set_callable is None:
            raise ValueError(f"target.set_callable manquant pour {target.key}")
        target.set_callable(new_fn)
        # tenter hotpatch sur l'ancien objet
        try:
            hotpatch_function_in_place(fn0, new_fn)
        except Exception:
            pass
        return

    # in_place_or_replace
    try:
        hotpatch_function_in_place(fn0, new_fn)
        return
    except Exception:
        if target.set_callable is None:
            raise
        target.set_callable(new_fn)
        return


def build_code_param_specs(code_targets: Sequence[CodeTarget]) -> List[ParamSpec]:
    """
    Convertit des CodeTarget en ParamSpec (pour injection OTLP + updates).

    Args:
        code_targets: cibles de code.

    Returns:
        liste ParamSpec.
    """
    specs: List[ParamSpec] = []
    for t in code_targets:
        # closure pour get_source
        def _make_getter(tt: CodeTarget) -> Callable[[], Any]:
            return lambda: tt.get_source()

        def _make_applier(tt: CodeTarget) -> Callable[[str], None]:
            return lambda src: apply_code_update(update_source=src, target=tt, patch_mode="in_place_or_replace", global_ns=globals())

        specs.append(
            ParamSpec(
                name=t.param_name,
                get_value=_make_getter(t),
                apply_update=_make_applier(t),
                attach_to=t.attach_to,
                trainable=t.trainable,
                description=t.infer_description(),
                normalize=normalize_whitespace,
            )
        )
    return specs


# ---------------------------------------------------------------------------
# Optimisation Trace/OptoPrime (√† partir d'OTLP)
# ---------------------------------------------------------------------------

# Petits wrappers autour des imports Trace pour rester optionnels
def _require_trace_imports():
    """
    Importe dynamiquement les composants Trace n√©cessaires.

    Raises:
        ImportError si la lib Trace/opto n'est pas install√©e/disponible.
    """
    from opto.trace.io.otel_adapter import otlp_traces_to_trace_json  # type: ignore
    from opto.trace.io.tgj_ingest import ingest_tgj  # type: ignore
    from opto.trace.nodes import MessageNode, ParameterNode  # type: ignore
    from opto.optimizers import OptoPrimeV2  # type: ignore
    from opto.optimizers.utils import OptimizerPromptSymbolSetJSON  # type: ignore
    from opto.trainer.algorithms.basic_algorithms import batchify  # type: ignore

    return otlp_traces_to_trace_json, ingest_tgj, MessageNode, ParameterNode, OptoPrimeV2, OptimizerPromptSymbolSetJSON, batchify


def find_target(nodes: Dict[str, Any], *, prefer_name_contains: str = "evaluator") -> Optional[Any]:
    """
    Trouve le n≈ìud cible (MessageNode) √† optimiser.

    Heuristique:
      - si un MessageNode contient `prefer_name_contains` dans son nom ‚Üí on le prend
      - sinon, on prend le "dernier" MessageNode rencontr√©.

    Args:
        nodes: dict name->node (r√©sultat ingest_tgj).
        prefer_name_contains: substring.

    Returns:
        MessageNode ou None.
    """
    _, _, MessageNode, _, _, _, _ = _require_trace_imports()
    last = None
    for n in nodes.values():
        if isinstance(n, MessageNode):
            last = n
            if prefer_name_contains.lower() in (n.name or "").lower():
                return n
    return last


def visualize_graph(nodes: Dict[str, Any]) -> str:
    """
    Visualise un graphe Trace (param√®tres + messages) sous forme texte.

    Args:
        nodes: dict name->node.

    Returns:
        str multi-ligne.
    """
    _, _, MessageNode, ParameterNode, _, _, _ = _require_trace_imports()
    params = []
    messages = []
    for name, node in nodes.items():
        if isinstance(node, ParameterNode):
            data = getattr(node, "data", "")
            data_s = data[:80] + ("..." if isinstance(data, str) and len(data) > 80 else "")
            params.append(f"[PARAM] {node.name}: {data_s!r}")
        elif isinstance(node, MessageNode):
            parents = getattr(node, "parents", []) or []
            parent_names = [getattr(p, "name", "?") for p in parents]
            messages.append(f"[MSG] {node.name} ‚Üê {parent_names if parent_names else 'ROOT'}")
    return "\n".join(params + messages)


def check_reachability(target: Any, params: List[Any]) -> Dict[str, bool]:
    """
    V√©rifie si chaque param√®tre est atteignable depuis `target` via les parents.

    Utile pour d√©tecter un param√®tre accroch√© √† un span "isol√©" (non causalement reli√©).

    Args:
        target: MessageNode cible.
        params: liste de ParameterNode.

    Returns:
        dict param.name -> bool.
    """
    _, _, _, ParameterNode, _, _, _ = _require_trace_imports()
    seen = set()
    stack = [target]
    reachable = set()
    while stack:
        node = stack.pop()
        if node in seen:
            continue
        seen.add(node)
        if hasattr(node, "parents"):
            for p in getattr(node, "parents") or []:
                if p not in seen:
                    stack.append(p)
        if isinstance(node, ParameterNode):
            reachable.add(node.name)
    return {p.name: p.name in reachable for p in params}


def _remap_params_in_graph(node: Any, param_mapping: Dict[int, Any], visited=None) -> None:
    """
    Remappe r√©cursivement des ParameterNode dans un graphe Trace.

    Lorsqu'on r√©utilise un optimiseur entre it√©rations, on veut que les graphs
    utilisent *les m√™mes objets ParameterNode* (ceux de l'optimiseur), sinon
    l'optimiseur consid√®re des params diff√©rents.

    Args:
        node: n≈ìud courant.
        param_mapping: dict id(old_param) -> optimizer_param.
        visited: set d'ids d√©j√† visit√©s.
    """
    if visited is None:
        visited = set()

    node_id = id(node)
    if node_id in visited:
        return
    visited.add(node_id)

    # Remap inputs
    if hasattr(node, "_inputs") and isinstance(getattr(node, "_inputs"), dict):
        inputs = getattr(node, "_inputs")
        for key, input_node in list(inputs.items()):
            in_id = id(input_node)
            if in_id in param_mapping:
                inputs[key] = param_mapping[in_id]
            else:
                _remap_params_in_graph(input_node, param_mapping, visited)

    # Remap parents list
    if hasattr(node, "parents") and isinstance(getattr(node, "parents"), list):
        parents = getattr(node, "parents")
        for i, parent in enumerate(list(parents)):
            p_id = id(parent)
            if p_id in param_mapping:
                parents[i] = param_mapping[p_id]
            else:
                _remap_params_in_graph(parent, param_mapping, visited)


def show_prompt_diff(before: str, after: str, *, context_lines: int = 2) -> str:
    """
    Produit un diff textuel compact pour des prompts (ou code).

    Args:
        before: texte original.
        after: texte modifi√©.
        context_lines: lignes de contexte.

    Returns:
        diff str.
    """
    import difflib
    before_lines = normalize_whitespace(before).splitlines(True)
    after_lines = normalize_whitespace(after).splitlines(True)
    diff = difflib.unified_diff(before_lines, after_lines, fromfile="before", tofile="after", n=context_lines)
    return "".join(diff)


def compute_change_stats(before: str, after: str) -> Dict[str, Any]:
    """
    Calcule des statistiques simples sur un changement (longueur, delta, etc).

    Args:
        before: texte original.
        after: texte modifi√©.

    Returns:
        dict stats.
    """
    b = before or ""
    a = after or ""
    return {
        "len_before": len(b),
        "len_after": len(a),
        "delta": len(a) - len(b),
        "delta_pct": ((len(a) - len(b)) / len(b) * 100.0) if len(b) else None,
        "lines_before": b.count("\n") + 1 if b else 0,
        "lines_after": a.count("\n") + 1 if a else 0,
    }


def _ensure_param_descriptions_on_optimizer(optimizer: Any, params: Sequence[Any], desc_by_name: Mapping[str, str]) -> None:
    """
    Ajoute/compl√®te les descriptions de param√®tres c√¥t√© optimiseur (si le champ existe).

    Args:
        optimizer: OptoPrimeV2.
        params: ParameterNode (de l'optimiseur).
        desc_by_name: mapping param_name -> description.
    """
    # OptoPrime garde des params avec attributs name/data/desc (selon versions).
    for p in getattr(optimizer, "parameters", []) or []:
        full_name = getattr(p, "name", "")
        semantic_name = full_name.split(":")[0].split("/")[-1]
        if semantic_name in desc_by_name:
            if not getattr(p, "desc", ""):
                try:
                    p.desc = desc_by_name[semantic_name]
                except Exception:
                    pass


@dataclass
class RunResult:
    """
    R√©sultat d'un run √† optimiser.

    Attributes:
        otlp: payload OTLP.
        score: score global.
        metrics: dict m√©triques.
        feedback: texte de feedback (utilis√© par l'optimiseur).
        meta: infos additionnelles (query, output, etc).
    """
    otlp: JSONDict
    score: float
    metrics: Dict[str, float]
    feedback: str
    meta: Dict[str, Any] = field(default_factory=dict)



def optimize_iteration(
    runs: Sequence[RunResult],
    *,
    optimizer: Optional[Any],
    llm_client: Any,
    objective: str,
    param_name_substrings: Sequence[str] = ("__code_",),
    memory_size: int = 12,
    verbose_graph: bool = False,
    param_descriptions: Optional[Mapping[str, str]] = None,
    prefer_target_name_contains: str = "evaluator",
) -> Tuple[Dict[str, str], Any]:
    """
    Ex√©cute une it√©ration OptoPrime sur un batch de runs.

    Points importants (par rapport aux d√©mos) :
      - compatible multi-runs (plusieurs requ√™tes) via batchify
      - **remap** des ParameterNode quand on r√©utilise un optimiseur entre it√©rations,
        afin que les nouveaux graphs pointent vers les *m√™mes objets* param√®tres
      - filtre simple sur les param√®tres √† optimiser via `param_name_substrings`

    Args:
        runs: liste de runs (id√©alement plusieurs requ√™tes pour un signal plus robuste).
        optimizer: OptoPrimeV2 existant (ou None au 1er tour).
        llm_client: client LLM pour l'optimiseur (comme dans les d√©mos).
        objective: instruction globale ("maximize eval.score ...").
        param_name_substrings: filtre sur le champ `ParameterNode.name`.
        memory_size: m√©moire de l'optimiseur.
        verbose_graph: si True, imprime une visualisation texte des graphs.
        param_descriptions: mapping semantic_name -> description (optionnel).
        prefer_target_name_contains: substring pour choisir la cible (default "evaluator").

    Returns:
        (updates, optimizer)
        updates: dict semantic_param_name -> new_value
    """
    (
        otlp_traces_to_trace_json,
        ingest_tgj,
        MessageNode,
        ParameterNode,
        OptoPrimeV2,
        OptimizerPromptSymbolSetJSON,
        batchify,
    ) = _require_trace_imports()

    # Mapping semantic_name -> optimizer ParameterNode (si optimizer d√©j√† cr√©√©)
    opt_params_by_semantic: Dict[str, Any] = {}
    if optimizer is not None:
        for p in getattr(optimizer, "parameters", []) or []:
            full = getattr(p, "name", "") or ""
            semantic = full.split(":")[0].split("/")[-1]
            opt_params_by_semantic[semantic] = p

    all_targets: List[Any] = []
    all_feedbacks: List[str] = []
    iter_params_by_semantic: Dict[str, Any] = {}

    for i, run in enumerate(runs):
        tgj_docs = list(
            otlp_traces_to_trace_json(
                run.otlp,
                agent_id_hint=f"run{i}",
                use_temporal_hierarchy=True,
            )
        )
        if not tgj_docs:
            continue
        nodes = ingest_tgj(tgj_docs[0])

        target = find_target(nodes, prefer_name_contains=prefer_target_name_contains)
        if target is None:
            continue

        # Param√®tres trainables filtr√©s
        params_in_graph: List[Any] = []
        for n in nodes.values():
            if isinstance(n, ParameterNode) and getattr(n, "trainable", False):
                nname = getattr(n, "name", "") or ""
                if any(sub in nname for sub in param_name_substrings):
                    params_in_graph.append(n)

        # Remap vers optimizer params si possible
        id_mapping: Dict[int, Any] = {}
        new_params_to_add: List[Any] = []
        for p in params_in_graph:
            full = getattr(p, "name", "") or ""
            semantic = full.split(":")[0].split("/")[-1]
            if semantic in opt_params_by_semantic:
                id_mapping[id(p)] = opt_params_by_semantic[semantic]
                iter_params_by_semantic.setdefault(semantic, opt_params_by_semantic[semantic])
            else:
                # nouveau param√®tre jamais vu
                iter_params_by_semantic.setdefault(semantic, p)
                new_params_to_add.append(p)

        if id_mapping:
            _remap_params_in_graph(target, id_mapping)

        # si optimizer existe, on lui ajoute les nouveaux param√®tres
        if optimizer is not None:
            for p in new_params_to_add:
                optimizer.parameters.append(p)  # type: ignore[attr-defined]
                full = getattr(p, "name", "") or ""
                semantic = full.split(":")[0].split("/")[-1]
                opt_params_by_semantic[semantic] = p

        if verbose_graph:
            print("\n--- Graph (run", i, ") ---")
            print(visualize_graph(nodes))

        # Reachability diagnostic (apr√®s remap)
        # On v√©rifie l'atteignabilit√© des param√®tres *utilis√©s* dans ce graph.
        params_for_reach = list(iter_params_by_semantic.values())
        reach = check_reachability(target, params_for_reach)
        unreachable = [pname for pname, ok in reach.items() if not ok]
        if unreachable:
            print(f"‚ö†Ô∏è Params non atteignables depuis target: {unreachable[:6]}{'...' if len(unreachable)>6 else ''}")

        all_targets.append(target)
        all_feedbacks.append(run.feedback)

    if not all_targets:
        return {}, optimizer

    # Cr√©er l'optimiseur au 1er tour
    if optimizer is None:
        optimizer = OptoPrimeV2(
            iter_params_by_semantic.values(),
            llm=llm_client,
            memory_size=memory_size,
            log=True,
            optimizer_prompt_symbol_set=OptimizerPromptSymbolSetJSON(),
            objective=objective,
        )
        # initialiser mapping pour la suite
        opt_params_by_semantic = {
            (p.name.split(":")[0].split("/")[-1]): p for p in getattr(optimizer, "parameters", []) or []
        }

    # Ajouter des descriptions si fournies
    if param_descriptions:
        _ensure_param_descriptions_on_optimizer(optimizer, list(iter_params_by_semantic.values()), param_descriptions)

    # Batchify et optimiser
    batched_target = batchify(*all_targets).data
    batched_feedback = batchify(*all_feedbacks).data

    optimizer.zero_feedback()
    optimizer.backward(batched_target, batched_feedback)
    optimizer.step(verbose=False)

    updates: Dict[str, str] = {}
    for p in getattr(optimizer, "parameters", []) or []:
        full_name = getattr(p, "name", "") or ""
        semantic_name = full_name.split(":")[0].split("/")[-1]
        updates[semantic_name] = getattr(p, "data", "")

    return updates, optimizer



def apply_updates(
    updates: Mapping[str, str],
    *,
    param_specs: Sequence[ParamSpec],
) -> Dict[str, str]:
    """
    Applique un dict d'updates (sortie OptoPrime) sur les ParamSpec.

    Les ParamSpec dont `apply_update` est None sont ignor√©s.

    Args:
        updates: mapping semantic_name -> new_value.
        param_specs: specs connus.

    Returns:
        dict "appliqu√©" : semantic_name -> "ok"/"skipped"/"error:..."
    """
    specs_by_name = {s.name: s for s in param_specs}
    out: Dict[str, str] = {}

    for semantic, new_val in updates.items():
        spec = specs_by_name.get(semantic)
        if spec is None:
            out[semantic] = "skipped: unknown_param"
            continue
        if spec.apply_update is None:
            out[semantic] = "skipped: no_apply_update"
            continue
        try:
            spec.apply_update(str(new_val))
            out[semantic] = "ok"
        except Exception as e:
            out[semantic] = f"error: {type(e).__name__}: {e}"
    return out

In [None]:
# --- Trace/OptoPrime optimisation (non-intrusive) ---
# Pr√©requis (d√©j√† faits dans le notebook) :
#   - `graph` : LangGraph compil√©
#   - `tru_recorder` : TruGraph (TruLens) qui wrap le graph
#   - `session` : TruSession (ou adaptez ci-dessous)
#   - `thread_config` : config LangGraph (ou adaptez)
#
# Et ajoutez le fichier `trace_optimize_runtime.py` √† c√¥t√© du notebook
# (ou mettez-le dans votre PYTHONPATH).

import sys
from pathlib import Path

# 1) Assurez-vous de pouvoir importer le runtime
if str(Path('.').resolve()) not in sys.path:
    sys.path.append(str(Path('.').resolve()))

import trace_optimize_runtime as tor

# 2) (Optionnel) attacher un exporter OTEL en m√©moire (si TruLens OTEL est actif)
exporter, processor, status = tor.try_attach_inmemory_span_exporter()
print('OTEL in-memory exporter status:', status)

# 3) Prompt addendums (tuning non intrusif) + wrappers
store = tor.TextOverrideStore()

# NOTE: dans L6, ces fonctions viennent souvent de `helper.py`.
# Adaptez ces imports si besoin.

plan_prompt = tor.wrap_prompt_builder_with_addendum(
    plan_prompt, store=store, addendum_key='planner_addendum'
)
executor_prompt = tor.wrap_prompt_builder_with_addendum(
    executor_prompt, store=store, addendum_key='executor_addendum'
)

planner_addendum = store.as_param_spec(
    name='planner_addendum',
    attach_to=tor.SpanMatcher(name_contains=('planner',)),
    trainable=True,
    description='Append-only instructions added to the planner prompt.'
)
executor_addendum = store.as_param_spec(
    name='executor_addendum',
    attach_to=tor.SpanMatcher(name_contains=('executor',)),
    trainable=True,
    description='Append-only instructions added to the executor prompt.'
)

# 4) Code targets (optimisation de code)
# IMPORTANT: key doit √™tre stable; ici on utilise les noms de fonctions.
# Si vos fonctions sont dans le notebook (pas de source inspectable), l'optimisation de code sera limit√©e.

CODE_TARGETS = []
try:
    #from helper import planner_node, executor_node, synthesizer_node
    CODE_TARGETS += [
        tor.CodeTarget(
            key='planner_node',
            get_callable=lambda: planner_node,
            attach_to=tor.SpanMatcher(name_contains=('planner',)),
            description='LangGraph node that produces/updates the plan JSON.'
        ),
        tor.CodeTarget(
            key='executor_node',
            get_callable=lambda: executor_node,
            attach_to=tor.SpanMatcher(name_contains=('executor',)),
            description='LangGraph node that executes one plan step.'
        ),
        tor.CodeTarget(
            key='synthesizer_node',
            get_callable=lambda: synthesizer_node,
            attach_to=tor.SpanMatcher(name_contains=('synthesizer',)),
            description='Final synthesis / answer node.'
        ),
    ]
except Exception as e:
    print('Could not import code targets from helper:', e)

code_param_specs = tor.build_code_param_specs(CODE_TARGETS)

PARAM_SPECS = [planner_addendum, executor_addendum] + code_param_specs
PARAM_DESC = tor.param_descriptions_from_specs(PARAM_SPECS)

# 5) M√©triques TruLens (adapter si vos colonnes diff√®rent)
METRIC_KEYS = [
    'Groundedness',
    'Answer Relevance',
    'Context Relevance',
    'Logical Consistency',
    'Execution Efficiency',
    'Plan Adherence',
    'Plan Quality',
]
METRIC_WEIGHTS = {k: 1.0 for k in METRIC_KEYS}

# 6) Objectif OptoPrime
OBJECTIVE = """You are optimizing a multi-agent LangGraph workflow.

Goal:
- Maximize eval.score (0..1), which aggregates eval.<metrics>.

Constraints:
- Keep function signatures unchanged.
- Prefer minimal diffs.
- Do not remove safety constraints.
- If you edit code, keep it readable and deterministic.
"""

# 7) Petit helper pour ex√©cuter une requ√™te et collecter RunResult
def run_query_collect(query: str):
    # Clear exporter to isolate this run (OTEL path)
    if exporter is not None and hasattr(exporter, 'clear'):
        exporter.clear()

    # Run LangGraph under TruLens recorder
    with tru_recorder as recording:
        out = graph.invoke({'messages': [('user', query)]}, config=thread_config)

    # Get OTLP from OTEL exporter if available
    otlp = None
    if exporter is not None:
        otlp = tor.flush_inmemory_exporter_to_otlp(
            exporter,
            service_name='l6',
            scope_name='trulens_otel',
            clear=True
        )

    # Fetch latest TruLens record + feedback
    try:
        recs, fbs = session.get_records_and_feedback(app_ids=[tru_recorder.app_id])
    except Exception:
        recs, fbs = session.get_records_and_feedback()

    row = tor.select_latest_item(recs)
    row_map = tor.extract_mapping(row)

    # Fallback: if no OTEL spans, build OTLP from TruLens record JSON
    if (otlp is None) or (len(list(tor.otlp_iter_spans(otlp))) == 0):
        record_json = tor.extract_trulens_record_json(row)
        if record_json is None:
            raise RuntimeError('No OTEL spans and no record_json found. Cannot build trace.')
        otlp = tor.trulens_record_to_otlp(record_json, service_name='l6', scope_name='trulens_record')

    # Compute metrics + score
    metrics = tor.extract_metrics_from_mapping(row_map, metric_keys=METRIC_KEYS, default_metric=0.5)
    score = tor.compute_score(metrics, weights=METRIC_WEIGHTS)

    # Best-effort reasons extraction
    reasons = ''
    for k in METRIC_KEYS:
        for rk in (f'{k}_reasons', f'{k}.reasons', f'{k}_reason', f'{k}.reason'):
            if rk in row_map and row_map[rk]:
                reasons += f"\n[{k}] {row_map[rk]}"

    feedback = tor.render_feedback_text(
        score=score,
        metrics=metrics,
        reasons=reasons,
        extra={'query': query}
    )

    otlp_ready = tor.prepare_otlp_for_optimizer(
        otlp,
        param_specs=PARAM_SPECS,
        score=score,
        metrics=metrics,
        reasons=reasons,
        evaluator_parent_matcher=tor.SpanMatcher(name_contains=('synthesizer',)),
        service_name='l6',
        scope_name='trace_opt',
    )

    return tor.RunResult(
        otlp=otlp_ready,
        score=score,
        metrics=metrics,
        feedback=feedback,
        meta={'query': query}
    )

# 8) Boucle d'optimisation
QUERIES = [
    'Give me a plan and then answer: Compare France vs Germany GDP growth since 2010.',
    'What are the key drivers of inflation in 2024-2025? Give citations.',
]
N_ITER = 2
optimizer = None

from opto.utils.llm import LLM
LLM_CLIENT = LLM()

for it in range(N_ITER):
    runs = [run_query_collect(q) for q in QUERIES]
    print(f'\n=== Iteration {it} ===')
    print('Scores:', [round(r.score, 3) for r in runs])

    updates, optimizer = tor.optimize_iteration(
        runs,
        optimizer=optimizer,
        llm_client=LLM_CLIENT,
        objective=OBJECTIVE,
        param_name_substrings=('__code_', 'planner_addendum', 'executor_addendum'),
        memory_size=12,
        verbose_graph=False,
        param_descriptions=PARAM_DESC,
        prefer_target_name_contains='evaluator',
    )

    applied = tor.apply_updates(updates, param_specs=PARAM_SPECS)
    print('Applied:', {k:v for k,v in applied.items() if v != 'skipped: unknown_param'})

print('\nFinal addendums:')
print('planner_addendum:\n', store.get('planner_addendum'))
print('executor_addendum:\n', store.get('executor_addendum'))