# 1. Install AG2 + Materials Project dependencies

In [1]:
!pip install -U "ag2[openai]" autogen -q

import sys
!{sys.executable} -m pip install pymatgen mp-api numpy cython ipywidgets jupyterlab_widgets -q

print("✓ AG2/autogen + Pymatgen + MP-API installed.")

✓ AG2/autogen + Pymatgen + MP-API installed.


# 2. Imports + LLM Configuration


In [4]:
import os
import json
import random
from datetime import datetime, timedelta
from typing import Annotated, Any, Literal, Union, List, Optional, Tuple

from autogen import ConversableAgent
from autogen.agentchat import ReplyResult
from autogen.agentchat.group import (
    ContextVariables,
    AgentTarget, AgentNameTarget, StayTarget,
    OnCondition, StringLLMCondition,
    OnContextCondition, ExpressionContextCondition, ContextExpression,
    RevertToUserTarget, TerminateTarget
)
from autogen.agentchat.groupchat import GroupChat, GroupChatManager
from autogen.tools import tool

from mp_api.client import MPRester

from autogen.coding.local_commandline_code_executor import LocalCommandLineCodeExecutor
from autogen.coding.base import CodeBlock
from pydantic import BaseModel, Field

from autogen.agentchat.group.patterns import DefaultPattern
from autogen.agentchat import initiate_group_chat

# LLM settings used by this notebook. The API key is read from the
# environment (never hard-coded); a missing OPENAI_API_KEY raises KeyError
# right here.
llm_config = dict(
    model="gpt-4o",
    api_key=os.environ["OPENAI_API_KEY"],
)

print("✓ Imports loaded and LLM configured.")


✓ Imports loaded and LLM configured.


# 3. User query + Context Variables

In [22]:
from autogen.agentchat.group import ContextVariables

# Shared state handed to every tool. All slots start empty (None / False)
# and are filled in by the tools as the run progresses.
context_variables = ContextVariables(
    data=dict(
        # Task lifecycle
        task_started=False,
        query=None,
        # Task refinement (written by task_improver_tool)
        main_task_analysis=None,
        main_task_improvements=None,
        main_task_improved=None,
        # Planning (plan: planner_tool; review_*: review_plan_tool)
        plan=None,
        review_status=None,
        review_notes=None,
        # Retrieval (written by material_retriever)
        search_criteria=None,
        fields=None,
        sample_number=None,
        mp_results=None,
        # Analysis
        final_conclusion=None,
        # Coding outputs
        last_executed_code=None,
        last_execution_output=None,
        last_executed_file=None,
        # Routing: name of the agent the last tool handed off to
        next_agent=None,
        # Names of the pipeline participants
        agent_registry=[
            "AgentTaskImprover",
            "AgentPlanner",
            "AgentPlanReviewer",
            "AgentMaterialsRetriever",
            "AgentAnalyzer",
            "AgentCoder",
            "Human",
        ],
    )
)

print("✓ ContextVariables initialized and aligned with AG2 pipeline.")



✓ ContextVariables initialized and aligned with AG2 pipeline.


# 4. TOOLS — TaskImprover → Planner → PlanReviewer → MaterialsRetriever → Analyzer → Coder


In [35]:
# Tool — task_improver

def task_improver_tool(
    analysis: Annotated[str, "Short analysis of the task consistency, missing info, redundancy."],
    improvements: Annotated[str, "Concrete improvements to make the task complete and non-redundant."],
    improved_task: Annotated[str, "Improved version of the main task."],
    context_variables: ContextVariables,
) -> ReplyResult:
    """Record the refined task in context_variables and hand off to the planner.

    Each of the three text arguments must be a non-empty string. The first one
    that is not causes an error reply routed back to AgentTaskImprover and
    nothing but ``next_agent`` is written.

    On success the stripped texts are stored under ``main_task_analysis``,
    ``main_task_improvements`` and ``main_task_improved``, ``task_started`` is
    set to True, and the same three values are returned as a JSON message
    targeted at AgentPlanner.
    """
    improver_name = "AgentTaskImprover"
    planner_name = "AgentPlanner"

    # Validate in declaration order so the first offending argument is reported.
    cleaned = {}
    for arg_name, raw_value in (
        ("analysis", analysis),
        ("improvements", improvements),
        ("improved_task", improved_task),
    ):
        if not (isinstance(raw_value, str) and raw_value.strip()):
            context_variables["next_agent"] = improver_name
            return ReplyResult(
                message=f"Error in task_improver_tool: {arg_name} must be a non-empty string.",
                target=AgentNameTarget(improver_name),
                context_variables=context_variables,
            )
        cleaned[arg_name] = raw_value.strip()

    # The reply payload doubles as the set of context slots to fill.
    payload = {
        "main_task_analysis": cleaned["analysis"],
        "main_task_improvements": cleaned["improvements"],
        "main_task_improved": cleaned["improved_task"],
    }

    context_variables["task_started"] = True
    for slot, text in payload.items():
        context_variables[slot] = text
    context_variables["next_agent"] = planner_name

    return ReplyResult(
        message=json.dumps(payload, indent=2, ensure_ascii=False),
        target=AgentNameTarget(planner_name),
        context_variables=context_variables,
    )

# Tool — planner

