In [97]:
import os, json
import httpx
from typing import Optional, List
from pydantic import BaseModel, Field
from openai import AsyncAzureOpenAI
from agents import Agent, Runner, OpenAIChatCompletionsModel, function_tool
import logging
# -------- Tool: post the punch item to your route --------
# Send extracted punch list to Kahua

# Global logging configuration
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
logger = logging.getLogger("kahua_tool")
logger.setLevel(logging.INFO)
# Dedicated logger for agent handoffs
handoff_log = logging.getLogger("agents.handoff")
handoff_log.setLevel(logging.INFO)

In [98]:
# -------- Azure OpenAI client --------
azure_client = AsyncAzureOpenAI(
    api_key=os.environ["AZURE_KEY"],
    azure_endpoint=os.environ["AZURE_ENDPOINT"],
    api_version=os.environ["API_VERSION"],
)

In [None]:
# -------- Schema the agent will produce --------
class PunchItem(BaseModel):
    subject: Optional[str] = Field(None, description="Short subject")
    description: Optional[str] = Field(None, description="Detailed description")
    partitionid: Optional[int] = Field(None, description="Project id")
    # Optional flag so the agent can decide to actually send
    auto_send: bool = Field(False, description="If true, call the Kahua tool")

class ProjectItem(BaseModel):
    id: Optional[str] = Field(0, description="Project id")
    name: str = Field(..., description="Project name (Kahua: Name)")
    description: Optional[str] = Field(None, description="Project description (Kahua: Description)")
    auto_send: bool = Field(True, description="If true, tool can be called immediately by the agent")



In [100]:
KAHUA_BASIC_AUTH = os.getenv("KAHUA_BASIC_AUTH")
KAHUA_ACTIVITY_URL = "https://devdailyservice.kahua.com/v2/domains/AWrightCo/projects/0/apps/kahua_AEC_RFI/activities/run"

In [None]:
@function_tool
async def create_pydantic_model(item: PunchItem) -> dict:

In [101]:
@function_tool
async def create_project_in_kahua(project: ProjectItem) -> dict:
    """
    Create a Project in Kahua (entityDef: kahua_Project.Project).
    Mirrors your requests-based example, but as an async Agents tool.
    """
    # Map ProjectItem -> Kahua fields
    entity = {
        "id": 0,
        "hubPath": "kahua_Project.NoWorkflow\\Start",
        "entityDef": "kahua_Project.Project",
        "Name": project.name,
    }
    if project.description:
        entity["Description"] = project.description
    if project.id:
        entity["Id"] = project.id

    payload = {
        "activity": {
            "PropertyName": "Activity",
            "Name": "ContractItem",
            "Flow": [
                {"PropertyName": "Iterate", "Set": "ContractItem", "New": {}, "Existing": {}}
            ],
        },
        "sets": [{"name": "ContractItem", "entities": [entity]}],
    }

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Basic {KAHUA_BASIC_AUTH}",
    }
    # Redacted log
    try:
        redacted = KAHUA_BASIC_AUTH[:10] + "...redacted" if isinstance(KAHUA_BASIC_AUTH, str) else "<none>"
        logger.info(f"POST {KAHUA_ACTIVITY_URL} | Auth={redacted}")
        logger.info(f"Payload preview: {json.dumps({'Name': entity.get('Name'), 'Description': (entity.get('Description') or '')[:120]}, ensure_ascii=False)}")
    except Exception:
        pass

    async with httpx.AsyncClient(timeout=20.0) as client:
        resp = await client.post(KAHUA_ACTIVITY_URL, headers=headers, json=payload)
        ctype = resp.headers.get("content-type", "")
        body = resp.json() if "application/json" in (ctype or "") else {"text": resp.text}

        if resp.status_code >= 400:
            logger.error(f"Kahua project create failed {resp.status_code}: {resp.text}")
            return {
                "status": "error",
                "upstream_status": resp.status_code,
                "upstream_content_type": ctype,
                "upstream_body": body,
                "payload_preview": {"Name": entity.get("Name"), "Description": (entity.get("Description") or "")[:200]},
            }

        return {
            "status": "ok",
            "upstream_status": resp.status_code,
            "kahua_response": body,
            "payload_preview": {"Name": entity.get("Name"), "Description": (entity.get("Description") or "")[:200]},
        }

