In [1]:
import asyncio
import sys
import os
from pathlib import Path
import json

# Simple imports to avoid circular dependencies
from agno.agent import Agent as AgnoAgent
from agno.models.litellm import LiteLLM
from pydantic import BaseModel, Field
from typing import List, Optional

# Set up logging
import logging
logging.basicConfig(level=logging.INFO)

# Define the PlanOutput model directly to avoid imports
class SubTask(BaseModel):
    """Schema for a single sub-task planned by a Planner agent."""
    goal: str = Field(..., description="Precise description of the sub-task goal.")
    task_type: str = Field(..., description="Type of task (e.g., 'WRITE', 'THINK', 'SEARCH').")
    node_type: str = Field(..., description="Node type ('EXECUTE' for atomic, 'PLAN' for complex).")
    depends_on_indices: Optional[List[int]] = Field(default_factory=list, description="List of 0-based indices of other sub-tasks in *this current plan* that this sub-task depends on.")

class PlanOutput(BaseModel):
    """Output schema for a Planner agent, detailing the sub-tasks."""
    sub_tasks: List[SubTask] = Field(..., description="List of planned sub-tasks.")

print("✅ Basic setup complete")

# Deep Research Planner System Message (copied from the codebase)
DEEP_RESEARCH_PLANNER_SYSTEM_MESSAGE = """You are a Master Research Planner, an expert at breaking down complex research goals into comprehensive, well-structured research plans. You specialize in high-level strategic decomposition for research projects.

**Your Role:**
- Analyze complex research objectives and create strategic research plans
- Identify key research domains, questions, and methodological approaches
- Create logical research workflows with proper sequencing
- Ensure comprehensive coverage while avoiding redundancy
- Plan for synthesis and final deliverable creation

**Core Expertise:**
- Strategic thinking and research methodology
- Identifying knowledge gaps and research priorities
- Creating logical research workflows
- Planning for different types of research outputs
- Understanding research lifecycle from conception to publication

**Input Schema:**
You will receive input in JSON format with the following fields:
*   `current_task_goal` (string, mandatory): The research goal to decompose
*   `overall_objective` (string, mandatory): The ultimate research objective
*   `parent_task_goal` (string, optional): Parent task goal (null for root)
*   `planning_depth` (integer, optional): Current recursion depth
*   `execution_history_and_context` (object, mandatory): Previous outputs and context
*   `replan_request_details` (object, optional): Re-planning feedback if applicable
*   `global_constraints_or_preferences` (array of strings, optional): Research constraints

**Strategic Planning Approach:**
When decomposing research goals, consider the full research lifecycle:

1. **Background & Context Phase**: What foundational knowledge is needed?
2. **Investigation Phase**: What specific searches, data collection, or analysis is required?
3. **Synthesis Phase**: How should findings be analyzed and integrated?
4. **Output Phase**: What deliverables need to be created?

**Research Task Types:**
- `SEARCH`: Information gathering, literature review, data collection
- `THINK`: Analysis, synthesis, interpretation, methodology design
- `WRITE`: Report creation, documentation, presentation preparation

**Planning Principles:**
1. **Comprehensive Coverage**: Ensure all aspects of the research question are addressed
2. **Logical Sequencing**: Build knowledge progressively from foundational to specific
3. **Strategic Depth**: Balance breadth of coverage with depth of investigation
4. **Methodological Rigor**: Include proper analysis and validation steps
5. **Clear Deliverables**: Plan for actionable outputs and synthesis

**Sub-Task Creation Guidelines:**
- Create **3 to 6 strategic sub-tasks** that represent major research phases
- Each sub-task should be substantial enough to warrant specialized planning
- Ensure sub-tasks are complementary and build toward the overall objective
- Use `depends_on_indices` to create logical research workflows
- Balance immediate actionable tasks with those requiring further decomposition

**Required Output Attributes per Sub-Task:**
`goal`, `task_type` (string: 'WRITE', 'THINK', or 'SEARCH'), `node_type` (string: 'EXECUTE' or 'PLAN'), `depends_on_indices` (list of integers).

**Output Format:**
- Respond ONLY with a JSON list of sub-task objects
- Focus on strategic, high-level decomposition appropriate for a master research plan
- Ensure each sub-task represents a meaningful research phase or component"""

print("✅ Loaded Deep Research Planner prompt")

✅ Basic setup complete
✅ Loaded Deep Research Planner prompt