def planner_tool(
    plan: Annotated[dict, "Structured plan dict generated by the Planner agent."],
    plan_description: Annotated[str, "Short description of the plan."],
    success_criteria: Annotated[List[str], "Concrete criteria to consider the run successful."],
    plan_score: Annotated[float, "Planner self-score for the plan quality (0-10)."],
    context_variables: ContextVariables,
) -> ReplyResult:
    """Store the plan and related metadata in context_variables and route to the next agent.

    Validation runs in two stages; any failure sends an error reply back to
    AgentPlanner and leaves the stored plan untouched:

    1. Arguments: ``context_variables['main_task_improved']`` and
       ``plan_description`` must be non-empty strings, ``plan`` a non-empty
       dict, ``success_criteria`` a non-empty list of non-empty strings and
       ``plan_score`` a number in [0, 10].
    2. Plan structure: ``schema_version``, ``main_task_improved`` (must equal
       the context value after stripping), ``routing`` (four non-empty string
       keys), ``steps`` (non-empty list of dicts with string ids and
       well-typed optional keys), optional ``guardrails.required_step_ids``
       and ``retrieval`` (``search_criteria`` dict, ``fields`` list,
       positive int ``sample_number``).

    On success ``plan``, ``plan_description``, ``success_criteria`` and
    ``plan_score`` are written to context_variables and the reply is targeted
    at ``plan['routing']['after_planner']``.
    """
    planner_name = "AgentPlanner"
    task_from_context = context_variables.get("main_task_improved", None)

    def _is_text(value) -> bool:
        # True for a string that still has content after stripping.
        return isinstance(value, str) and bool(value.strip())

    def _all_text(values) -> bool:
        return all(_is_text(item) for item in values)

    def _send_back(reason: str) -> ReplyResult:
        # Every validation failure loops back to the planner itself.
        context_variables["next_agent"] = planner_name
        return ReplyResult(
            message=reason,
            target=AgentNameTarget(planner_name),
            context_variables=context_variables,
        )

    # ---- Stage 1: tool arguments -------------------------------------------
    try:
        if not _is_text(task_from_context):
            raise ValueError("context_variables['main_task_improved'] must be a non-empty string.")
        if not (isinstance(plan, dict) and plan):
            raise ValueError("plan must be a non-empty dict.")
        if not _is_text(plan_description):
            raise ValueError("plan_description must be a non-empty string.")
        if not (isinstance(success_criteria, list) and success_criteria and _all_text(success_criteria)):
            raise ValueError("success_criteria must be a non-empty list of non-empty strings.")
        if not isinstance(plan_score, (int, float)):
            raise ValueError("plan_score must be a number.")
        score_value = float(plan_score)
        if score_value < 0 or score_value > 10:
            raise ValueError("plan_score must be between 0 and 10.")
    except ValueError as problem:
        return _send_back(f"Error in planner_tool: {problem}")

    expected_task = task_from_context.strip()

    # ---- Stage 2: plan structure -------------------------------------------
    try:
        if not _is_text(plan.get("schema_version")):
            raise ValueError("plan.schema_version must be a non-empty string.")

        if not _is_text(plan.get("main_task_improved")):
            raise ValueError("plan.main_task_improved must be a non-empty string.")
        if plan["main_task_improved"].strip() != expected_task:
            raise ValueError("plan.main_task_improved must match context_variables['main_task_improved'].")

        route_table = plan.get("routing")
        if not isinstance(route_table, dict):
            raise ValueError("plan.routing must be a dict.")
        for route_key in ("after_planner", "on_approve", "on_revise", "on_handoff"):
            if not _is_text(route_table.get(route_key)):
                raise ValueError(f"plan.routing.{route_key} must be a non-empty string.")

        step_list = plan.get("steps")
        if not (isinstance(step_list, list) and step_list):
            raise ValueError("plan.steps must be a non-empty list.")
        if any(not isinstance(step, dict) for step in step_list):
            raise ValueError("plan.steps must contain dict items only.")
        if any(not _is_text(step.get("id")) for step in step_list):
            raise ValueError("Each step must have a non-empty string 'id'.")

        # A non-dict guardrails value is simply treated as "no required ids".
        guard_cfg = plan.get("guardrails", {})
        must_have_ids = guard_cfg.get("required_step_ids", []) if isinstance(guard_cfg, dict) else []

        if must_have_ids:
            if not (isinstance(must_have_ids, list) and _all_text(must_have_ids)):
                raise ValueError("plan.guardrails.required_step_ids must be a list of non-empty strings.")
            present_ids = {step.get("id") for step in step_list if isinstance(step, dict)}
            missing = sorted(set(must_have_ids) - present_ids)
            if missing:
                raise ValueError(f"Missing required step(s): {missing}")

        # Optional per-step keys are only type-checked when present.
        for step in step_list:
            step_id = step.get("id")
            if "agent" in step and not _is_text(step["agent"]):
                raise ValueError(f"Step '{step_id}' has invalid 'agent' (must be non-empty string).")
            for list_key in ("inputs", "outputs"):
                if list_key in step and not (
                    isinstance(step[list_key], list) and _all_text(step[list_key])
                ):
                    raise ValueError(f"Step '{step_id}' has invalid '{list_key}' (must be list[str]).")
            if "constraints" in step and not isinstance(step["constraints"], dict):
                raise ValueError(f"Step '{step_id}' has invalid 'constraints' (must be dict).")

        retrieval_cfg = plan.get("retrieval")
        if not isinstance(retrieval_cfg, dict):
            raise ValueError("plan.retrieval must be a dict.")
        if not isinstance(retrieval_cfg.get("search_criteria"), dict):
            raise ValueError("plan.retrieval.search_criteria must be a dict.")
        wanted_fields = retrieval_cfg.get("fields")
        if not (isinstance(wanted_fields, list) and wanted_fields and _all_text(wanted_fields)):
            raise ValueError("plan.retrieval.fields must be a non-empty list of non-empty strings.")
        sample_size = retrieval_cfg.get("sample_number")
        if not isinstance(sample_size, int) or sample_size <= 0:
            raise ValueError("plan.retrieval.sample_number must be a positive int.")

    except ValueError as problem:
        return _send_back(f"Error in planner_tool: invalid plan format: {problem}")

    # ---- Persist and route -------------------------------------------------
    description_clean = plan_description.strip()
    criteria_clean = [criterion.strip() for criterion in success_criteria]
    following_agent = plan["routing"]["after_planner"]

    context_variables["plan"] = plan
    context_variables["plan_description"] = description_clean
    context_variables["success_criteria"] = criteria_clean
    context_variables["plan_score"] = score_value
    context_variables["next_agent"] = following_agent

    summary = {
        "plan_description": description_clean,
        "success_criteria": criteria_clean,
        "plan_score": score_value,
        "plan": plan,
    }

    return ReplyResult(
        message=json.dumps(summary, indent=2, ensure_ascii=False, default=str),
        target=AgentNameTarget(following_agent),
        context_variables=context_variables,
    )


# Tool Reviewer — review_plan

def review_plan_tool(
    decision: Annotated[
        Literal["approve", "revise", "handoff"],
        "approve -> go to retriever; revise -> go back to planner; handoff -> go back to human.",
    ],
    review_notes: Annotated[str, "Short, concrete notes about what is wrong/right in the plan."],
    context_variables: ContextVariables,
) -> ReplyResult:
    """Review context_variables['plan'] and route to the next agent.

    The stored plan is re-validated structurally before the reviewer's
    decision is honoured. Any problem (missing plan, bad ``decision``, empty
    ``review_notes`` or a malformed plan) forces a "revise" outcome.

    Args:
        decision: "approve", "revise" or "handoff".
        review_notes: Non-empty reviewer comment; stored stripped.
        context_variables: Shared state. Reads ``plan``; writes
            ``review_status``, ``review_notes`` and ``next_agent``.

    Returns:
        ReplyResult targeted at ``plan['routing'][on_approve|on_revise|on_handoff]``
        for a valid plan. Forced revisions go to ``plan['routing']['on_revise']``
        when that is a usable string, otherwise to "AgentPlanner".
    """
    plan = context_variables.get("plan")

    # Resolve the "revise" destination once, and only trust the plan's value
    # when it is a non-empty string. The error handlers below must never index
    # plan["routing"]["on_revise"] directly: that key may be the very thing
    # that failed validation, and a KeyError inside the handler would crash
    # the tool instead of returning a reply.
    revise_target = "AgentPlanner"
    if isinstance(plan, dict):
        routing_safe = plan.get("routing")
        if isinstance(routing_safe, dict):
            candidate = routing_safe.get("on_revise")
            if isinstance(candidate, str) and candidate.strip():
                revise_target = candidate

    def _force_revision(notes: str) -> ReplyResult:
        # Shared exit for every validation failure.
        context_variables["review_status"] = "revise"
        context_variables["review_notes"] = notes
        context_variables["next_agent"] = revise_target
        return ReplyResult(
            message=f"Plan review -> revise\n{notes}",
            target=AgentNameTarget(revise_target),
            context_variables=context_variables,
        )

    # ---- Stage 1: tool arguments and presence of a plan --------------------
    try:
        if plan is None or not isinstance(plan, dict):
            raise ValueError("Missing or invalid plan in context_variables['plan'].")
        if decision not in {"approve", "revise", "handoff"}:
            raise ValueError("decision must be one of: approve, revise, handoff.")
        if not isinstance(review_notes, str) or not review_notes.strip():
            raise ValueError("review_notes must be a non-empty string.")
    except ValueError as e:
        return _force_revision(str(e))

    notes_clean = review_notes.strip()

    # ---- Stage 2: structural validation of the stored plan -----------------
    try:
        if not isinstance(plan.get("schema_version"), str) or not plan["schema_version"].strip():
            raise ValueError("plan.schema_version must be a non-empty string.")
        if not isinstance(plan.get("main_task_improved"), str) or not plan["main_task_improved"].strip():
            raise ValueError("plan.main_task_improved must be a non-empty string.")

        routing = plan.get("routing")
        if not isinstance(routing, dict):
            raise ValueError("plan.routing must be a dict.")
        for k in ("after_planner", "on_approve", "on_revise", "on_handoff"):
            if not isinstance(routing.get(k), str) or not routing[k].strip():
                raise ValueError(f"plan.routing.{k} must be a non-empty string.")

        steps = plan.get("steps")
        if not isinstance(steps, list) or len(steps) == 0:
            raise ValueError("plan.steps must be a non-empty list.")

        # Unlike planner_tool, the reviewer requires an 'agent' on every step.
        steps_by_id = {}
        for s in steps:
            if not isinstance(s, dict):
                raise ValueError("Each step in plan.steps must be a dict.")
            sid = s.get("id")
            if not isinstance(sid, str) or not sid.strip():
                raise ValueError("Each step must have a non-empty 'id'.")
            if not isinstance(s.get("agent"), str) or not s["agent"].strip():
                raise ValueError(f"Step '{sid}' must have a non-empty 'agent'.")
            steps_by_id[sid] = s

        guardrails = plan.get("guardrails", {})
        required_ids = []
        if isinstance(guardrails, dict):
            required_ids = guardrails.get("required_step_ids", [])

        if required_ids:
            if not isinstance(required_ids, list) or not all(isinstance(x, str) and x.strip() for x in required_ids):
                raise ValueError("plan.guardrails.required_step_ids must be a list of non-empty strings.")
            missing = sorted(set(required_ids) - set(steps_by_id))
            if missing:
                raise ValueError(f"Missing required step(s): {missing}")

        retrieval = plan.get("retrieval")
        if not isinstance(retrieval, dict):
            raise ValueError("plan.retrieval must be a dict.")
        if not isinstance(retrieval.get("search_criteria"), dict):
            raise ValueError("plan.retrieval.search_criteria must be a dict.")
        fields = retrieval.get("fields")
        if not isinstance(fields, list) or not fields or not all(isinstance(x, str) and x.strip() for x in fields):
            raise ValueError("plan.retrieval.fields must be a non-empty list of non-empty strings.")
        sn = retrieval.get("sample_number")
        if not isinstance(sn, int) or sn <= 0:
            raise ValueError("plan.retrieval.sample_number must be a positive int.")

        # Optional sections: only their container type is enforced.
        if guardrails and not isinstance(guardrails, dict):
            raise ValueError("plan.guardrails must be a dict if present.")

        defaults = plan.get("defaults", {})
        if defaults and not isinstance(defaults, dict):
            raise ValueError("plan.defaults must be a dict if present.")

        # Optional sample-size guardrail: ints with 0 < min <= default <= max.
        if isinstance(guardrails, dict):
            sn_guard = guardrails.get("sample_number")
            if sn_guard is not None:
                if not isinstance(sn_guard, dict):
                    raise ValueError("plan.guardrails.sample_number must be a dict.")
                for kk in ("default", "min", "max"):
                    if kk not in sn_guard:
                        raise ValueError(f"plan.guardrails.sample_number missing key '{kk}'.")
                    if not isinstance(sn_guard[kk], int):
                        raise ValueError(f"plan.guardrails.sample_number.{kk} must be an int.")
                if sn_guard["min"] <= 0:
                    raise ValueError("plan.guardrails.sample_number.min must be > 0.")
                if sn_guard["max"] < sn_guard["min"]:
                    raise ValueError("plan.guardrails.sample_number.max must be >= min.")
                if not (sn_guard["min"] <= sn_guard["default"] <= sn_guard["max"]):
                    raise ValueError("plan.guardrails.sample_number.default must be within [min, max].")

    except ValueError as e:
        return _force_revision(f"{notes_clean} | validation_error: {e}")

    # ---- Stage 3: honour the reviewer's decision ---------------------------
    # decision -> (status stored in context, routing key to follow). All four
    # routing keys were validated above, so the lookup cannot fail here.
    status, routing_key = {
        "approve": ("approved", "on_approve"),
        "revise": ("revise", "on_revise"),
        "handoff": ("handoff", "on_handoff"),
    }[decision]
    next_agent = plan["routing"][routing_key]

    context_variables["review_notes"] = notes_clean
    context_variables["review_status"] = status
    context_variables["next_agent"] = next_agent
    return ReplyResult(
        message=f"Plan review -> {status}\n{notes_clean}",
        target=AgentNameTarget(next_agent),
        context_variables=context_variables,
    )