In [102]:
@function_tool
async def query_entity_def(entity_def: str) -> dict:
    url = "https://devdailyservice.kahua.com/v2/domains/AWrightCo/projects/0/query?returnDefaultAttributes=true"
    payload = {
        "PropertyName": "Query",
        "EntityDef": entity_def,
    }
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Basic {KAHUA_BASIC_AUTH}",
    }

    async with httpx.AsyncClient(timeout=20.0) as client:
        resp = await client.post(url, headers=headers, json=payload)
        ctype = resp.headers.get("content-type", "")
        body = resp.json() if "application/json" in (ctype or "") else {"text": resp.text}

        if resp.status_code >= 400:
            return {
                "status": "error",
                "upstream_status": resp.status_code,
                "upstream_content_type": ctype,
                "upstream_body": body,
            }

        return {
            "status": "ok",
            "upstream_status": resp.status_code,
            "body": body,
        }

In [103]:
# ---- Tool: post directly to Kahua (no FastAPI bridge) ----
@function_tool
async def create_punch_list_item(item: PunchItem) -> dict:
    """
    Create a PunchListItem in Kahua using the same logic as your FastAPI route.
    Expects env:
      - KAHUA_URL (optional; default points to devdailyservice)
      - KAHUA_BASIC_AUTH (Base64 Basic header, e.g., 'Basic <base64userpass>')
    """
    logger.info("Sending to Kahua (tool)")

    # Convert to plain dict so we can check alternate keys if you add any later
    data = item.model_dump()

    # --- Subject/Description synthesis (mirrors your server-side logic) ---
    subject = data.get("subject")
    description = data.get("description")

    if not subject or not description:
        location = data.get("location") or data.get("area")
        defect = data.get("defect")
        desc = data.get("description")
        if not subject:
            if location and defect:
                subject = f"{location} – {defect}"
            elif location and desc:
                subject = f"{location} – {desc[:60]}"
            else:
                subject = defect or (desc[:80] if desc else "Punch Item")
        if not description:
            description = desc or defect or subject

    if not subject or not description:
        logger.warning("Kahua export missing subject/description; using synthesized fallbacks")
        subject = subject or "Punch Item"
        description = description or subject

    kahua_auth = f"Basic {KAHUA_BASIC_AUTH}"
    # --- Payload mirrors your route ---
    payload = {
        "activity": {
            "PropertyName": "Activity",
            "Name": "ContractItem",
            "Flow": [
                {
                    "PropertyName": "Iterate",
                    "Set": "ContractItem",
                    "New": {},
                    "Existing": {},
                }
            ],
        },
        "sets": [
            {
                "name": "ContractItem",
                "entities": [
                    {
                        "id": 0,
                        "hubPath": "kahua_AEC_PunchList.NoWorkflow\\Start",
                        "entityDef": "kahua_AEC_PunchList.PunchListItem",
                        "Subject": subject,
                        "Description": description,
                        "PartitionId": item.partitionid,
                    }
                ],
            }
        ],
    }

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Basic {KAHUA_BASIC_AUTH}",
    }

    # Sanitized logs
    try:
        redacted = kahua_auth[:10] + "...redacted" if isinstance(kahua_auth, str) else "<none>"
        logger.info(f"Kahua URL: {KAHUA_ACTIVITY_URL} | Auth: {redacted}")
        logger.info(f"Kahua payload preview: {json.dumps({'Subject': subject, 'Description': description[:200]}, ensure_ascii=False)}")
    except Exception:
        pass

    # --- Send ---
    async with httpx.AsyncClient(timeout=20.0) as client:
        resp = await client.post(KAHUA_ACTIVITY_URL, headers=headers, json=payload)
        ctype = resp.headers.get("content-type", "")
        text_body = resp.text
        logger.info(f"Kahua response {resp.status_code} | content-type={ctype}")

        if resp.status_code >= 400:
            logger.error(f"Kahua send failed {resp.status_code}: {text_body}")
            return {
                "status": "error",
                "upstream_status": resp.status_code,
                "upstream_content_type": ctype,
                "upstream_body": text_body,
                "payload_preview": {"Subject": subject, "Description": description[:200] if description else None},
            }

        data = resp.json() if "application/json" in ctype else {"text": text_body}
        return {
            "status": "ok",
            "upstream_status": resp.status_code,
            "kahua_response": data,
            "payload_preview": {"Subject": subject, "Description": description[:200] if description else None},
        }