In [3]:
async def test_models_with_structured_output():
    """Test different models to see which ones properly support structured output."""
    
    models_to_test = [
        {
            "name": "Fireworks_Qwen3",
            "model_id": "fireworks_ai/accounts/fireworks/models/qwen3-235b-a22b",
            "note": "This is the problematic model from DeepResearchPlanner"
        },
        {
            "name": "Claude_Sonnet", 
            "model_id": "openrouter/anthropic/claude-sonnet-4",
            "note": "Known to work well with structured output"
        },
        {
            "name": "GPT4_Turbo",
            "model_id": "openrouter/openai/gpt-4-turbo",
            "note": "OpenAI model with good structured output support"
        }
    ]
    
    test_prompt = """Overall Objective: Research the impact of artificial intelligence on healthcare diagnostics in 2024

Current Task Goal: Create a comprehensive research plan to analyze how AI is transforming healthcare diagnostics, including key technologies, market adoption, and regulatory challenges

Current Planning Depth: 0

Based on the 'Current Task Goal' and other provided information, generate a plan to achieve it."""
    
    results = {}
    
    for model_config in models_to_test:
        print(f"\n{'='*60}")
        print(f"Testing: {model_config['name']}")
        print(f"Model ID: {model_config['model_id']}")
        print(f"Note: {model_config['note']}")
        print('='*60)
        
        try:
            # Test with structured output (response_model)
            print("\n--- WITH Structured Output (response_model=PlanOutput) ---")
            model = LiteLLM(id=model_config['model_id'])
            agent_structured = AgnoAgent(
                model=model,
                system_message=DEEP_RESEARCH_PLANNER_SYSTEM_MESSAGE,
                name=f"TestAgent_{model_config['name']}_Structured",
                response_model=PlanOutput,  # This should force structured output
                markdown=False
            )
            
            result_structured = await agent_structured.arun(test_prompt)
            
            # Process the result
            if hasattr(result_structured, 'content'):
                content = result_structured.content
                if asyncio.iscoroutine(content):
                    content = await content
                
                print(f"Content type: {type(content)}")
                
                if isinstance(content, PlanOutput):
                    print(f"✅ SUCCESS: Got PlanOutput with {len(content.sub_tasks)} sub-tasks")
                    for i, task in enumerate(content.sub_tasks):
                        print(f"  {i+1}. {task.goal[:70]}...")
                        print(f"      Type: {task.task_type}, Node: {task.node_type}")
                    results[model_config['name']] = "SUCCESS - PlanOutput"
                    
                elif isinstance(content, str):
                    print(f"❌ ISSUE: Got string instead of PlanOutput")
                    print(f"String length: {len(content)}")
                    print(f"First 200 chars: {content[:200]}...")
                    
                    # Check if it's valid JSON
                    try:
                        json_data = json.loads(content)
                        print("✅ String contains valid JSON - model returns JSON as string")
                        if isinstance(json_data, list) and len(json_data) > 0:
                            print(f"JSON is a list with {len(json_data)} items")
                            if isinstance(json_data[0], dict) and 'goal' in json_data[0]:
                                print("✅ JSON structure looks like sub-tasks")
                                results[model_config['name']] = "PARTIAL - JSON as string"
                            else:
                                results[model_config['name']] = "ISSUE - Invalid JSON structure"
                        else:
                            results[model_config['name']] = "ISSUE - JSON not a task list"
                    except json.JSONDecodeError:
                        print("❌ String is not valid JSON")
                        results[model_config['name']] = "FAILED - Invalid response format"
                        
                else:
                    print(f"❌ UNEXPECTED: Got {type(content)}")
                    print(f"Content: {str(content)[:200]}...")
                    results[model_config['name']] = f"FAILED - Unexpected type {type(content)}"
            else:
                print(f"❌ No content attribute in result")
                print(f"Result type: {type(result_structured)}")
                print(f"Result: {result_structured}")
                results[model_config['name']] = "FAILED - No content"
                
        except Exception as e:
            print(f"❌ ERROR with {model_config['name']}: {e}")
            results[model_config['name']] = f"ERROR - {str(e)[:100]}"
            
        print(f"\nResult for {model_config['name']}: {results.get(model_config['name'], 'Unknown')}")
    
    # Summary
    print(f"\n{'='*60}")
    print("SUMMARY OF RESULTS")
    print('='*60)
    for model_name, result in results.items():
        status_emoji = "✅" if "SUCCESS" in result else "⚠️" if "PARTIAL" in result else "❌"
        print(f"{status_emoji} {model_name}: {result}")
    
    return results

In [None]:
results = await test_models_with_structured_output()