# Tool — materials_project_retriever

def material_retriever(
    context_variables: ContextVariables,
    search_criteria: Optional[dict] = None,
    fields: Optional[List[str]] = None,
    sample_number: Optional[int] = None,
    plan: Optional[dict] = None,
) -> ReplyResult:
    """Retrieve materials from Materials Project using plan.retrieval.search_criteria / fields / sample_number.

    Args:
        context_variables: Shared state. Reads ``plan``; writes ``mp_results``,
            ``search_criteria``, ``fields``, ``sample_number`` and
            ``next_agent`` (and ``plan`` when one is passed in).
        search_criteria, fields, sample_number: Accepted for interface
            compatibility only; they are always replaced by the values found
            in ``plan['retrieval']``.
        plan: Optional replacement plan. A non-empty dict overwrites
            ``context_variables['plan']``; anything else that is not None is
            rejected and routed to "AgentPlanner".

    Returns:
        On success, a ReplyResult targeted at the ``agent`` of the plan step
        whose id is "analyze" (default "AgentAnalyzer"), with at most
        ``sample_number`` documents stored in ``mp_results``.
        On any failure (invalid plan, unsupported key/field, bad filter value,
        missing MP_API_KEY, API error, empty result), a ReplyResult targeted at
        ``plan['routing']['on_revise']`` (default "AgentPlanner") with
        ``mp_results`` reset to [] and ``sample_number`` to 0.
    """
    if plan is not None:
        if isinstance(plan, dict) and plan:
            context_variables["plan"] = plan
        else:
            context_variables["next_agent"] = "AgentPlanner"
            return ReplyResult(
                message="Error in material_retriever: plan must be a non-empty dict if provided.",
                target=AgentNameTarget("AgentPlanner"),
                context_variables=context_variables,
            )

    plan = context_variables.get("plan", {})
    if not isinstance(plan, dict):
        plan = {}

    routing = plan.get("routing", {})
    if not isinstance(routing, dict):
        routing = {}
    # Only trust the plan's revise target when it is a usable agent name.
    next_on_revise = routing.get("on_revise", "AgentPlanner")
    if not (isinstance(next_on_revise, str) and next_on_revise.strip()):
        next_on_revise = "AgentPlanner"

    retrieval = plan.get("retrieval", {})
    if not isinstance(retrieval, dict):
        retrieval = {}

    def _fail(message: str, criteria, field_names) -> ReplyResult:
        # Single exit for every failure: clear the results, record which
        # query was attempted and send the flow to the revise target.
        context_variables["mp_results"] = []
        context_variables["search_criteria"] = criteria
        context_variables["fields"] = field_names
        context_variables["sample_number"] = 0
        context_variables["next_agent"] = next_on_revise
        return ReplyResult(
            message=message,
            target=AgentNameTarget(next_on_revise),
            context_variables=context_variables,
        )

    # ---- Read and type-check the retrieval section -------------------------
    try:
        if not plan:
            raise ValueError("Missing or invalid plan in context_variables['plan'].")
        if not retrieval:
            raise ValueError("Missing plan['retrieval'] dict.")

        search_criteria = retrieval.get("search_criteria", None)
        fields = retrieval.get("fields", None)
        sample_number = retrieval.get("sample_number", None)

        if not isinstance(search_criteria, dict):
            raise ValueError("plan['retrieval']['search_criteria'] must be a dict.")
        if not isinstance(fields, list) or len(fields) == 0 or not all(isinstance(f, str) and f.strip() for f in fields):
            raise ValueError("plan['retrieval']['fields'] must be a non-empty list of non-empty strings.")
        if not isinstance(sample_number, int) or sample_number <= 0:
            raise ValueError("plan['retrieval']['sample_number'] must be a positive int.")
    except ValueError as e:
        # Nothing trustworthy was read yet, so record an empty query.
        return _fail(f"Error in material_retriever: {e}", {}, [])

    # ---- Apply guardrails ---------------------------------------------------
    guardrails = plan.get("guardrails", {})
    sn_guard = guardrails.get("sample_number") if isinstance(guardrails, dict) else None

    # Clamp the requested sample size into [min, max] when bounds are given.
    if isinstance(sn_guard, dict):
        sn_min = sn_guard.get("min")
        sn_max = sn_guard.get("max")
        if isinstance(sn_min, int) and sample_number < sn_min:
            sample_number = sn_min
        if isinstance(sn_max, int) and sample_number > sn_max:
            sample_number = sn_max

    allowed_search_keys = None
    allowed_fields = None

    # Plan-supplied allow-lists win over the built-in defaults below, but only
    # when they are well-formed lists of non-empty strings.
    if isinstance(guardrails, dict):
        ak = guardrails.get("allowed_search_keys")
        af = guardrails.get("allowed_fields")
        if isinstance(ak, list) and all(isinstance(x, str) and x.strip() for x in ak):
            allowed_search_keys = {x.strip() for x in ak}
        if isinstance(af, list) and all(isinstance(x, str) and x.strip() for x in af):
            allowed_fields = {x.strip() for x in af}

    if allowed_search_keys is None:
        allowed_search_keys = {
            "band_gap",
            "energy_above_hull",
            "density",
            "num_sites",
            "k_voigt",
            "g_voigt",
            "elements",
            "chemsys",
            "exclude_elements",
            "excluded_elements",
        }

    if allowed_fields is None:
        allowed_fields = {
            "material_id",
            "formula_pretty",
            "band_gap",
            "energy_above_hull",
            "density",
            "volume",
            "symmetry",
            "nsites",
            "elements",
            "chemsys",
            "is_stable",
            "bulk_modulus",
            "shear_modulus",
        }

    try:
        if not all(isinstance(k, str) and k.strip() for k in search_criteria.keys()):
            raise ValueError("search_criteria keys must be non-empty strings.")

        bad_keys = [k for k in search_criteria.keys() if k not in allowed_search_keys]
        if bad_keys:
            raise ValueError(
                f"Unsupported search_criteria key(s): {bad_keys}. "
                f"Allowed keys: {sorted(allowed_search_keys)}"
            )

        bad_fields = [f for f in fields if f not in allowed_fields]
        if bad_fields:
            raise ValueError(
                f"Unsupported field(s): {bad_fields}. "
                f"Allowed fields: {sorted(allowed_fields)}"
            )
    except ValueError as e:
        return _fail(f"Error in material_retriever: {e}", search_criteria, fields)

    # ---- Normalise the filters ---------------------------------------------
    # NOTE(review): these aliases rewrite filter keys to the response-field
    # spellings (nsites, bulk_modulus, shear_modulus). Confirm that the
    # installed mp-api `summary.search` accepts those as keyword filters; its
    # documented filter names appear to be num_sites / k_voigt / g_voigt.
    alias_map = {
        "num_sites": "nsites",
        "k_voigt": "bulk_modulus",
        "g_voigt": "shear_modulus",
        "pretty_formula": "formula_pretty",
        "excluded_elements": "exclude_elements",
    }
    numeric_keys = {"band_gap", "energy_above_hull", "density", "nsites", "bulk_modulus", "shear_modulus"}
    list_keys = {"elements", "chemsys", "exclude_elements"}

    def _is_range_pair(v) -> bool:
        # Plans arrive as JSON-decoded data, where a (min, max) range is a
        # two-item list rather than a tuple, so both spellings are accepted.
        return isinstance(v, (list, tuple)) and len(v) == 2

    def _normalize_numeric_range(v):
        # Two-item range -> validated (min, max) tuple; bare number -> (v, v);
        # anything else is passed through untouched.
        if _is_range_pair(v):
            lo, hi = v
            if lo is None or hi is None:
                raise ValueError("Numeric range filters must be (min, max) with both values set.")
            if not isinstance(lo, (int, float)) or not isinstance(hi, (int, float)):
                raise ValueError("Numeric range filters must be (min, max) numbers.")
            if lo > hi:
                raise ValueError("Numeric range filters must satisfy min <= max.")
            return (lo, hi)

        if isinstance(v, (int, float)):
            return (v, v)

        return v

    normalized_criteria = {}
    try:
        for key, value in search_criteria.items():
            mapped_key = alias_map.get(key, key)

            if mapped_key in numeric_keys:
                normalized_criteria[mapped_key] = _normalize_numeric_range(value)
            elif mapped_key in list_keys:
                if not (
                    isinstance(value, list)
                    and len(value) > 0
                    and all(isinstance(x, str) and x.strip() for x in value)
                ):
                    raise ValueError(f"{key} must be a non-empty list of non-empty strings.")
                normalized_criteria[mapped_key] = [x.strip() for x in value]
            else:
                normalized_criteria[mapped_key] = value
    except ValueError as e:
        return _fail(f"Error in material_retriever: {e}", search_criteria, fields)

    # ---- Query Materials Project -------------------------------------------
    api_key = os.getenv("MP_API_KEY")
    if not api_key:
        return _fail(
            "Error in material_retriever: Missing MP_API_KEY environment variable.",
            search_criteria,
            fields,
        )

    try:
        # NOTE(review): the complete result set is materialised before being
        # truncated to sample_number below; for broad filters consider
        # limiting the download on the client side if mp-api supports it.
        with MPRester(api_key) as mpr:
            all_results = list(
                mpr.materials.summary.search(
                    fields=fields,
                    **normalized_criteria,
                )
            )
    except Exception as e:
        # Deliberately broad: any client/network/argument error becomes a
        # reply for the planner instead of crashing the group chat.
        return _fail(
            f"Error in material_retriever while querying Materials Project: {e}",
            search_criteria,
            fields,
        )

    if not all_results:
        return _fail(
            "No materials found for the given search_criteria. Revise filters while keeping the main intent.",
            search_criteria,
            fields,
        )

    results = all_results[:sample_number]

    # ---- Route to the analysis step ----------------------------------------
    # The first step with id "analyze" decides the next agent; if that step
    # has no usable 'agent', the default is kept.
    next_agent = "AgentAnalyzer"
    steps = plan.get("steps")
    if isinstance(steps, list):
        for s in steps:
            if isinstance(s, dict) and s.get("id") == "analyze":
                a = s.get("agent")
                if isinstance(a, str) and a.strip():
                    next_agent = a.strip()
                break

    # The criteria are stored as written in the plan, not in normalised form.
    context_variables["search_criteria"] = search_criteria
    context_variables["fields"] = fields
    context_variables["sample_number"] = len(results)
    context_variables["mp_results"] = results
    context_variables["next_agent"] = next_agent

    return ReplyResult(
        message=f"Retrieved {len(results)} materials from Materials Project with search_criteria={search_criteria}.",
        target=AgentNameTarget(next_agent),
        context_variables=context_variables,
    )