In [104]:
# ---- Tool: log agent handoffs ----
@function_tool
async def log_handoff(from_agent: str, to_agent: str, reason: str = "") -> dict:
    handoff_log.info(f"HANDOFF: {from_agent} -> {to_agent} | reason={reason}")
    return {"status": "logged", "from": from_agent, "to": to_agent, "reason": reason}


In [114]:
# -------- Specialists --------
entity_query_agent = Agent(
    name="Entity Query",
    handoff_description="Specialist agent for entity query",
    model=OpenAIChatCompletionsModel(
        model=os.environ["AZURE_DEPLOYMENT"],  # <-- deployment name in Azure
        openai_client=azure_client
    ),
    instructions=(
        "You help users query Kahua entities by passing in the entity_def to query_entity_def.\n"
        "kahua_Project.Project is the entity def for projects\n"
        "kahua_AEC_PunchList.PunchListItem is the entity def for punch list items\n"
        "kahua_AEC_RFI.RFI is the entity def for RFIs\n"
        "kahua_AEC_ChangeOrder.ChangeOrder is the entity def for change orders\n"
        "kahua_AEC_Submittal.Submittal is the entity def for submittals"
    ),
    tools=[query_entity_def]
)
project_creation_agent = Agent(
    name="Project",
    handoff_description="Specialist agent for project creation",
    model=OpenAIChatCompletionsModel(
        model=os.environ["AZURE_DEPLOYMENT"],  # <-- deployment name in Azure
        openai_client=azure_client
    ),
    instructions="You help users create construction projects.",
    tools=[create_project_in_kahua]
)

punch_list_agent = Agent(
    name="Punch List",
    handoff_description="Specialist agent for punch list creation.",
    model=OpenAIChatCompletionsModel(
        model=os.environ["AZURE_DEPLOYMENT"],  # <-- deployment name in Azure
        openai_client=azure_client
    ),
    instructions=(
        "You help users create construction punch list items."
    ),
    tools=[create_punch_list_item],
)
# -------- Router / triage --------
triage_agent = Agent(
    name="Chatbot with Tools",
    instructions=(
        "You are a helpful assistant and router.\n"
        "- If the user wants a punch list item created/sent, hand off to Punch List.\n"
        "- If they want a project created, hand off to Project.\n"
        "- If they want to query a Kahua entity, hand off to Entity Query.\n"
        "- Otherwise, answer directly.\n\n"
        "When you decide to hand off, call the log_handoff tool with from_agent (your name), to_agent (the target), and a short reason before proceeding with the handoff."
    ),
    model=OpenAIChatCompletionsModel(
        model=os.environ["AZURE_DEPLOYMENT"],
        openai_client=azure_client
    ),
    # tools=[log_handoff],
    handoffs=[punch_list_agent, project_creation_agent, entity_query_agent],
)

In [115]:
from agents import Runner

async def main():
    result = await Runner.run(triage_agent, 
    """First, create a project called the The Incredible Genius Project, the greatest project of all time.
    Then handoff to query query projects to find the id of the project called the Genius Project.
    Then handoff to punch list to create a punch list item with that id as the partition id.
    The punch list is for the window Jacob lassiat broke with a rock in the kitchen."""
    # "query all punch list items"
    )
    print(result.final_output)