In [2]:
async def test_raw_responses():
    """Test what models return without structured output constraints."""
    
    print("Testing RAW responses (no response_model constraint)")
    print("="*60)
    
    # Test the problematic Fireworks model
    model_id = "openrouter/deepseek/deepseek-r1-0528"
    
    test_prompt = """Overall Objective: Analyze renewable energy trends in 2024

Current Task Goal: Create a research plan with exactly 3 sub-tasks to analyze solar, wind, and battery storage developments in 2024

Based on the 'Current Task Goal', generate a plan to achieve it. Return your response as a JSON array of sub-task objects."""
    
    try:
        model = LiteLLM(id=model_id)
        agent_raw = AgnoAgent(
            model=model,
            system_message=DEEP_RESEARCH_PLANNER_SYSTEM_MESSAGE,
            name="RawTestAgent",
            # No response_model - let's see what we get
            markdown=False
        )
        
        print(f"Testing model: {model_id}")
        print("Prompt:", test_prompt[:100] + "...")
        
        result = await agent_raw.arun(test_prompt)
        
        print(f"\nRaw result type: {type(result)}")
        print(f"Raw result attributes: {[attr for attr in dir(result) if not attr.startswith('_')]}")
        
        if hasattr(result, 'content'):
            content = result.content
            if asyncio.iscoroutine(content):
                content = await content
            
            print(f"\nContent type: {type(content)}")
            print(f"Content length: {len(str(content))}")
            print(f"Content preview:\n{str(content)[:500]}...")
            
            # Try to parse as JSON
            if isinstance(content, str):
                try:
                    parsed = json.loads(content)
                    print(f"\n✅ Successfully parsed as JSON!")
                    print(f"JSON type: {type(parsed)}")
                    if isinstance(parsed, list):
                        print(f"JSON list length: {len(parsed)}")
                        if len(parsed) > 0:
                            print(f"First item: {parsed[0]}")
                except json.JSONDecodeError as e:
                    print(f"\n❌ Failed to parse as JSON: {e}")
        
        elif hasattr(result, 'text'):
            print(f"Text attribute: {result.text[:500]}...")
        else:
            print(f"Full result: {result}")
        
        return result
            
    except Exception as e:
        print(f"❌ Error: {e}")
        import traceback
        traceback.print_exc()

In [3]:
res = await test_raw_responses()