# Tool — final_conclusion_tool (Analyzer)

from datetime import datetime, timezone
import json
import os

def final_conclusion_tool(
    context_variables: ContextVariables,
) -> ReplyResult:
    """Summarise ``context_variables["mp_results"]`` and hand off to AgentCoder.

    Every retrieved entry is converted to a JSON-friendly dict. From those the
    tool derives p10/p50/p90 percentiles, stability bins and band-gap bins,
    stores the summary in the shared context and persists it to two files in
    ``PROJECT_FOLDER`` (environment variable, default ``ag2_project``):

    * ``final_conclusion.json`` - analysis summary and the short conclusion.
    * ``materials_data.json``   - the same plus the serialised ``mp_results``.

    Args:
        context_variables: Shared group-chat context. Reads ``mp_results``,
            ``plan`` and ``main_task_improved``; writes ``analysis_summary``,
            ``final_conclusion``, ``materials_data_path`` and ``next_agent``.

    Returns:
        ReplyResult whose message is the JSON summary and whose target is
        ``AgentCoder``. When ``mp_results`` is missing, empty, not a list or
        contains only ``None`` entries, the message is a JSON error string and
        the target is ``plan["routing"]["on_revise"]`` (default
        ``AgentPlanner``).

    Raises:
        OSError: If the project folder or an output file cannot be written.
    """
    results = context_variables.get("mp_results", None)

    plan = context_variables.get("plan", {})
    routing = plan.get("routing", {}) if isinstance(plan, dict) else {}
    if not isinstance(routing, dict):
        # A malformed plan must not crash the tool; fall back to the default route.
        routing = {}
    next_on_revise = routing.get("on_revise", "AgentPlanner")

    def _route_to_revise(error_message):
        """Build the error reply that sends control to the revise agent."""
        context_variables["next_agent"] = next_on_revise
        return ReplyResult(
            message=error_message,
            target=AgentNameTarget(next_on_revise),
            context_variables=context_variables,
        )

    if results is None or results == []:
        return _route_to_revise(
            '{"error":"No materials available in context_variables[\\"mp_results\\"]."}'
        )

    if not isinstance(results, list):
        return _route_to_revise(
            '{"error":"context_variables[\\"mp_results\\"] must be a list."}'
        )

    results = [r for r in results if r is not None]
    if len(results) == 0:
        return _route_to_revise(
            '{"error":"context_variables[\\"mp_results\\"] contains no valid entries."}'
        )

    main_task = context_variables.get("main_task_improved", "")
    if not isinstance(main_task, str) or not main_task.strip():
        main_task = ""

    def _to_float(x):
        """Return ``float(x)``, or ``None`` when x is None or not convertible."""
        try:
            if x is None:
                return None
            return float(x)
        except Exception:
            return None

    def get_value(entry, k):
        """Read key/attribute ``k`` from a dict or an object (``None`` if absent)."""
        if isinstance(entry, dict):
            return entry.get(k)
        return getattr(entry, k, None)

    def _first_present(entry, *keys):
        """Return the first value among ``keys`` that is not ``None``.

        An explicit ``is not None`` test is required here: chaining lookups
        with ``or`` would discard a legitimate ``0.0`` (a material exactly on
        the hull) because zero is falsy.
        """
        for k in keys:
            value = get_value(entry, k)
            if value is not None:
                return value
        return None

    def _percentiles(vals, ps):
        """Percentiles of ``vals`` via linear interpolation between sorted neighbours."""
        if not vals:
            return {p: None for p in ps}
        s = sorted(vals)
        n = len(s)
        out = {}
        for p in ps:
            if n == 1:
                out[p] = s[0]
                continue
            q = (p / 100.0) * (n - 1)
            lo = int(q)
            hi = min(lo + 1, n - 1)
            frac = q - lo
            out[p] = s[lo] * (1 - frac) + s[hi] * frac
        return out

    def _jsonable(x):
        """Recursively convert ``x`` into JSON-serialisable primitives."""
        if x is None:
            return None
        if isinstance(x, (str, int, float, bool)):
            return x
        if isinstance(x, (list, tuple)):
            return [_jsonable(v) for v in x]
        if isinstance(x, dict):
            # JSON object keys must be strings.
            return {str(k): _jsonable(v) for k, v in x.items()}

        # Try the object's own dict exporters before falling back to str().
        for exporter in ("model_dump", "as_dict"):
            if hasattr(x, exporter):
                try:
                    return _jsonable(getattr(x, exporter)())
                except Exception:
                    pass

        if hasattr(x, "__dict__"):
            try:
                return _jsonable(vars(x))
            except Exception:
                pass

        try:
            return str(x)
        except Exception:
            return None

    def _as_material_dict(x):
        """Convert one retrieved entry (dict or document object) to a plain dict."""
        if isinstance(x, dict):
            d = dict(x)
        elif hasattr(x, "model_dump"):
            try:
                d = x.model_dump()
            except Exception:
                d = {}
        elif hasattr(x, "dict"):
            try:
                d = x.dict()
            except Exception:
                d = {}
        else:
            # Unknown object type: copy only the attributes the summary may use.
            keys = [
                "material_id",
                "formula_pretty",
                "band_gap",
                "energy_above_hull",
                "e_above_hull",
                "density",
                "elements",
                "is_stable",
            ]
            d = {}
            for k in keys:
                try:
                    d[k] = getattr(x, k, None)
                except Exception:
                    d[k] = None

        return _jsonable(d)

    serializable_results = [_as_material_dict(r) for r in results]
    n_total = len(serializable_results)

    # Numeric values per property; entries without a usable value are left out
    # and reported through the "missing" counters below.
    bg_clean = []
    eah_clean = []
    dens_clean = []

    for entry in serializable_results:
        bg = _to_float(get_value(entry, "band_gap"))
        eah = _to_float(_first_present(entry, "energy_above_hull", "e_above_hull"))
        dens = _to_float(get_value(entry, "density"))

        if bg is not None:
            bg_clean.append(bg)
        if eah is not None:
            eah_clean.append(eah)
        if dens is not None:
            dens_clean.append(dens)

    bg_missing = n_total - len(bg_clean)
    eah_missing = n_total - len(eah_clean)
    dens_missing = n_total - len(dens_clean)

    stable_count = 0
    near_stable_count = 0
    meta_count = 0
    unstable_count = 0

    for v in eah_clean:
        if v <= 1e-6:
            stable_count += 1
        elif v <= 0.05:
            near_stable_count += 1
        elif v <= 0.2:
            meta_count += 1
        else:
            unstable_count += 1

    bg_bins = {
        "gapless_or_metal_like": 0,
        "narrow_gap": 0,
        "semiconductor": 0,
        "wide_bandgap": 0,
        "very_wide_bandgap": 0,
    }

    for v in bg_clean:
        if v < 0.1:
            bg_bins["gapless_or_metal_like"] += 1
        elif v < 1.0:
            bg_bins["narrow_gap"] += 1
        elif v < 3.0:
            bg_bins["semiconductor"] += 1
        elif v <= 6.0:
            bg_bins["wide_bandgap"] += 1
        else:
            bg_bins["very_wide_bandgap"] += 1

    bg_p = _percentiles(bg_clean, [10, 50, 90])
    eah_p = _percentiles(eah_clean, [10, 50, 90])
    dens_p = _percentiles(dens_clean, [10, 50, 90])

    output = {
        "schema_version": "1.1",
        "created_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "main_task_improved": main_task.strip(),
        "counts": {
            "materials_retrieved": n_total,
            "missing": {
                "band_gap": bg_missing,
                "energy_above_hull": eah_missing,
                "density": dens_missing,
            },
        },
        "ranges": {
            "band_gap_eV": {
                "p10": bg_p[10],
                "p50": bg_p[50],
                "p90": bg_p[90],
            },
            "energy_above_hull_eV_per_atom": {
                "p10": eah_p[10],
                "p50": eah_p[50],
                "p90": eah_p[90],
            },
            "density_g_cm3": {
                "p10": dens_p[10],
                "p50": dens_p[50],
                "p90": dens_p[90],
            },
        },
        "patterns": {
            "stability_bins": {
                "stable_eah_eq_0": stable_count,
                "near_stable_le_0p05": near_stable_count,
                "metastable_0p05_to_0p2": meta_count,
                "likely_unstable_gt_0p2": unstable_count,
            },
            "band_gap_bins": bg_bins,
        },
        "limitations": [
            "Summary uses only retrieved fields (band_gap, energy_above_hull, density, volume if present).",
            "No thermal transport, melting point, oxidation kinetics, toxicity, or mechanical validation included.",
        ],
    }

    concise = (
        f"Retrieved {n_total} candidate materials. "
        f"Energy-above-hull median is {eah_p[50]} eV/atom (p10={eah_p[10]}, p90={eah_p[90]}), "
        f"with {stable_count} stable and {near_stable_count} near-stable entries. "
        "Use this as a shortlist signal only; field performance depends on availability, safety constraints, and scenario-specific testing."
    )

    context_variables["analysis_summary"] = output
    context_variables["final_conclusion"] = concise
    context_variables["next_agent"] = "AgentCoder"

    project_folder = os.environ.get("PROJECT_FOLDER", "ag2_project")
    os.makedirs(project_folder, exist_ok=True)

    final_path = os.path.join(project_folder, "final_conclusion.json")
    with open(final_path, "w", encoding="utf-8") as f:
        json.dump(
            {"analysis_summary": output, "final_conclusion": concise},
            f,
            indent=2,
            ensure_ascii=False,
        )

    # materials_data.json is the file the coder prompt designates as its data source.
    materials_path = os.path.join(project_folder, "materials_data.json")
    payload = {
        "mp_results": serializable_results,
        "analysis_summary": output,
        "final_conclusion": concise,
    }
    with open(materials_path, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2, ensure_ascii=False)

    context_variables["materials_data_path"] = materials_path

    return ReplyResult(
        message=json.dumps(
            {"analysis_summary": output, "final_conclusion": concise},
            indent=2,
            ensure_ascii=False,
        ),
        target=AgentNameTarget("AgentCoder"),
        context_variables=context_variables,
    )