In [116]:
import asyncio

# Check if there's already an event loop running
if asyncio.get_event_loop().is_running():
    # If so, use `nest_asyncio` to allow nested use of `await`
    import nest_asyncio
    nest_asyncio.apply()
    await main()
else:
    # Otherwise, run `asyncio.run()`
    asyncio.run(main())

2025-10-17 14:42:38,376 INFO httpx: HTTP Request: POST https://kai0721300033.cognitiveservices.azure.com/openai/deployments/gpt-4o/chat/completions?api-version=2025-01-01-preview "HTTP/1.1 200 OK"
2025-10-17 14:42:38,833 INFO httpx: HTTP Request: POST https://kai0721300033.cognitiveservices.azure.com/openai/deployments/gpt-4o/chat/completions?api-version=2025-01-01-preview "HTTP/1.1 200 OK"
2025-10-17 14:42:38,836 INFO kahua_tool: POST https://devdailyservice.kahua.com/v2/domains/AWrightCo/projects/0/apps/kahua_AEC_RFI/activities/run | Auth=YXdyaWdodE...redacted
2025-10-17 14:42:38,837 INFO kahua_tool: Payload preview: {"Name": "The Incredible Genius Project", "Description": "The greatest project of all time"}
2025-10-17 14:42:40,780 INFO httpx: HTTP Request: POST https://devdailyservice.kahua.com/v2/domains/AWrightCo/projects/0/apps/kahua_AEC_RFI/activities/run "HTTP/1.1 200 OK"
2025-10-17 14:42:41,446 INFO httpx: HTTP Request: POST https://kai0721300033.cognitiveservices.azure.com/op

The project named "The Incredible Genius Project" has been created successfully. It is described as "The greatest project of all time."

Now, I need to find the ID of the project named "Genius Project." Could you confirm that I should proceed?


In [117]:
from agents import Agent, Runner, SQLiteSession

# Create a session instance with a session ID
session = SQLiteSession("conversation_123")

# First turn
result = await Runner.run(
    triage_agent,
    "What city is the Golden Gate Bridge in?",
    session=session
)
print(result.final_output)  # "San Francisco"

# Second turn - agent automatically remembers previous context
result = await Runner.run(
    triage_agent,
    "What state is it in?",
    session=session
)
print(result.final_output)  # "California"

# Also works with synchronous runner
result = Runner.run_sync(
    triage_agent,
    "What's the population?",
    session=session
)
print(result.final_output)  # "Approximately 39 million"

2025-10-17 14:42:41,892 INFO httpx: HTTP Request: POST https://kai0721300033.cognitiveservices.azure.com/openai/deployments/gpt-4o/chat/completions?api-version=2025-01-01-preview "HTTP/1.1 200 OK"


The Golden Gate Bridge is located in **San Francisco**, California.


2025-10-17 14:42:42,248 INFO httpx: HTTP Request: POST https://kai0721300033.cognitiveservices.azure.com/openai/deployments/gpt-4o/chat/completions?api-version=2025-01-01-preview "HTTP/1.1 200 OK"


The Golden Gate Bridge is in the state of **California**.


2025-10-17 14:42:42,724 INFO httpx: HTTP Request: POST https://kai0721300033.cognitiveservices.azure.com/openai/deployments/gpt-4o/chat/completions?api-version=2025-01-01-preview "HTTP/1.1 200 OK"


Are you asking about the population of San Francisco, the state of California, or a specific area related to the Golden Gate Bridge?


In [124]:
result = Runner.run_sync(
    triage_agent,
    "find all punch lists",
    session=session
)