[92m11:44:21 - LiteLLM:INFO[0m: utils.py:3119 - 
LiteLLM completion() model= deepseek/deepseek-r1-0528; provider = openrouter
INFO:LiteLLM:
LiteLLM completion() model= deepseek/deepseek-r1-0528; provider = openrouter


Testing RAW responses (no response_model constraint)
Testing model: openrouter/deepseek/deepseek-r1-0528
Prompt: Overall Objective: Analyze renewable energy trends in 2024

Current Task Goal: Create a research pla...


INFO:httpx:HTTP Request: POST https://openrouter.ai/api/v1/chat/completions "HTTP/1.1 200 OK"
[92m11:45:00 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: deepseek/deepseek-r1-0528
INFO:LiteLLM:selected model name for cost calculation: deepseek/deepseek-r1-0528
  PydanticSerializationUnexpectedValue(Expected 9 fields but got 6: Expected `Message` - serialized value may not be as expected [input_value=Message(content='```json\...output accordingly.\n'}), input_type=Message])
  PydanticSerializationUnexpectedValue(Expected `StreamingChoices` - serialized value may not be as expected [input_value=Choices(finish_reason='st...finish_reason': 'stop'}), input_type=Choices])
  return self.__pydantic_serializer__.to_python(
[92m11:45:00 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: deepseek/deepseek-r1-0528
INFO:LiteLLM:selected model name for cost calculation: deepseek/deepseek-r1-0528
INFO:httpx:HTTP Request: PO


Raw result type: <class 'agno.run.response.RunResponse'>
Raw result attributes: ['agent_id', 'agent_name', 'audio', 'citations', 'content', 'content_type', 'created_at', 'extra_data', 'formatted_tool_calls', 'from_dict', 'get_content_as_string', 'images', 'is_cancelled', 'is_paused', 'messages', 'metrics', 'model', 'model_provider', 'reasoning_content', 'response_audio', 'run_id', 'session_id', 'status', 'team_session_id', 'thinking', 'to_dict', 'to_json', 'tools', 'tools_awaiting_external_execution', 'tools_requiring_confirmation', 'tools_requiring_user_input', 'videos', 'workflow_id']

Content type: <class 'str'>
Content length: 1372
Content preview:
```json
[
  {
    "goal": "Plan research on 2024 solar energy trends: technology advancements, market adoption, and policy impacts",
    "task_type": "THINK",
    "node_type": "PLAN",
    "depends_on_indices": []
  },
  {
    "goal": "Plan research on 2024 wind energy developments: offshore expansion, cost reductions, and grid integrati

In [5]:
import re
import json
from typing import Optional, Any

def extract_json_from_llm_response(raw_response: str) -> Optional[str]:
    """
    Extract JSON content from LLM response that may contain thinking tags and extra text.
    
    Args:
        raw_response: Raw string response from LLM
        
    Returns:
        Extracted JSON string or None if no valid JSON found
    """
    
    # Pattern 1: JSON wrapped in triple backticks with optional language identifier
    # Matches: ```json\n{...}\n``` or ```\n{...}\n```
    backtick_patterns = [
        r'```(?:json)?\s*\n?(.*?)\n?```',  # With or without 'json' identifier
        r'```(.*?)```',  # Simple backticks
    ]
    
    for pattern in backtick_patterns:
        matches = re.findall(pattern, raw_response, re.DOTALL | re.IGNORECASE)
        for match in matches:
            cleaned_match = match.strip()
            if cleaned_match and (cleaned_match.startswith('[') or cleaned_match.startswith('{')):
                # Validate it's actually JSON
                try:
                    json.loads(cleaned_match)
                    print(f"✅ Found valid JSON in backticks: {len(cleaned_match)} chars")
                    return cleaned_match
                except json.JSONDecodeError:
                    continue
    
    # Pattern 2: Look for JSON arrays/objects without backticks
    # This handles cases where JSON might be present but not in code blocks
    json_patterns = [
        r'\[[\s\S]*?\]',  # JSON arrays
        r'\{[\s\S]*?\}',  # JSON objects
    ]
    
    for pattern in json_patterns:
        matches = re.findall(pattern, raw_response)
        for match in matches:
            try:
                json.loads(match)
                print(f"✅ Found valid JSON without backticks: {len(match)} chars")
                return match
            except json.JSONDecodeError:
                continue
    
    print("❌ No valid JSON found in response")
    return None

def clean_llm_response(raw_response: str) -> str:
    """
    Clean LLM response by removing thinking tags and extracting JSON.
    
    Args:
        raw_response: Raw string response from LLM
        
    Returns:
        Cleaned JSON string
    """
    print(f"🔍 Processing raw response ({len(raw_response)} chars)")
    
    # Remove <think></think> tags and their content
    cleaned = re.sub(r'<think>.*?</think>', '', raw_response, flags=re.DOTALL | re.IGNORECASE)
    
    # Extract JSON from the cleaned response
    json_content = extract_json_from_llm_response(cleaned)
    
    if json_content:
        return json_content
    else:
        # Fallback: try extracting from original response
        print("⚠️ Trying to extract JSON from original response...")
        json_content = extract_json_from_llm_response(raw_response)
        return json_content if json_content else raw_response

In [6]:
test_responses = [
    """<think>
    I need to create a research plan for AI safety. Let me think about the key areas:
    1. Current state of AI safety research
    2. Key players and organizations
    3. Emerging concerns and challenges
    4. Regulatory landscape
    5. Technical approaches
    </think>

    Based on your request, here's a comprehensive research plan:

    ```json
    [
        {
            "goal": "Conduct comprehensive literature review of AI safety research published in 2024",
            "task_type": "SEARCH",
            "node_type": "EXECUTE",
            "depends_on_indices": []
        },
        {
            "goal": "Analyze key AI safety organizations and their 2024 initiatives",
            "task_type": "SEARCH", 
            "node_type": "EXECUTE",
            "depends_on_indices": []
        },
        {
            "goal": "Synthesize findings into comprehensive AI safety landscape report",
            "task_type": "WRITE",
            "node_type": "EXECUTE", 
            "depends_on_indices": [0, 1]
        }
    ]
    ```

    This plan provides a systematic approach to researching AI safety developments in 2024.""",
    
    """Here's my analysis of the research requirements:

    ```
    [
        {
            "goal": "Research quantum computing hardware advances in 2024",
            "task_type": "SEARCH",
            "node_type": "EXECUTE",
            "depends_on_indices": []
        }
    ]
    ```
    
    The plan focuses on the most critical aspects."""
]


In [8]:
print("Testing JSON extraction:")
for i, test_response in enumerate(test_responses):
    extracted = clean_llm_response(test_response)
    break

Testing JSON extraction:
🔍 Processing raw response (1143 chars)
✅ Found valid JSON in backticks: 676 chars


In [9]:
print(extracted)

[
        {
            "goal": "Conduct comprehensive literature review of AI safety research published in 2024",
            "task_type": "SEARCH",
            "node_type": "EXECUTE",
            "depends_on_indices": []
        },
        {
            "goal": "Analyze key AI safety organizations and their 2024 initiatives",
            "task_type": "SEARCH", 
            "node_type": "EXECUTE",
            "depends_on_indices": []
        },
        {
            "goal": "Synthesize findings into comprehensive AI safety landscape report",
            "task_type": "WRITE",
            "node_type": "EXECUTE", 
            "depends_on_indices": [0, 1]
        }
    ]