# Tool — python_coder

# Absolute path of the folder that holds the generated scripts and JSON artifacts.
project_folder = os.path.abspath("ag2_project")
os.makedirs(project_folder, exist_ok=True)
# Exported because final_conclusion_tool and the prelude prepended to executed
# scripts both resolve their folder from the PROJECT_FOLDER environment variable.
os.environ["PROJECT_FOLDER"] = project_folder


# Wrapper model for the full script text; python_coder_tool stores its
# model_dump() in context_variables_data.json under "code" / "code_error".
class PythonCode(BaseModel):
    code: str = Field(..., description="Full python code executed by the coder agent")


def _safe_filename(name: str) -> str:
    name = os.path.basename((name or "").strip())
    if len(name) == 0:
        return "script.py"

    keep = []
    for ch in name:
        if ch.isalnum() or ch in {".", "_", "-"}:
            keep.append(ch)
    name = "".join(keep)

    if len(name) == 0:
        name = "script.py"
    if not name.endswith(".py"):
        name = name + ".py"
    if len(name) > 120:
        name = name[-120:]
    return name


def python_coder_tool(
    code: Annotated[str, "A single block of Python code to execute."],
    file_name: Annotated[str, "Name of the code file to store the executed script, e.g., 'script.py'"],
    context_variables: ContextVariables,
) -> ReplyResult:
    """Execute ``code`` in the project folder and record the outcome.

    The submitted code is prefixed with a prelude that changes into
    ``PROJECT_FOLDER`` and, if ``materials_data.json`` is absent, tries to
    rebuild it from ``final_conclusion.json``. The combined script is run with
    ``LocalCommandLineCodeExecutor`` (``timeout=600``) in the module-level
    ``project_folder``, saved there under a sanitised ``file_name``, and the
    result is appended to ``context_variables_data.json``.

    Args:
        code: Python source to run; must be a non-empty string.
        file_name: Name used to store the executed script; must be non-empty.
        context_variables: Shared group-chat context. Reads ``plan``; writes
            ``last_executed_code``, ``last_execution_output``,
            ``last_execution_exit_code``, ``last_executed_file``,
            ``execution_history`` and ``next_agent``.

    Returns:
        ReplyResult carrying the execution output. Invalid arguments and
        executor exceptions route back to ``AgentCoder``; otherwise the target
        is ``plan["routing"]["after_code"]`` (default ``Human``), whatever the
        script's exit code.

    Raises:
        OSError: If the script copy or ``context_variables_data.json`` cannot
            be written.
    """
    agent_name = "AgentCoder"

    plan = context_variables.get("plan", {})
    routing = plan.get("routing", {}) if isinstance(plan, dict) else {}
    if not isinstance(routing, dict):
        # A malformed plan must not crash the tool; fall back to the default route.
        routing = {}
    next_agent = routing.get("after_code", None)
    if not isinstance(next_agent, str) or not next_agent.strip():
        next_agent = "Human"

    # Argument validation: report the problem back to the coder so it can retry.
    validation_error = None
    if not isinstance(code, str) or len(code.strip()) == 0:
        validation_error = "The 'code' parameter must be a non-empty string."
    elif not isinstance(file_name, str) or len(file_name.strip()) == 0:
        validation_error = "The 'file_name' parameter must be a non-empty string."

    if validation_error is not None:
        context_variables["next_agent"] = agent_name
        return ReplyResult(
            message=f"Error in python_coder_tool: {validation_error}",
            target=AgentNameTarget(agent_name),
            context_variables=context_variables,
        )

    safe_name = _safe_filename(file_name)

    runtime_prelude = (
        "import os\n"
        "import json\n"
        "PROJECT_FOLDER = os.environ.get('PROJECT_FOLDER', 'ag2_project')\n"
        "os.makedirs(PROJECT_FOLDER, exist_ok=True)\n"
        "os.chdir(PROJECT_FOLDER)\n"
        "\n"
        "def _ensure_materials_data_json():\n"
        "    if os.path.exists('materials_data.json'):\n"
        "        return\n"
        "    if not os.path.exists('final_conclusion.json'):\n"
        "        return\n"
        "    try:\n"
        "        with open('final_conclusion.json', 'r', encoding='utf-8') as f:\n"
        "            payload = json.load(f)\n"
        "    except Exception:\n"
        "        return\n"
        "    mp_results = payload.get('mp_results')\n"
        "    analysis_summary = payload.get('analysis_summary')\n"
        "    final_conclusion = payload.get('final_conclusion')\n"
        "    if mp_results is None:\n"
        "        return\n"
        "    with open('materials_data.json', 'w', encoding='utf-8') as f:\n"
        "        json.dump(\n"
        "            {\n"
        "                'mp_results': mp_results,\n"
        "                'analysis_summary': analysis_summary,\n"
        "                'final_conclusion': final_conclusion,\n"
        "            },\n"
        "            f,\n"
        "            indent=2,\n"
        "            ensure_ascii=False,\n"
        "            default=str,\n"
        "        )\n"
        "\n"
        "_ensure_materials_data_json()\n"
    )

    full_code = "#!/usr/bin/env python3\n" + runtime_prelude + "\n" + code.strip() + "\n"
    python_code_model = PythonCode(code=full_code)

    executor = LocalCommandLineCodeExecutor(timeout=600, work_dir=project_folder)
    code_block = CodeBlock(language="python", code=full_code)

    try:
        result = executor.execute_code_blocks([code_block])
    except Exception as e:
        # The executor itself failed (as opposed to the script exiting non-zero).
        context_variables["last_executed_code"] = full_code
        context_variables["last_execution_output"] = f"{e}"
        context_variables["last_execution_exit_code"] = 1
        context_variables["last_executed_file"] = ""
        context_variables["next_agent"] = agent_name
        return ReplyResult(
            message="Code execution failed due to an internal executor error:\n" + f"{e}\n",
            target=AgentNameTarget(agent_name),
            context_variables=context_variables,
        )

    exit_code = result.exit_code
    output = result.output

    context_path = os.path.join(project_folder, "context_variables_data.json")

    try:
        with open(context_path, "r", encoding="utf-8") as f:
            context_file_data = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        context_file_data = None

    if not isinstance(context_file_data, dict):
        # Missing, corrupt, or valid-but-non-object JSON (e.g. [] or null):
        # start from a clean log instead of failing on .get() below.
        context_file_data = {
            "execution_results": {},
            "execution_history": [],
            "execution_notes": [],
            "code": [],
            "code_error": "",
        }

    # Repair any section that is absent or has the wrong container type.
    expected_sections = (
        ("execution_results", dict),
        ("execution_history", list),
        ("execution_notes", list),
        ("code", list),
    )
    for section, container in expected_sections:
        if not isinstance(context_file_data.get(section), container):
            context_file_data[section] = container()

    if exit_code != 0:
        context_file_data["code_error"] = python_code_model.model_dump()
    else:
        context_file_data["code"].append(python_code_model.model_dump())
        context_file_data["code_error"] = ""

    context_file_data["execution_results"][safe_name] = {
        "exit_code": exit_code,
        "output": output,
    }
    context_file_data["execution_history"].append(f"Executed {safe_name} with exit_code={exit_code}")
    context_file_data["execution_notes"].append("Executed code and stored output/exit_code.")

    # Keep a copy of the executed script under the requested (sanitised) name.
    code_output_path = os.path.join(project_folder, safe_name)
    os.makedirs(os.path.dirname(code_output_path), exist_ok=True)
    with open(code_output_path, "w", encoding="utf-8") as f:
        f.write(full_code)

    # Best-effort cleanup of the executor's own script file; failure is harmless.
    try:
        if getattr(result, "code_file", None) and os.path.exists(result.code_file):
            os.remove(result.code_file)
    except Exception:
        pass

    with open(context_path, "w", encoding="utf-8") as f:
        json.dump(context_file_data, f, indent=2, ensure_ascii=False)

    context_variables["last_executed_code"] = full_code
    context_variables["last_execution_output"] = output
    context_variables["last_execution_exit_code"] = exit_code
    context_variables["last_executed_file"] = code_output_path

    if "execution_history" not in context_variables or not isinstance(context_variables.get("execution_history"), list):
        context_variables["execution_history"] = []
    context_variables["execution_history"].append({"file_name": safe_name, "exit_code": exit_code})

    if exit_code == 0:
        message = f"Code executed successfully.\n{output}"
    else:
        message = f"Code execution failed.\n{output}"

    context_variables["next_agent"] = next_agent

    return ReplyResult(
        message=message,
        target=AgentNameTarget(next_agent),
        context_variables=context_variables,
    )