2025-10-17 14:44:39,161 INFO httpx: HTTP Request: POST https://kai0721300033.cognitiveservices.azure.com/openai/deployments/gpt-4o/chat/completions?api-version=2025-01-01-preview "HTTP/1.1 200 OK"
2025-10-17 14:44:39,817 INFO httpx: HTTP Request: POST https://kai0721300033.cognitiveservices.azure.com/openai/deployments/gpt-4o/chat/completions?api-version=2025-01-01-preview "HTTP/1.1 200 OK"
2025-10-17 14:44:40,246 INFO httpx: HTTP Request: POST https://devdailyservice.kahua.com/v2/domains/AWrightCo/projects/0/query?returnDefaultAttributes=true "HTTP/1.1 200 OK"
2025-10-17 14:44:41,248 INFO httpx: HTTP Request: POST https://kai0721300033.cognitiveservices.azure.com/openai/deployments/gpt-4o/chat/completions?api-version=2025-01-01-preview "HTTP/1.1 200 OK"


In [126]:
print(result.final_output)

Here are the punch list items:

1. **Subject**: Sample Punch List Item  
   **Description**: Details of punch list item for the project.  
   **Punch List Item ID**: 55524139  

2. **Subject**: Broken window in kitchen  
   **Description**: The kitchen window is broken and needs immediate attention.  
   **Punch List Item ID**: 55519316  

Let me know if you need details for any specific punch list item or further assistance!


# Trials

In [127]:
from pydantic import BaseModel, create_model, Field
from typing import Any, Dict, List, Type, Optional, get_args, get_origin

def create_model_from_schema(
    schema_dict: Dict[str, Any],
    model_name: str = "DynamicModel"
) -> Type[BaseModel]:
    """
    Recursively generate a Pydantic model class from a dictionary.
    - Nested dicts become sub-models
    - Lists of dicts become List[submodel]
    - Primitive types are inferred (int, float, bool, str)
    """
    def infer_field_type(value: Any, name_hint: str) -> Any:
        if isinstance(value, bool):
            return (Optional[bool], None)
        elif isinstance(value, int):
            return (Optional[int], None)
        elif isinstance(value, float):
            return (Optional[float], None)
        elif isinstance(value, str):
            return (Optional[str], None)
        elif isinstance(value, list):
            # if list of dicts, recurse on first element
            if value and isinstance(value[0], dict):
                sub_model = create_model_from_schema(value[0], f"{name_hint.capitalize()}Item")
                return (Optional[List[sub_model]], None)
            # else fallback to primitive list
            subtype = type(value[0]) if value else Any
            return (Optional[List[subtype]], None)
        elif isinstance(value, dict):
            sub_model = create_model_from_schema(value, f"{name_hint.capitalize()}SubModel")
            return (Optional[sub_model], None)
        else:
            return (Optional[Any], None)

    fields = {}
    for key, val in schema_dict.items():
        field_type, default = infer_field_type(val, key)
        fields[key] = (field_type, default)

    return create_model(model_name, **fields)  # type: ignore


In [None]:
import json
project_dict = json.loads("""{
    "id": 55524056,
    "entityDef": "kahua_Project.Project",
    "hubPath": "kahua_Project.NoWorkflow\\\\Start",
    "outwardReferences": [],
    "MarkupEnabled": false,
    "IsExchangeEntity": false,
    "IsArchived": false,
    "IsCountedDisabled": false,
    "IsManual": false,
    "IsMarkup": false,
    "IsPending": false,
    "IsTaxable": false,
    "ScheduleStart": null,
    "SchedulePurchaseBy": null,
    "ScheduleOnSite": null,
    "ScheduleEnd": null,
    "AccountingStartedOn": null,
    "AccountingPurchasedOn": null,
    "AccountingOnSite": null,
    "AccountingCompletedOn": null,
    "CurrencyRateLastModifiedDateTime": "2025-10-17T18:42:43.850",
    "CreatedDateTime": "2025-10-17T18:42:43.850",
    "TaxRate": null,
    "ProjectValue": null,
    "FundingPercentAllocated": null,
    "AnalysisProjectedExposurePercent": null,
    "AccountingPercentWorkComplete": null,
    "AccountingDaysRemaining": null,
    "CurrencyRateToDomain": 1.000000000,
    "CurrencyRateToDocument": 1.000000000,
    "OrdinalNumber": null,
    "DomainPartitionId": 0,
    "PartitionId": 0,
    "Instance": 1,
    "CurrencyRateTypeId": 2149,
    "CurrencyRateId": 3491,
    "Id": 55524056,
    "InstanceId": 55524056,
    "Number": "0010",
    "ContractNo": null,
    "Description": null,
    "OtherRefNumber": null,
    "PartitionClientId": null,
    "ProjectStatus": null,
    "Type": null,
    "CostItemType": null,
    "ISO19650ReferenceID": null,
    "CostUnitEntryType": null,
    "ItemCategory": null,
    "ChangeReason": null,
    "ProjectCurrency": "USD",
    "CurrencyCode": "USD",
    "Name": "The Incredible Genius Project",
    "PartitionItemState": "Live",
    "PartitionItemType": "Project",
    "DomainPartitionPath": null,
    "Notes": null,
    "PartitionPath": null,
    "RecycleBinLabel": null,
    "Path": "\\\\The Incredible Genius Project",
    "AppNameLabel": "Workflow Labels",
    "DetailLabel": "Project: The Incredible Genius Project",
    "LongLabel": "The Incredible Genius Project",
    "ShortLabel": "The Incredible Genius Project",
    "CreatedBy": {
        "id": 49861465,
        "entityDef": "kahua_PeopleManager.kahua_Contact",
        "hubPath": "",
        "outwardReferences": []
    },
    "ModifiedDateTime": null,
    "ExternalLinks": [],
    "Address": {},
    "ShippingAddress": {},
    "FundingItemsItemsTotalValue": null,
    "ExpenseItemsItemsTotalValue": null,
    "FundingItemsMarkupsTotalValue": null,
    "ExpenseItemsMarkupsTotalValue": null,
    "BudgetCurrentQuantity": null,
    "BudgetCurrentTotalValue": null,
    "BudgetCurrentTaxRate": null,
    "BudgetCurrentTaxAmount": null,
    "BudgetCurrentUnitOfMeasurement": null,
    "BudgetItemsTotalTotalValue": null,
    "Comments": [],
    "CostCurrentQuantity": null,
    "CostCurrentTotalValue": null,
    "CostCurrentTaxRate": null,
    "CostCurrentTaxAmount": null,
    "CostCurrentUnitOfMeasurement": null,
    "CostItemsTotalTotalValue": null,
    "ResponsibleCompany": {},
    "ResponsibleContact": {},
    "Status": {},
    "WorkBreakdownItem": {},
    "CostItemIndex": {},
    "FundRule": {},
    "FundItems": [],
    "BudgetCurrentUnitValue": null,
    "CostCurrentUnitValue": null,
    "CostCodeIndex_01": {},
    "CostCodeIndex_02": {},
    "CostCodeIndex_03": {},
    "DomainCostCodeIndex_01": {},
    "FundingSource": {},
    "Owner": {},
    "Architect": {},
    "ContractorCM": {},
    "OwnersRep": {},
    "ProjectExecutive": {},
    "ProjectManager": {},
    "ProjectEngineer": {},
    "ProjectAdmin": {},
    "Superintendent": {},
    "Location": {}
}""")

# create model dynamically
ProjectModel = create_model_from_schema(project_dict, "ProjectSchema")

# instantiate + validate
instance = ProjectModel(**project_dict)
print(instance.model_dump_json(indent=2))
print(ProjectModel.model_json_schema())  # shows full generated JSON schema


In [134]:
from typing import Any, Dict, List, Optional, Type
from pydantic import BaseModel, create_model, ValidationError
from agents import function_tool

# --- helpers ---
def _pick_representative(lst: list) -> Any:
    for x in lst:
        if x is not None:
            return x
    return lst[0] if lst else None