# Tool — handoff_to_human

def handoff_to_human_tool(
    message: Annotated[str, "Short explanation of why the query cannot be handled by Materials Project."],
    context_variables: ContextVariables,
) -> ReplyResult:
    """Route the conversation to the Human agent with a short explanation.

    A usable ``message`` is stripped, stored in
    ``context_variables["handoff_reason"]`` and returned as the reply text.
    A missing or blank ``message`` yields an error text instead. Both paths
    set ``next_agent`` to ``"Human"`` and target the Human agent.
    """
    human = "Human"
    has_reason = isinstance(message, str) and bool(message.strip())

    if has_reason:
        reply_text = message.strip()
        context_variables["handoff_reason"] = reply_text
    else:
        reply_text = "Error in handoff_to_human_tool: handoff message must be a non-empty string."

    context_variables["next_agent"] = human
    return ReplyResult(
        message=reply_text,
        target=AgentNameTarget(human),
        context_variables=context_variables,
    )


# Sanity check: every tool the agents register must exist in this notebook's
# namespace and be callable before any agent is created.
_required_tools = [
    "task_improver_tool",
    "planner_tool",
    "review_plan_tool",
    "material_retriever",
    "final_conclusion_tool",
    "python_coder_tool",
    "handoff_to_human_tool",
]