def _infer_model_from_example(data: Any, name_hint: str = "Model") -> Type[BaseModel] | Any:
    """
    Recursively build a Pydantic model class from an example dict/list/primitive.
    - Dict  -> nested BaseModel
    - List  -> List[submodel or primitive]
    - Other -> primitive/Any
    """
    if isinstance(data, bool):   return Optional[bool]
    if isinstance(data, int):    return Optional[int]
    if isinstance(data, float):  return Optional[float]
    if isinstance(data, str):    return Optional[str]
    if data is None:             return Optional[Any]

    if isinstance(data, list):
        rep = _pick_representative(data)
        if isinstance(rep, dict):
            sub = _infer_model_from_example(rep, f"{name_hint}Item")
            return Optional[List[sub]]  # type: ignore[name-defined]
        elif rep is None:
            return Optional[List[Any]]
        else:
            prim = _infer_model_from_example(rep, f"{name_hint}Item")
            return Optional[List[prim]]  # type: ignore[valid-type]

    if isinstance(data, dict):
        fields: Dict[str, tuple] = {}
        for k, v in data.items():
            sub_type = _infer_model_from_example(v, f"{name_hint}_{k.capitalize()}")
            fields[k] = (sub_type, None)
        return create_model(name_hint, **fields)  # type: ignore[arg-type]

    return Optional[Any]

def _sanitize_json_schema(obj: Any) -> Any:
    """Remove 'additionalProperties' keys so the orchestration stack doesn't complain later."""
    if isinstance(obj, dict):
        obj.pop("additionalProperties", None)
        for k, v in list(obj.items()):
            obj[k] = _sanitize_json_schema(v)
    elif isinstance(obj, list):
        return [_sanitize_json_schema(x) for x in obj]
    return obj

# --- the tool (STRICT MODE OFF) ---
@function_tool(strict_mode=False)
async def create_pydantic_model_from_schema(
    schema_like: Dict[str, Any],
    model_name: str = "DynamicModel",
    sample: Optional[Dict[str, Any]] = None
) -> dict:
    """
    Build a nested Pydantic model from an arbitrary dict (schema-like or example instance).
    Optionally validate a 'sample' payload against the generated model.
    Returns: { status, model_name, json_schema, python_model_str, validated_sample, errors }
    """
    try:
        ModelOrType = _infer_model_from_example(schema_like, model_name)

        if not isinstance(ModelOrType, type) or not issubclass(ModelOrType, BaseModel):
            Wrapped = create_model(model_name, value=(ModelOrType, None))  # type: ignore[arg-type]
            Model = Wrapped
        else:
            Model = ModelOrType

        json_schema = _sanitize_json_schema(Model.model_json_schema())

        validated_sample = None
        errors: List[str] = []
        status = "ok"

        if sample is not None:
            try:
                inst = Model(**sample)
                validated_sample = inst.model_dump(by_alias=False, exclude_none=True)
            except ValidationError as ve:
                status = "validation_error"
                errors = [str(e) for e in ve.errors()]

        try:
            python_model_str = f"class {Model.__name__}(BaseModel): " + ", ".join(Model.__fields__.keys())  # type: ignore[attr-defined]
        except Exception:
            python_model_str = f"{Model.__name__}(BaseModel)"

        return {
            "status": status,
            "model_name": Model.__name__,
            "json_schema": json_schema,
            "python_model_str": python_model_str,
            "validated_sample": validated_sample,
            "errors": errors,
        }

    except Exception as e:
        return {
            "status": "error",
            "model_name": model_name,
            "json_schema": None,
            "python_model_str": None,
            "validated_sample": None,
            "errors": [repr(e)],
        }


In [135]:
# From your big project dict
report = await create_pydantic_model_from_schema(
    schema_like=project_dict, 
    model_name="ProjectSchema",
    sample={"Id": 55524056, "Name": "The Incredible Genius Project"}
)
print(report["status"])
print(report["json_schema"])          # sanitized schema (no additionalProperties)
print(report["validated_sample"])     # if provided and valid
print(report["errors"])


TypeError: 'FunctionTool' object is not callable