# An undefined name resolves to None, which is not callable, so one test
# covers both "missing" and "not callable".
_missing = [tool_name for tool_name in _required_tools if not callable(globals().get(tool_name))]
if _missing:
    raise RuntimeError(f"Tools missing or not callable: {_missing}")

print(f"✓ Tools loaded OK ({len(_required_tools)}).")




✓ Tools loaded OK (7).


# 5. AGENTS (TaskImprover → Planner → PlanReviewer → MaterialsRetriever → Analyzer → Coder + Human)

In [42]:
# Agent: Task Improver

# System prompt for the task-improver agent: it must always answer through
# task_improver_tool and produce analysis, improvements and improved_task.
task_improver_message = """
You are AgentTaskImprover.

Your task is to improve the provided main task for its specific goal.

CRITERIA
- Analyze the consistency of the main task (does it make sense, is it complete, is it lacking information).
- Check if the main task is complete or if it lacks any information.
- Detect redundancy and remove it without changing the goal.
- If information is missing, make your best guess about what is missing and suggest a better formulation.
- Focus only on improving the task for the specific goal it sets.

OUTPUT
- Always produce: analysis, improvements, improved_task.

AGENTS IN THE TEAM
- AgentPlanner: creates a plan JSON from main_task_improved.
- AgentPlanReviewer: validates the plan and routes approve/revise/handoff.
- AgentMaterialsRetriever: queries Materials Project using whitelisted keys/fields.
- AgentAnalyzer: produces structured JSON analysis from mp_results.
- AgentCoder: executes Python code to generate artifacts.
- Human: reformulates the query if needed.

TOOL USAGE
- You must always use task_improver_tool.
- Do not answer directly without using the tool.
"""

# LLM-driven agent that never asks for human input; its single tool is passed
# through both `functions` and `function_map`.
AgentTaskImprover = ConversableAgent(
    name="AgentTaskImprover",
    llm_config=llm_config,
    system_message=task_improver_message,
    human_input_mode="NEVER",
    functions=[task_improver_tool],
    function_map={"task_improver_tool": task_improver_tool},
)


# Agent: Planner

# System prompt for the planner agent: spells out the plan schema, the
# whitelisted retrieval keys/fields and the fixed routing values that the plan
# passed to planner_tool must contain.
planner_message = """
You are AgentPlanner.

Your task is to create a clean multi-agent execution plan from context_variables["main_task_improved"].

RULES
- Use only information already present in context_variables.
- You must build a dict called plan and pass it to planner_tool as the 'plan' argument.
- The plan must be consistent with the toolchain and valid for planner_tool validation.

PLAN FORMAT 
- schema_version: string
- main_task_improved: must match context_variables["main_task_improved"]
- fireground_signals: dict inferred from the user text (environment, risks, conditions, fuel)
- material_roles: dict (roles and why they matter)
- retrieval: dict with:
  - search_criteria: dict (Materials Project proxy filters)
  - fields: list[str]
  - sample_number: int
- steps: list of dict steps with required ids: retrieve, analyze, code
- routing: dict with non-empty strings:
  - after_planner
  - on_approve
  - on_revise
  - on_handoff

RETRIEVAL CONSTRAINTS
- plan.retrieval.search_criteria must use only these keys:
  - band_gap, energy_above_hull, density, num_sites, k_voigt, g_voigt, elements, chemsys, exclude_elements, excluded_elements
- Do not use mongo operators ($lte, $gt, etc).
- Numeric filters must be tuples (min, max) with both numbers set (no None):
  - energy_above_hull: (0.0, 0.05)
  - band_gap: (3.0, 20.0)
  - density: (3.0, 10.0)
- fields must be a subset of:
  - material_id, formula_pretty, band_gap, energy_above_hull, density, volume, symmetry, nsites, elements, chemsys, is_stable, bulk_modulus, shear_modulus

STEPS CONSTRAINTS
- Each step must include a non-empty 'agent':
  - retrieve -> AgentMaterialsRetriever
  - analyze -> AgentAnalyzer
  - code -> AgentCoder

ROUTING CONSTRAINTS
- routing.after_planner must be "AgentPlanReviewer"
- routing.on_approve must be "AgentMaterialsRetriever"
- routing.on_revise must be "AgentTaskImprover"
- routing.on_handoff must be "Human"

ROUTING
- After you build the plan, route to AgentPlanReviewer.

TOOL USAGE
- You must always use planner_tool.
- Call planner_tool with: plan, plan_description, success_criteria, plan_score.
- Do not answer directly without using the tool.
"""

# LLM-driven agent that never asks for human input; planner_tool is its only tool.
AgentPlanner = ConversableAgent(
    name="AgentPlanner",
    llm_config=llm_config,
    system_message=planner_message,
    human_input_mode="NEVER",
    functions=[planner_tool],
    function_map={"planner_tool": planner_tool},
)


# Agent: Plan Reviewer

# System prompt for the plan-reviewer agent. The decision list and the
# ROUTING CONSTRAINTS section must name the same targets; "revise" goes to
# AgentTaskImprover here, matching routing.on_revise in the planner prompt.
plan_reviewer_message = """
You are AgentPlanReviewer.

Your task is to review context_variables["plan"] and decide:
- approve -> go to retriever
- revise -> go back to task improver
- handoff -> go back to human

RULES
- Validate the plan shape and basic wiring.
- Keep review_notes short and concrete.
- If the plan is missing required keys or required steps defined in the plan, choose "revise".
- Do not change the plan here; only decide routing.

ROUTING CONSTRAINTS
- approve must route to "AgentMaterialsRetriever"
- revise must route to "AgentTaskImprover"
- handoff must route to "Human"

TOOL USAGE
- You must always use review_plan_tool.
- Do not answer directly without using the tool.
"""

# LLM-driven agent that never asks for human input; review_plan_tool is its only tool.
AgentPlanReviewer = ConversableAgent(
    name="AgentPlanReviewer",
    llm_config=llm_config,
    system_message=plan_reviewer_message,
    human_input_mode="NEVER",
    functions=[review_plan_tool],
    function_map={"review_plan_tool": review_plan_tool},
)


# Agent: Materials Retriever

# System prompt for the retriever agent: it must call material_retriever once,
# with no arguments, so the retrieval parameters come from the stored plan
# rather than from the LLM.
materials_retriever_message = """
You are AgentMaterialsRetriever.

Your task is to retrieve candidate materials from Materials Project
based strictly on the execution plan stored in context_variables["plan"].

WHAT YOU DO
- Read search_criteria, fields, and sample_number from plan["retrieval"].
- Call material_retriever exactly once.
- Do not invent, modify, or infer filters or fields.
- Store results in context_variables["mp_results"].
- Do not analyze or rank materials.

RULES
- Use exactly what the plan provides for retrieval.
- Use only whitelisted search_criteria keys and fields.
- Numeric filters must be tuples: (min, max) with both values set (no open-ended bounds).
- List filters must be lists of strings.
- No Mongo-style operators.

TOOL USAGE
- Always call material_retriever with no arguments.
- Do not pass search_criteria, fields, or sample_number explicitly.
- Do not answer directly.
"""

# LLM-driven agent that never asks for human input; material_retriever is its only tool.
AgentMaterialsRetriever = ConversableAgent(
    name="AgentMaterialsRetriever",
    llm_config=llm_config,
    system_message=materials_retriever_message,
    human_input_mode="NEVER",
    functions=[material_retriever],
    function_map={"material_retriever": material_retriever},
)

# Agent: Analyzer

# System prompt for the analyzer agent: it must always call
# final_conclusion_tool, which builds the summary from mp_results.
analyzer_message = """
You are AgentAnalyzer.

ROLE
- Build a data-grounded conclusion from Materials Project results.

INPUTS (from context_variables)
- context_variables["mp_results"]: list of entries returned by the retriever tool.
- context_variables.get("main_task_improved", ""): task intent.
- If main_task_improved is missing, proceed using mp_results only.

YOUR TASKS
1) Read mp_results and use only available fields (no assumptions).
2) Produce a data summary:
   - band_gap: distribution signal only
   - energy_above_hull: stability bins and ranges
   - density: descriptive stats only
3) Produce a short final_conclusion string for operational use.
4) Return JSON only.

OUTPUT FORMAT
- Return the JSON produced by final_conclusion_tool:
  - analysis_summary (dict)
  - final_conclusion (short string)

TOOL USAGE
- Always call final_conclusion_tool.
- Do not answer directly.
"""

# LLM-driven agent that never asks for human input; final_conclusion_tool is its only tool.
AgentAnalyzer = ConversableAgent(
    name="AgentAnalyzer",
    llm_config=llm_config,
    system_message=analyzer_message,
    human_input_mode="NEVER",
    functions=[final_conclusion_tool],
    function_map={"final_conclusion_tool": final_conclusion_tool},
)

# Agent: Coder

# System prompt for the coder agent: generated code must read
# materials_data.json via relative paths (python_coder_tool's prelude changes
# into PROJECT_FOLDER) and must be run through python_coder_tool.
coder_message = """
You are AgentCoder.

ROLE
- Generate and execute Python code when needed by the pipeline.

TASKS
- When invoked, generate at least ONE computational artifact:
  - export a CSV summary table (material_id, formula_pretty, band_gap, energy_above_hull, density), and/or
  - plot a histogram of band_gap distribution.
- Additionally, print a clean, readable table to stdout so results are visible at the end of execution.
- Additionally, generate a per-material plot showing the retrieved materials.

DATA SOURCE
- The Analyzer persists data to materials_data.json in the project folder.
- Use materials_data.json as the single source of truth for plots and tables.
- Assume python_coder_tool sets the working directory to PROJECT_FOLDER (ag2_project). Use relative paths only:
  - read "materials_data.json"
  - write outputs like "summary_table.csv", "band_gap_histogram.png", and "materials_scatter.png"
- Prefer generating all artifacts in a single python_coder_tool call.

CRITICAL IO RULE
- materials_data.json is a dict payload.
- Load it as:
  - payload = json.load(f)
  - mp_results = payload.get("mp_results", [])
- mp_results is the ONLY iterable list of materials.

TABLE RULES
- Build the table by iterating mp_results into rows.
- Accept both "energy_above_hull" and "e_above_hull".
- Missing fields must be written as empty values.
- Print a compact table with aligned columns.

PLOT RULES
- Histogram includes only numeric band_gap values.
- Per-material plot uses numeric band_gap and density values.
- Scatter plot: density vs band_gap.
- Label points with material_id or formula_pretty.
- Save plots to disk.

TOOL USAGE
- Always call python_coder_tool for code execution.
- Do not execute code directly.
"""

# LLM-driven agent that never asks for human input; python_coder_tool is its only tool.
AgentCoder = ConversableAgent(
    name="AgentCoder",
    llm_config=llm_config,
    system_message=coder_message,
    human_input_mode="NEVER",
    functions=[python_coder_tool],
    function_map={"python_coder_tool": python_coder_tool},
)


# Human Agent

# Human participant: no LLM is attached (llm_config=None) and input is always
# requested from the person running the notebook (human_input_mode="ALWAYS").
Human = ConversableAgent(
    name="Human",
    llm_config=None,
    human_input_mode="ALWAYS",
)
print("✓ Agents created.")

✓ Agents created.


# 6. Pattern definition 


In [43]:

# Group-chat pattern: the conversation starts at AgentTaskImprover, Human is
# the user agent, and all agents share the context_variables object created in
# section 3.
pattern = DefaultPattern(
    initial_agent=AgentTaskImprover,
    user_agent=Human,
    agents=[
        AgentTaskImprover,
        AgentPlanner,
        AgentPlanReviewer,
        AgentMaterialsRetriever,
        AgentAnalyzer,
        AgentCoder,
    ],
    context_variables=context_variables,
)


# 7. Group Chat

In [None]:
# 7. Group Chat

# Ask for the query interactively; this cell blocks until the user presses Enter.
print("Hi! Type your query and press Enter.\n")
human_query = input("Query: ").strip()

if not human_query:
    raise ValueError("Query cannot be empty.")

# Store the raw query in the shared context that was handed to the pattern.
context_variables["query"] = human_query

# The third element of the returned tuple is not used in this notebook.
result, context, _ = initiate_group_chat(
    pattern=pattern,
    messages=[{"role": "user", "content": human_query}],
    max_rounds=20,
)


Hi! Type your query and press Enter.



Query:  Inorganic materials suitable for protective barriers or coatings near high-voltage power lines in wildfire-prone forested areas. Focus on materials with energy above hull between 0.0 and 0.05 eV/atom, band gap between 3 and 10 eV, and density between 3 and 8 g/cm³. Exclude materials containing toxic or volatile elements such as Pb, Cd, or Hg. The goal is to identify stable inorganic materials appropriate for thermal and electrical insulation under wildfire conditions.


[33mHuman[0m (to chat_manager):

Inorganic materials suitable for protective barriers or coatings near high-voltage power lines in wildfire-prone forested areas. Focus on materials with energy above hull between 0.0 and 0.05 eV/atom, band gap between 3 and 10 eV, and density between 3 and 8 g/cm³. Exclude materials containing toxic or volatile elements such as Pb, Cd, or Hg. The goal is to identify stable inorganic materials appropriate for thermal and electrical insulation under wildfire conditions.

--------------------------------------------------------------------------------
[32m
Next speaker: AgentTaskImprover
[0m
[33mAgentTaskImprover[0m (to chat_manager):

[32m***** Suggested tool call (call_w0sNSsGPy4Oeooe9HvTutvKN): task_improver_tool *****[0m
Arguments: 
{"analysis":"The task is well-defined in terms of specific material properties like energy above hull, band gap, and density. However, it assumes that the reader already knows how to retrieve and analyze materials d

Retrieving SummaryDoc documents:   0%|          | 0/9122 [00:00<?, ?it/s]

[35m
>>>>>>>> EXECUTED FUNCTION material_retriever...
Call ID: call_astGzE8OJ168M9qBErqdAwyB
Input arguments: {'plan': None}
Output:
Retrieved 50 materials from Materials Project with search_criteria={'energy_above_hull': [0.0, 0.05], 'band_gap': [3.0, 10.0], 'density': [3.0, 8.0], 'exclude_elements': ['Pb', 'Cd', 'Hg']}.[0m
[34m***** ReplyResult transition (AgentMaterialsRetriever): AgentAnalyzer *****[0m
[33m_Group_Tool_Executor[0m (to chat_manager):

[32m***** Response from calling tool (call_astGzE8OJ168M9qBErqdAwyB) *****[0m
Retrieved 50 materials from Materials Project with search_criteria={'energy_above_hull': [0.0, 0.05], 'band_gap': [3.0, 10.0], 'density': [3.0, 8.0], 'exclude_elements': ['Pb', 'Cd', 'Hg']}.
[32m**********************************************************************[0m

--------------------------------------------------------------------------------
[32m
Next speaker: AgentAnalyzer
[0m
[33mAgentAnalyzer[0m (to chat_manager):

[32m***** Suggested 