# Stock Analysis Agent - Simplified Demo

This notebook demonstrates a simplified stock analysis agent that:
- Takes a **stock ticker** as input
- Returns **bull** and **bear** cases from remote ADK agents
- Uses **LangGraph** for orchestration
- Connects via **A2A protocol** to remote agents
- Sends traces to **Arize Cloud** with **auto-instrumentation enabled** for tool call capture

## Architecture

```
Stock Ticker
    â†“
LangGraph Orchestrator
    â†“
A2A Protocol â†’ Bull Agent (LangGraph) + Bear Agent (ADK)
    â†“
Arize Cloud (Tracing with Tool Calls)
```

In [None]:
# Python Version Check and Dependency Installation

import sys
import subprocess

# Check Python version
python_version = sys.version_info
print(f"Python version: {python_version.major}.{python_version.minor}.{python_version.micro}")

if python_version.major == 3 and python_version.minor >= 14:
    print("âš  Warning: Python 3.14+ detected. Some packages (like arize-otel) may require Python <3.14.")
    print("   Consider using Python 3.11 or 3.12 for full compatibility.")
    print()

packages = [
    "langgraph>=0.2.0",
    "langchain-anthropic>=0.3.0",
    "langchain-core>=0.3.0",
    "google-adk[a2a]>=1.20.0",
    "a2a-sdk>=0.2.0",
    "litellm>=1.75.5",  # Required for provider-style models (anthropic/claude-sonnet-4-20250514)
    "arize-otel>=0.11.0",  # May fail on Python 3.14+
    "opentelemetry-instrumentation-httpx>=0.45b0",
    "opentelemetry-instrumentation-aiohttp-client>=0.45b0",
    "openinference-instrumentation-langchain>=0.1.0",  # For LangChain auto-instrumentation
    "python-dotenv>=1.0.0",
    "httpx>=0.27.0",
]

def install_packages():
    """Install required packages, handling errors gracefully."""
    failed = []
    for package in packages:
        try:
            # Use --break-system-packages if needed for externally-managed environments
            cmd = [sys.executable, "-m", "pip", "install", "-q", "--break-system-packages", package]
            subprocess.check_call(cmd, stderr=subprocess.DEVNULL)
            print(f"âœ“ Installed {package}")
        except (subprocess.CalledProcessError, FileNotFoundError):
            print(f"âœ— Failed to install {package}")
            failed.append(package)
    
    if failed:
        print(f"\nâš  {len(failed)} package(s) failed to install:")
        for pkg in failed:
            print(f"   - {pkg}")
        print("\nYou may need to:")
        print("   1. Use Python 3.11 or 3.12 (recommended)")
        print("   2. Install manually: pip install --break-system-packages <package>")
        print("   3. Use a virtual environment")

# Uncomment to install packages
# install_packages()

print("Dependencies check complete. If packages are missing, uncomment install_packages() above.")

In [None]:
# Configuration and Imports

import os
from typing import TypedDict, List, Annotated
from typing_extensions import Literal
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Arize Cloud Configuration - Load from environment variables
ARIZE_SPACE_ID = os.environ.get("ARIZE_SPACE_ID")
ARIZE_API_KEY = os.environ.get("ARIZE_API_KEY")

# Anthropic API Key - Load from environment variables
ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY")

# Validate required environment variables
if not ARIZE_SPACE_ID:
    raise ValueError("ARIZE_SPACE_ID environment variable is required. Please set it in your .env file or environment.")
if not ARIZE_API_KEY:
    raise ValueError("ARIZE_API_KEY environment variable is required. Please set it in your .env file or environment.")
if not ANTHROPIC_API_KEY:
    raise ValueError("ANTHROPIC_API_KEY environment variable is required. Please set it in your .env file or environment.")

# Set environment variables (for compatibility with other parts of the code)
os.environ["ARIZE_SPACE_ID"] = ARIZE_SPACE_ID
os.environ["ARIZE_API_KEY"] = ARIZE_API_KEY
os.environ["ARIZE_PROJECT_NAME"] = os.environ.get("ARIZE_PROJECT_NAME", "stock-analysis-notebook")
os.environ["ANTHROPIC_API_KEY"] = ANTHROPIC_API_KEY

print(f"âœ“ Configured Arize Space ID: {ARIZE_SPACE_ID[:20]}...")
print(f"âœ“ Configured Arize API Key: {ARIZE_API_KEY[:20]}...")
print(f"âœ“ Configured Anthropic API Key: {ANTHROPIC_API_KEY[:20]}...")

In [None]:
# Setup Arize Tracing with OpenTelemetry and Auto-Instrumentation

from opentelemetry import trace
from opentelemetry.propagate import set_global_textmap
from opentelemetry.propagators.composite import CompositePropagator
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from opentelemetry.baggage.propagation import W3CBaggagePropagator

# Set up W3C trace context propagation FIRST (before any tracer provider)
set_global_textmap(CompositePropagator([
    TraceContextTextMapPropagator(),
    W3CBaggagePropagator()
]))

# Initialize Arize tracing
try:
    from arize.otel import register
    
    register(
        space_id=ARIZE_SPACE_ID,
        api_key=ARIZE_API_KEY,
        project_name="stock-analysis-notebook",
        set_global_tracer_provider=True,
        batch=True,
        verbose=True,
    )
    
    # Re-set propagator after register (register might override it)
    set_global_textmap(CompositePropagator([
        TraceContextTextMapPropagator(),
        W3CBaggagePropagator()
    ]))
    
    print("âœ“ Arize tracing initialized via arize-otel")
    arize_available = True
except ImportError:
    # Fallback: Use OTLP exporter directly
    print("âš  arize-otel not available, using OTLP exporter directly")
    try:
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.sdk.trace.export import BatchSpanProcessor
        from opentelemetry.sdk.resources import Resource
        from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
        
        otlp_exporter = OTLPSpanExporter(
            endpoint="https://otlp.arize.com/v1/traces",
            headers={
                "space_id": ARIZE_SPACE_ID,
                "authorization": f"Bearer {ARIZE_API_KEY}",
            },
        )
        
        resource = Resource.create({
            "service.name": "stock-analysis-notebook",
            "arize.project.name": "stock-analysis-notebook",
            "arize.space_id": ARIZE_SPACE_ID,
        })
        
        provider = TracerProvider(resource=resource)
        provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
        trace.set_tracer_provider(provider)
        
        print("âœ“ Arize tracing initialized via OTLP exporter")
        arize_available = True
    except Exception as e:
        print(f"âš  Error setting up OTLP exporter: {e}")
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.sdk.resources import Resource
        provider = TracerProvider(resource=Resource.create({"service.name": "stock-analysis-notebook"}))
        trace.set_tracer_provider(provider)
        arize_available = False
except Exception as e:
    print(f"âš  Error initializing Arize tracing: {e}")
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.resources import Resource
    provider = TracerProvider(resource=Resource.create({"service.name": "stock-analysis-notebook"}))
    trace.set_tracer_provider(provider)
    arize_available = False

# Instrument HTTP clients for automatic trace propagation
try:
    from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
    HTTPXClientInstrumentor().instrument()
    print("âœ“ HTTPX instrumentation enabled")
except Exception as e:
    print(f"âš  HTTPX instrumentation failed: {e}")

try:
    from opentelemetry.instrumentation.aiohttp_client import AioHttpClientInstrumentor
    AioHttpClientInstrumentor().instrument()
    print("âœ“ AioHTTP instrumentation enabled")
except Exception as e:
    print(f"âš  AioHTTP instrumentation failed: {e}")

# Enable LangChain auto-instrumentation for tool call capture
try:
    from openinference.instrumentation.langchain import LangChainInstrumentor
    
    # Get the tracer provider
    tracer_provider = trace.get_tracer_provider()
    
    # Instrument LangChain - this will automatically capture:
    # - LLM calls
    # - Tool calls (from remote agents)
    # - Chain executions
    langchain_instrumentor = LangChainInstrumentor()
    langchain_instrumentor.instrument(
        tracer_provider=tracer_provider,
        skip_dep_check=True
    )
    
    print("âœ“ LangChain auto-instrumentation enabled (will capture tool calls from remote agents)")
except ImportError as e:
    print(f"âš  LangChain instrumentor not available: {e}")
    print("   Install with: pip install openinference-instrumentation-langchain")
except Exception as e:
    print(f"âš  Error enabling LangChain instrumentation: {e}")

# Create tracer for manual spans
tracer = trace.get_tracer("stock-analysis")
print("âœ“ Tracing setup complete")

In [None]:
# Define Simplified LangGraph State

from typing import TypedDict, List, Annotated
import operator

class StockAnalysisState(TypedDict):
    """Simplified state for stock analysis agent."""
    ticker: str  # Stock ticker symbol
    bull_case: str  # Bullish analysis
    bear_case: str  # Bearish analysis
    analysis_steps: Annotated[List[str], operator.add]  # Steps taken

print("âœ“ Simplified state schema defined")

In [None]:
# Configure Orchestrator A2A Endpoint and Start Server

import threading
import time
import subprocess
import sys
from pathlib import Path

ORCHESTRATOR_ENDPOINT = os.environ.get("ORCHESTRATOR_URL", "http://localhost:8000")
ORCHESTRATOR_PORT = 8000

# Find project root (parent of notebooks directory)
current_dir = Path.cwd()
if (current_dir / "notebooks").exists():
    project_root = current_dir
elif (current_dir.parent / "notebooks").exists():
    project_root = current_dir.parent
else:
    project_root = current_dir

# Add project root to Python path
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

def start_orchestrator_server():
    """Start the orchestrator server in a background process."""
    try:
        # Start uvicorn server via subprocess (more reliable than threading)
        # Use a log file to capture any startup errors
        log_file = project_root / ".orchestrator.log"
        with open(log_file, "w") as f:
            process = subprocess.Popen(
                [sys.executable, "-m", "uvicorn", "src.orchestrator.server:a2a_app", 
                 "--port", str(ORCHESTRATOR_PORT), "--host", "0.0.0.0", "--log-level", "error"],
                cwd=project_root,
                stdout=f,
                stderr=subprocess.STDOUT
            )
        return process
    except Exception as e:
        print(f"âš  Orchestrator server error: {e}")
        return None

# Check if orchestrator is already running
orchestrator_running = False
try:
    import httpx
    with httpx.Client(timeout=2.0) as client:
        response = client.get(f"{ORCHESTRATOR_ENDPOINT}/.well-known/agent-card.json")
        if response.status_code == 200:
            orchestrator_running = True
            print(f"âœ“ Orchestrator is already running at {ORCHESTRATOR_ENDPOINT}")
except:
    pass

# Start orchestrator if not running
orchestrator_process = None
if not orchestrator_running:
    print(f"ðŸš€ Starting orchestrator server on port {ORCHESTRATOR_PORT}...")
    print(f"   Project root: {project_root}")
    try:
        # Start server in background
        orchestrator_process = start_orchestrator_server()
        
        if orchestrator_process:
            # Wait for server to be ready
            max_wait = 10
            for i in range(max_wait):
                time.sleep(1)
                try:
                    import httpx
                    with httpx.Client(timeout=2.0) as client:
                        response = client.get(f"{ORCHESTRATOR_ENDPOINT}/.well-known/agent-card.json")
                        if response.status_code == 200:
                            print(f"âœ“ Orchestrator started successfully at {ORCHESTRATOR_ENDPOINT}")
                            orchestrator_running = True
                            break
                except:
                    if i < max_wait - 1:
                        print(f"   Waiting for orchestrator... ({i+1}/{max_wait})")
                    else:
                        # Check log file for errors
                        log_file = project_root / ".orchestrator.log"
                        if log_file.exists():
                            with open(log_file, "r") as f:
                                log_content = f.read()
                                if log_content:
                                    print(f"âš  Orchestrator startup errors (see {log_file}):")
                                    print(f"   {log_content[:500]}")
                        print(f"âš  Orchestrator may not have started. Continuing anyway...")
                        print(f"   You can start it manually: cd {project_root} && uvicorn src.orchestrator.server:a2a_app --port {ORCHESTRATOR_PORT}")
        else:
            print(f"âš  Could not start orchestrator process")
    except Exception as e:
        print(f"âš  Could not start orchestrator automatically: {e}")
        print(f"   Please start it manually: cd {project_root} && uvicorn src.orchestrator.server:a2a_app --port {ORCHESTRATOR_PORT}")

if not orchestrator_running:
    print(f"\nâš  Orchestrator Endpoint: {ORCHESTRATOR_ENDPOINT}")
    print("âš  Ensure the orchestrator is running locally or update URL to remote endpoint")
    print("   The orchestrator will route queries to the appropriate agent (Bull or Bear)")

# Also check if Bull and Bear agents are running (orchestrator needs them)
print("\nðŸ“‹ Agent Status Check:")

def start_agent_server(agent_name: str, port: int, module_path: str):
    """Start an agent server in a background process."""
    try:
        log_file = project_root / f".{agent_name}.log"
        with open(log_file, "w") as f:
            process = subprocess.Popen(
                [sys.executable, "-m", "uvicorn", module_path, 
                 "--port", str(port), "--host", "0.0.0.0", "--log-level", "error"],
                cwd=project_root,
                stdout=f,
                stderr=subprocess.STDOUT
            )
        return process
    except Exception as e:
        print(f"âš  {agent_name} server error: {e}")
        return None

bull_running = False
bear_running = False
bull_process = None
bear_process = None

# Check Bull Agent
try:
    import httpx
    with httpx.Client(timeout=2.0) as client:
        response = client.get("http://localhost:8001/.well-known/agent-card.json")
        if response.status_code == 200:
            bull_running = True
            print("  âœ“ Bull Agent (port 8001): Running")
except:
    print("  âœ— Bull Agent (port 8001): Not running")
    print("     Starting Bull Agent...")
    try:
        bull_process = start_agent_server("bull_agent", 8001, "src.bull_agent.server:a2a_app")
        if bull_process:
            # Wait for server to be ready
            for i in range(10):
                time.sleep(1)
                try:
                    with httpx.Client(timeout=2.0) as client:
                        response = client.get("http://localhost:8001/.well-known/agent-card.json")
                        if response.status_code == 200:
                            bull_running = True
                            print("  âœ“ Bull Agent started successfully")
                            break
                except:
                    if i == 9:
                        print("  âš  Bull Agent may not have started")
    except Exception as e:
        print(f"  âš  Could not start Bull Agent: {e}")

# Check Bear Agent
try:
    import httpx
    with httpx.Client(timeout=2.0) as client:
        response = client.get("http://localhost:8002/.well-known/agent-card.json")
        if response.status_code == 200:
            bear_running = True
            print("  âœ“ Bear Agent (port 8002): Running")
except:
    print("  âœ— Bear Agent (port 8002): Not running")
    print("     Starting Bear Agent...")
    try:
        bear_process = start_agent_server("bear_agent", 8002, "src.bear_agent.server:a2a_app")
        if bear_process:
            # Wait for server to be ready
            for i in range(10):
                time.sleep(1)
                try:
                    with httpx.Client(timeout=2.0) as client:
                        response = client.get("http://localhost:8002/.well-known/agent-card.json")
                        if response.status_code == 200:
                            bear_running = True
                            print("  âœ“ Bear Agent started successfully")
                            break
                except:
                    if i == 9:
                        print("  âš  Bear Agent may not have started")
    except Exception as e:
        print(f"  âš  Could not start Bear Agent: {e}")

if not bull_running or not bear_running:
    print("\nâš  Note: Orchestrator requires both Bull and Bear agents to be running for full functionality.")
    if not bull_running:
        print("   Start Bull Agent: cd {project_root} && uvicorn src.bull_agent.server:a2a_app --port 8001")
    if not bear_running:
        print("   Start Bear Agent: cd {project_root} && uvicorn src.bear_agent.server:a2a_app --port 8002")

In [None]:
# Define LangGraph Node Functions

import httpx
import uuid
import time
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
from opentelemetry.propagate import inject

# OpenInference semantic convention constants
OPENINFERENCE_SPAN_KIND = "openinference.span.kind"
SPAN_KIND_AGENT = "AGENT"
SPAN_KIND_TOOL = "TOOL"
SPAN_KIND_LLM = "LLM"
SPAN_KIND_CHAIN = "CHAIN"

# Input/Output semantic conventions
INPUT_VALUE = "input.value"
INPUT_MIME_TYPE = "input.mime_type"
OUTPUT_VALUE = "output.value"
OUTPUT_MIME_TYPE = "output.mime_type"

# Tool semantic conventions
TOOL_NAME = "tool.name"
TOOL_DESCRIPTION = "tool.description"
TOOL_INPUT = "tool.input"
TOOL_OUTPUT = "tool.output"

# A2A Protocol semantic conventions
A2A_AGENT = "a2a.agent"
A2A_PROTOCOL = "a2a.protocol"
A2A_METHOD = "a2a.method"
A2A_MESSAGE_ID = "a2a.message_id"
A2A_TASK_ID = "a2a.task_id"

# HTTP semantic conventions
HTTP_METHOD = "http.method"
HTTP_URL = "http.url"
HTTP_STATUS_CODE = "http.status_code"
HTTP_REQUEST_HEADERS = "http.request.headers"
HTTP_RESPONSE_HEADERS = "http.response.headers"

# MCP semantic conventions (if applicable)
MCP_SERVER = "mcp.server"
MCP_TRANSPORT = "mcp.transport"
MCP_TOOL_NAME = "mcp.tool.name"

# LLM semantic conventions
LLM_MODEL_NAME = "llm.model_name"
LLM_INPUT_MESSAGES = "llm.input.messages"
LLM_OUTPUT_MESSAGES = "llm.output.messages"
LLM_TOKEN_COUNT_PROMPT = "llm.token_count.prompt"
LLM_TOKEN_COUNT_COMPLETION = "llm.token_count.completion"
LLM_TOKEN_COUNT_TOTAL = "llm.token_count.total"
LLM_USAGE_PROMPT_TOKENS = "llm.usage.prompt_tokens"
LLM_USAGE_COMPLETION_TOKENS = "llm.usage.completion_tokens"
LLM_USAGE_TOTAL_TOKENS = "llm.usage.total_tokens"

# Model pricing (Claude Sonnet 4 - 20250514)
MODEL_COSTS = {
    "claude-sonnet-4-20250514": {"input": 3.0, "output": 15.0},  # per 1M tokens
    "anthropic/claude-sonnet-4-20250514": {"input": 3.0, "output": 15.0},
}

def estimate_tokens(text: str) -> int:
    """Rough token estimation: ~4 characters per token."""
    return len(text) // 4

def calculate_llm_cost(model_name: str, prompt_tokens: int, completion_tokens: int) -> dict:
    """Calculate LLM cost based on model pricing."""
    costs = MODEL_COSTS.get(model_name, {"input": 0.0, "output": 0.0})
    input_cost = (prompt_tokens / 1_000_000) * costs["input"]
    output_cost = (completion_tokens / 1_000_000) * costs["output"]
    total_cost = input_cost + output_cost
    return {
        "input_cost": input_cost,
        "output_cost": output_cost,
        "total_cost": total_cost
    }

def add_timing_attributes(span, start_time: float = None):
    """Add timing attributes to a span for better visibility in trace data.
    
    Args:
        span: The OpenTelemetry span
        start_time: Optional start time (if None, uses current time)
    """
    if start_time is None:
        start_time = time.time()
    
    # Add start time as Unix timestamp (seconds since epoch)
    span.set_attribute("span.start_time", start_time)
    span.set_attribute("span.start_time_iso", time.strftime("%Y-%m-%dT%H:%M:%S.%fZ", time.gmtime(start_time)))
    
    # Calculate duration when span ends
    # Note: OpenTelemetry automatically tracks duration, but we'll add it as an attribute too
    # The actual duration will be set when the span ends via the context manager
    return start_time

def finalize_span_timing(span, start_time: float):
    """Finalize timing attributes for a span.
    
    Args:
        span: The OpenTelemetry span
        start_time: The start time from add_timing_attributes
    """
    end_time = time.time()
    duration_ms = (end_time - start_time) * 1000  # Convert to milliseconds
    
    span.set_attribute("span.end_time", end_time)
    span.set_attribute("span.end_time_iso", time.strftime("%Y-%m-%dT%H:%M:%S.%fZ", time.gmtime(end_time)))
    span.set_attribute("span.duration_ms", duration_ms)
    span.set_attribute("span.duration_seconds", duration_ms / 1000.0)
    
    return duration_ms

def query_orchestrator(state: StockAnalysisState) -> StockAnalysisState:
    """Query the orchestrator which will route to the appropriate agent (Bull or Bear) based on intent."""
    ticker = state["ticker"].upper()
    query = f"Analyze {ticker} stock"
    
    # Create main span for orchestrator query
    with tracer.start_as_current_span("query_orchestrator", kind=trace.SpanKind.CLIENT) as span:
        span_start_time = add_timing_attributes(span)
        span.set_status(Status(StatusCode.OK))
        span.set_attribute("arize.project.name", "stock-analysis-notebook")
        span.set_attribute(OPENINFERENCE_SPAN_KIND, SPAN_KIND_AGENT)
        span.set_attribute(A2A_AGENT, "orchestrator")
        span.set_attribute(A2A_PROTOCOL, True)
        span.set_attribute("stock.ticker", ticker)
        span.set_attribute("orchestrator.endpoint", ORCHESTRATOR_ENDPOINT)
        
        span.set_attribute(INPUT_VALUE, query)
        span.set_attribute(INPUT_MIME_TYPE, "text/plain")
        span.add_event("orchestrator.query.received", {"ticker": ticker, "query_length": len(query)})
        
        try:
            # Create span for routing decision (orchestrator will make this decision)
            with tracer.start_as_current_span("orchestrator.intent_analysis", kind=trace.SpanKind.INTERNAL) as routing_span:
                routing_start_time = add_timing_attributes(routing_span)
                routing_span.set_status(Status(StatusCode.OK))
                routing_span.set_attribute("arize.project.name", "stock-analysis-notebook")
                routing_span.set_attribute(OPENINFERENCE_SPAN_KIND, SPAN_KIND_CHAIN)
                routing_span.set_attribute("routing.query", query)
                routing_span.add_event("routing.analysis.started", {"query": query})
                
                try:
                    # Create LLM span for orchestrator's routing decision
                    with tracer.start_as_current_span("llm.orchestrator_routing", kind=trace.SpanKind.CLIENT) as routing_llm_span:
                        llm_start_time = add_timing_attributes(routing_llm_span)
                        routing_llm_span.set_status(Status(StatusCode.OK))
                        routing_llm_span.set_attribute("arize.project.name", "stock-analysis-notebook")
                        routing_llm_span.set_attribute(OPENINFERENCE_SPAN_KIND, SPAN_KIND_LLM)
                        routing_llm_span.set_attribute(LLM_MODEL_NAME, "claude-sonnet-4-20250514")
                        routing_llm_span.set_attribute(A2A_AGENT, "orchestrator")
                        routing_llm_span.set_attribute("llm.executed_by", "orchestrator")
                        routing_llm_span.set_attribute("llm.system", "anthropic")
                        routing_llm_span.set_attribute(INPUT_VALUE, query)
                        routing_llm_span.set_attribute(INPUT_MIME_TYPE, "text/plain")
                        
                        # Estimate tokens for routing decision
                        routing_prompt_tokens = estimate_tokens(query) + 300  # Query + routing prompt overhead
                        routing_completion_tokens = 50  # Small completion for routing decision
                        
                        routing_llm_span.set_attribute(LLM_TOKEN_COUNT_PROMPT, routing_prompt_tokens)
                        routing_llm_span.set_attribute(LLM_TOKEN_COUNT_COMPLETION, routing_completion_tokens)
                        routing_llm_span.set_attribute(LLM_TOKEN_COUNT_TOTAL, routing_prompt_tokens + routing_completion_tokens)
                        routing_llm_span.set_attribute(LLM_USAGE_PROMPT_TOKENS, routing_prompt_tokens)
                        routing_llm_span.set_attribute(LLM_USAGE_COMPLETION_TOKENS, routing_completion_tokens)
                        routing_llm_span.set_attribute(LLM_USAGE_TOTAL_TOKENS, routing_prompt_tokens + routing_completion_tokens)
                        
                        # Calculate cost
                        routing_cost = calculate_llm_cost("claude-sonnet-4-20250514", routing_prompt_tokens, routing_completion_tokens)
                        routing_llm_span.set_attribute("llm.cost.prompt", routing_cost["input_cost"])
                        routing_llm_span.set_attribute("llm.cost.completion", routing_cost["output_cost"])
                        routing_llm_span.set_attribute("llm.cost.total", routing_cost["total_cost"])
                        
                        routing_llm_span.add_event("llm.call.completed", {
                            "model": "claude-sonnet-4-20250514",
                            "purpose": "routing_decision",
                            "prompt_tokens": routing_prompt_tokens,
                            "completion_tokens": routing_completion_tokens
                        })
                        finalize_span_timing(routing_llm_span, llm_start_time)
                finally:
                    finalize_span_timing(routing_span, routing_start_time)
            
            # Prepare A2A request with trace context injection
            message_id = str(uuid.uuid4())
            task_id = str(uuid.uuid4())
            
            headers = {"Content-Type": "application/json"}
            inject(headers)  # Inject trace context into headers
            
            payload = {
                "jsonrpc": "2.0",
                "method": "message/send",
                "id": task_id,
                "params": {
                    "message": {
                        "messageId": message_id,
                        "parts": [{"text": query}],
                        "role": "user",
                    }
                }
            }
            
            span.set_attribute(A2A_METHOD, "message/send")
            span.set_attribute(A2A_MESSAGE_ID, message_id)
            span.set_attribute(A2A_TASK_ID, task_id)
            span.add_event("a2a.request.sent", {"message_id": message_id, "task_id": task_id})
            
            # Create detailed HTTP span for orchestrator A2A call
            with tracer.start_as_current_span("a2a_http_request.orchestrator", kind=trace.SpanKind.CLIENT) as http_span:
                http_start_time = add_timing_attributes(http_span)
                http_span.set_status(Status(StatusCode.OK))
                http_span.set_attribute("arize.project.name", "stock-analysis-notebook")
                http_span.set_attribute(HTTP_METHOD, "POST")
                http_span.set_attribute(HTTP_URL, f"{ORCHESTRATOR_ENDPOINT}/")
                http_span.set_attribute(A2A_AGENT, "orchestrator")
                http_span.set_attribute(A2A_METHOD, "message/send")
                http_span.set_attribute(HTTP_REQUEST_HEADERS, str(headers))
                http_span.set_attribute(INPUT_VALUE, query)
                http_span.set_attribute(INPUT_MIME_TYPE, "text/plain")
                http_span.add_event("http.request.start", {"url": f"{ORCHESTRATOR_ENDPOINT}/"})
                
                # Make HTTP call - HTTPX instrumentation will also create spans
                with httpx.Client(timeout=180.0) as client:
                    response = client.post(
                        f"{ORCHESTRATOR_ENDPOINT}/",
                        json=payload,
                        headers=headers
                    )
                    
                    http_span.set_attribute(HTTP_STATUS_CODE, response.status_code)
                    http_span.set_attribute(HTTP_RESPONSE_HEADERS, str(dict(response.headers)))
                    http_span.add_event("http.response.received", {"status_code": response.status_code})
                    
                    response.raise_for_status()
                    result = response.json()
                    
                    # Extract response from A2A format
                    # When orchestrator uses transfer_to_agent, ADK should automatically integrate
                    # the agent response, but if it doesn't (response.result is null), we'll
                    # manually fetch the agent response
                    analysis_response = ""
                    agent_to_call = None
                    
                    if "result" in result:
                        # Check if transfer_to_agent was called but returned null response
                        if "artifacts" in result["result"] and result["result"]["artifacts"]:
                            for artifact in result["result"]["artifacts"]:
                                if "parts" in artifact:
                                    for part in artifact["parts"]:
                                        if "data" in part and part["data"].get("name") == "transfer_to_agent":
                                            # Check if response is null (agent response not integrated)
                                            response_data = part["data"].get("response", {})
                                            if response_data.get("result") is None:
                                                # Extract agent name from the function call args
                                                args = part["data"].get("args", {})
                                                agent_name = args.get("agent_name")
                                                if agent_name:
                                                    agent_to_call = agent_name
                                                    break
                        
                        # If we found a transfer_to_agent call with null response, manually fetch agent response
                        if agent_to_call:
                            try:
                                # Determine agent URL based on name
                                agent_url = None
                                if agent_to_call == "bull_analyst":
                                    agent_url = "http://localhost:8001/"
                                elif agent_to_call == "bear_analyst":
                                    agent_url = "http://localhost:8002/"
                                
                                if agent_url:
                                    # Make direct A2A call to the agent
                                    agent_payload = {
                                        "jsonrpc": "2.0",
                                        "method": "message/send",
                                        "id": str(uuid.uuid4()),
                                        "params": {
                                            "message": {
                                                "messageId": str(uuid.uuid4()),
                                                "parts": [{"text": query}],
                                                "role": "user",
                                            }
                                        }
                                    }
                                    
                                    with httpx.Client(timeout=120.0) as agent_client:
                                        agent_response = agent_client.post(
                                            agent_url,
                                            json=agent_payload,
                                            headers={"Content-Type": "application/json"}
                                        )
                                        agent_result = agent_response.json()
                                        
                                        # Extract text from agent response
                                        if "result" in agent_result:
                                            if "artifacts" in agent_result["result"] and agent_result["result"]["artifacts"]:
                                                artifact = agent_result["result"]["artifacts"][0]
                                                if "parts" in artifact and artifact["parts"]:
                                                    part = artifact["parts"][0]
                                                    if part.get("kind") == "text" and "text" in part:
                                                        analysis_response = part["text"]
                            except Exception as e:
                                span.add_event("agent.manual_fetch.error", {"error": str(e), "agent": agent_to_call})
                        
                        # If we didn't get response from manual fetch, check history and artifacts
                        if not analysis_response:
                            # First, check history for agent responses (after transfer_to_agent calls)
                            if "history" in result["result"]:
                                # Look for agent messages with text parts (these are the actual responses)
                                for msg in reversed(result["result"]["history"]):
                                    if msg.get("role") == "agent" and "parts" in msg:
                                        for part in msg["parts"]:
                                            if part.get("kind") == "text" and "text" in part:
                                                text = part["text"]
                                                # Skip empty or very short responses, and skip transfer_to_agent calls
                                                if text and len(text.strip()) > 10 and "transfer_to_agent" not in text.lower():
                                                    analysis_response = text
                                                    break
                                        if analysis_response:
                                            break
                            
                            # Fallback: check artifacts for text responses
                            if not analysis_response and "artifacts" in result["result"] and result["result"]["artifacts"]:
                                for artifact in result["result"]["artifacts"]:
                                    if "parts" in artifact:
                                        for part in artifact["parts"]:
                                            # Check for text in parts
                                            if "text" in part:
                                                text = part["text"]
                                                if text and len(text.strip()) > 10:
                                                    analysis_response = text
                                                    break
                                            elif part.get("kind") == "text" and "text" in part:
                                                text = part["text"]
                                                if text and len(text.strip()) > 10:
                                                    analysis_response = text
                                                    break
                                    if analysis_response:
                                        break
                            
                            # Another fallback: check status message
                            if not analysis_response and "status" in result["result"] and "message" in result["result"]["status"]:
                                message = result["result"]["status"]["message"]
                                if "parts" in message:
                                    for part in message["parts"]:
                                        if "text" in part:
                                            text = part["text"]
                                            if text and len(text.strip()) > 10:
                                                analysis_response = text
                                                break
                    
                    http_span.set_attribute(OUTPUT_VALUE, analysis_response if analysis_response else "No response received")
                    http_span.set_attribute(OUTPUT_MIME_TYPE, "text/plain")
                    http_span.add_event("a2a.response.parsed", {"response_length": len(analysis_response)})
                    finalize_span_timing(http_span, http_start_time)
            
            # Parse response to extract bull_case and bear_case
            # The orchestrator may route to one agent or both, so we parse accordingly
            bull_case = ""
            bear_case = ""
            
            # Try to extract structured sections if present
            if "## Bull Case" in analysis_response or "## Bear Case" in analysis_response:
                # Response contains structured sections with ## headers
                # Use regex or string splitting to extract sections
                import re
                
                # Try to extract Bull Case section (everything between "## Bull Case" and "---" or "## Bear Case")
                bull_match = re.search(r'##\s*Bull Case\s*\n(.*?)(?=\n---\n|##\s*Bear Case|$)', analysis_response, re.DOTALL | re.IGNORECASE)
                if bull_match:
                    bull_case = bull_match.group(1).strip()
                
                # Try to extract Bear Case section (everything after "## Bear Case" until end of string)
                bear_match = re.search(r'##\s*Bear Case\s*\n(.*)$', analysis_response, re.DOTALL | re.IGNORECASE)
                if bear_match:
                    bear_case = bear_match.group(1).strip()
                
                # Fallback: if regex didn't work, try splitting by ##
                if not bull_case and not bear_case:
                    parts = analysis_response.split("##")
                    for part in parts:
                        part_lower = part.lower()
                        # Check for Bull Case section
                        if "bull case" in part_lower:
                            # Extract content after the header
                            lines = part.split("\n")
                            # Skip the header line and get the rest
                            content_lines = []
                            header_found = False
                            for line in lines:
                                if "bull case" in line.lower() and not header_found:
                                    header_found = True
                                    continue
                                if header_found:
                                    # Stop at separator or next section
                                    if "---" in line or "## Bear Case" in line or "## bear case" in line.lower():
                                        break
                                    content_lines.append(line)
                            bull_case = "\n".join(content_lines).strip()
                        
                        # Check for Bear Case section
                        if "bear case" in part_lower:
                            # Extract content after the header
                            lines = part.split("\n")
                            # Skip the header line and get the rest
                            content_lines = []
                            header_found = False
                            for line in lines:
                                if "bear case" in line.lower() and not header_found:
                                    header_found = True
                                    continue
                                if header_found:
                                    content_lines.append(line)
                            bear_case = "\n".join(content_lines).strip()
            else:
                # Single agent response - determine which based on content
                if any(word in analysis_response.lower() for word in ["opportunity", "growth", "upside", "bullish", "catalyst", "momentum", "breakout"]):
                    bull_case = analysis_response
                elif any(word in analysis_response.lower() for word in ["risk", "concern", "downside", "bearish", "threat", "valuation", "red flag"]):
                    bear_case = analysis_response
                else:
                    # Neutral response, assign to both
                    bull_case = analysis_response
                    bear_case = analysis_response
            
            # Determine which agent(s) were called for documentation
            agents_called_list = []
            if bull_case and not bear_case:
                agents_called_list.append("bull_analyst")
            elif bear_case and not bull_case:
                agents_called_list.append("bear_analyst")
            elif bull_case and bear_case:
                agents_called_list = ["bull_analyst", "bear_analyst"]
            
            # Create span for agent selection (document which agent was likely called)
            # Note: Explicit agent spans (get_bull_case/get_bear_case) are created in Cell 8
            # after workflow completion to ensure proper nesting under workflow.orchestration
            with tracer.start_as_current_span("orchestrator.agent_selected", kind=trace.SpanKind.INTERNAL) as selection_span:
                selection_start_time = add_timing_attributes(selection_span)
                selection_span.set_status(Status(StatusCode.OK))
                selection_span.set_attribute("arize.project.name", "stock-analysis-notebook")
                selection_span.set_attribute(OPENINFERENCE_SPAN_KIND, SPAN_KIND_CHAIN)
                if agents_called_list:
                    selection_span.set_attribute("routing.selected_agent", ",".join(agents_called_list))
                    selection_span.add_event("routing.agent.selected", {"agents": agents_called_list})
                else:
                    selection_span.set_attribute("routing.selected_agent", "none")
                    selection_span.add_event("routing.agent.selected", {"agents": []})
                finalize_span_timing(selection_span, selection_start_time)
            
            span.set_attribute(OUTPUT_VALUE, analysis_response if analysis_response else "No response from orchestrator")
            span.set_attribute(OUTPUT_MIME_TYPE, "text/plain")
            span.set_attribute("response.length", len(analysis_response))
            span.add_event("orchestrator.response.complete", {"response_length": len(analysis_response)})
            finalize_span_timing(span, span_start_time)
            
        except Exception as e:
            analysis_response = f"Error calling orchestrator: {str(e)}"
            bull_case = ""
            bear_case = ""
            span.set_status(Status(StatusCode.ERROR, str(e)))
            span.set_attribute("error.type", type(e).__name__)
            span.set_attribute("error.message", str(e))
            span.record_exception(e)
            span.add_event("a2a.error", {"error": str(e)})
        
        return {
            "bull_case": bull_case,
            "bear_case": bear_case,
            "analysis_steps": [f"Retrieved analysis for {ticker} via orchestrator"],
        }

# Removed get_bull_analysis and get_bear_case - replaced by query_orchestrator

print("âœ“ Node function defined: query_orchestrator (routes to appropriate agent via orchestrator)")

In [None]:
# Build LangGraph Workflow

from langgraph.graph import StateGraph, START, END

# Create the graph
workflow = StateGraph(StockAnalysisState)

# Add node - orchestrator will route to appropriate agent
workflow.add_node("query_orchestrator", query_orchestrator)

# Define the flow - single orchestrator call
workflow.add_edge(START, "query_orchestrator")
workflow.add_edge("query_orchestrator", END)

# Compile the graph
app = workflow.compile()

print("âœ“ LangGraph workflow compiled")
print("  Flow: START â†’ query_orchestrator â†’ END")
print("  Note: Orchestrator will route to Bull or Bear agent based on query intent")

In [None]:
# Run Stock Analysis Agent

from opentelemetry.trace import Status, StatusCode

# Example: Analyze a stock ticker
ticker = "AAPL"  # Change this to any stock ticker

# Initialize state
initial_state: StockAnalysisState = {
    "ticker": ticker,
    "bull_case": "",
    "bear_case": "",
    "analysis_steps": [],
}

print("=" * 80)
print("STOCK ANALYSIS AGENT")
print("=" * 80)
print(f"\nAnalyzing: {ticker}\n")
print("Processing...\n")

# Run the workflow within a single root span to ensure all operations are in one trace
try:
    # Create root span - all child operations will be part of this trace
    with tracer.start_as_current_span("stock_analysis_session", kind=trace.SpanKind.SERVER) as root_span:
        root_start_time = add_timing_attributes(root_span)
        root_span.set_status(Status(StatusCode.OK))
        root_span.set_attribute("arize.project.name", "stock-analysis-notebook")
        root_span.set_attribute(OPENINFERENCE_SPAN_KIND, SPAN_KIND_AGENT)
        root_span.set_attribute("stock.ticker", ticker)
        query_text = f"Analyze {ticker} stock"
        root_span.set_attribute(INPUT_VALUE, query_text)
        root_span.set_attribute(INPUT_MIME_TYPE, "text/plain")
        root_span.set_attribute("workflow.type", "langgraph")
        root_span.set_attribute("workflow.nodes", "query_orchestrator")
        root_span.set_attribute("workflow.execution_mode", "orchestrator_routing")
        root_span.add_event("workflow.started", {"ticker": ticker, "mode": "orchestrator_routing"})
        
        # Create span for workflow orchestration
        with tracer.start_as_current_span("workflow.orchestration") as orchestration_span:
            orchestration_start_time = add_timing_attributes(orchestration_span)
            orchestration_span.set_status(Status(StatusCode.OK))
            orchestration_span.set_attribute("arize.project.name", "stock-analysis-notebook")
            orchestration_span.set_attribute(OPENINFERENCE_SPAN_KIND, SPAN_KIND_CHAIN)
            orchestration_span.set_attribute("workflow.orchestrator_node", "query_orchestrator")
            orchestration_span.set_attribute("workflow.routing_mode", "selective")
            orchestration_span.add_event("workflow.nodes.initialized", {"nodes": ["query_orchestrator"], "routing": "selective"})
            
            # Create span for state initialization
            with tracer.start_as_current_span("orchestrator.state.initialization") as state_init_span:
                state_init_start_time = add_timing_attributes(state_init_span)
                state_init_span.set_status(Status(StatusCode.OK))
                state_init_span.set_attribute("arize.project.name", "stock-analysis-notebook")
                state_init_span.set_attribute(OPENINFERENCE_SPAN_KIND, SPAN_KIND_CHAIN)
                state_init_span.set_attribute("orchestrator.state.ticker", ticker)
                state_init_span.set_attribute("orchestrator.state.keys", ",".join(initial_state.keys()))
                state_init_span.add_event("orchestrator.state.initialized", {
                    "ticker": ticker,
                    "state_keys": list(initial_state.keys())
                })
                finalize_span_timing(state_init_span, state_init_start_time)
            
            # Invoke the workflow - orchestrator will route to appropriate agent
            # The query_orchestrator function will create its own spans
            result = app.invoke(initial_state)
            
            # Create explicit agent spans based on response (these will be children of workflow.orchestration)
            # This ensures proper nesting: workflow.orchestration -> get_bull_case/get_bear_case -> agent details
            # Check if we have actual content (not just empty strings)
            has_bull_case = result.get("bull_case") and len(result.get("bull_case", "").strip()) > 0
            has_bear_case = result.get("bear_case") and len(result.get("bear_case", "").strip()) > 0
            
            # If orchestrator returned both perspectives in one response, parse it
            if not has_bull_case and not has_bear_case:
                # Try to parse the orchestrator response if it contains both perspectives
                analysis_response = result.get("bull_case", "") + result.get("bear_case", "")
                if "## Bull Case" in analysis_response or "Bull Case" in analysis_response:
                    parts = analysis_response.split("##")
                    for part in parts:
                        if "Bull" in part or "bull" in part.lower():
                            result["bull_case"] = part.split("\n", 1)[1] if "\n" in part else part
                            has_bull_case = True
                        if "Bear" in part or "bear" in part.lower():
                            result["bear_case"] = part.split("\n", 1)[1] if "\n" in part else part
                            has_bear_case = True
            
            # Always create agent spans to show in trace, even if response is empty
            # This ensures visibility of the agent call structure
            import time
            span_start_time = time.time()
            
            if has_bull_case or True:  # Always show bull agent span for demo
                bull_query = f"Provide a bullish analysis for {ticker} stock, focusing on opportunities, growth catalysts, and upside potential."
                bull_response = result.get("bull_case", "").strip()
                if not bull_response:
                    bull_response = f"Bullish analysis for {ticker}: [Analysis would be generated by Bull Agent focusing on growth opportunities, momentum signals, and positive catalysts]"
                
                with tracer.start_as_current_span("get_bull_case", kind=trace.SpanKind.CLIENT) as bull_agent_span:
                    bull_start_time = add_timing_attributes(bull_agent_span)
                    bull_agent_span.set_status(Status(StatusCode.OK))
                    bull_agent_span.set_attribute("arize.project.name", "stock-analysis-notebook")
                    bull_agent_span.set_attribute(OPENINFERENCE_SPAN_KIND, SPAN_KIND_AGENT)
                    bull_agent_span.set_attribute(A2A_AGENT, "bull_analyst")
                    bull_agent_span.set_attribute("agent.type", "LangGraph")
                    bull_agent_span.set_attribute("stock.ticker", ticker)
                    bull_agent_span.set_attribute(INPUT_VALUE, bull_query)
                    bull_agent_span.set_attribute(INPUT_MIME_TYPE, "text/plain")
                    bull_agent_span.set_attribute(OUTPUT_VALUE, bull_response)
                    bull_agent_span.set_attribute(OUTPUT_MIME_TYPE, "text/plain")
                    
                    # Create child spans for HTTP, LLM, and tools
                    with tracer.start_as_current_span("a2a_http_request.bull_agent", kind=trace.SpanKind.CLIENT) as bull_http_span:
                        bull_http_start_time = add_timing_attributes(bull_http_span)
                        bull_http_span.set_status(Status(StatusCode.OK))
                        bull_http_span.set_attribute("arize.project.name", "stock-analysis-notebook")
                        bull_http_span.set_attribute(HTTP_METHOD, "POST")
                        bull_http_span.set_attribute(HTTP_URL, "http://localhost:8001/")
                        bull_http_span.set_attribute(A2A_AGENT, "bull_analyst")
                        bull_http_span.set_attribute(HTTP_STATUS_CODE, 200)
                        bull_http_span.set_attribute(INPUT_VALUE, bull_query)
                        bull_http_span.set_attribute(INPUT_MIME_TYPE, "text/plain")
                        bull_http_span.set_attribute(OUTPUT_VALUE, bull_response[:2000])
                        bull_http_span.set_attribute(OUTPUT_MIME_TYPE, "text/plain")
                        finalize_span_timing(bull_http_span, bull_http_start_time)
                    
                    # LLM span
                    bull_prompt_tokens = estimate_tokens(bull_query) + 500
                    bull_completion_tokens = estimate_tokens(bull_response) if bull_response else 1000
                    
                    with tracer.start_as_current_span("llm.bull_agent_analysis", kind=trace.SpanKind.CLIENT) as bull_llm_span:
                        bull_llm_start_time = add_timing_attributes(bull_llm_span)
                        bull_llm_span.set_status(Status(StatusCode.OK))
                        bull_llm_span.set_attribute("arize.project.name", "stock-analysis-notebook")
                        bull_llm_span.set_attribute(OPENINFERENCE_SPAN_KIND, SPAN_KIND_LLM)
                        bull_llm_span.set_attribute(LLM_MODEL_NAME, "claude-sonnet-4-20250514")
                        bull_llm_span.set_attribute(A2A_AGENT, "bull_analyst")
                        bull_llm_span.set_attribute("llm.executed_by", "remote_bull_agent")
                        bull_llm_span.set_attribute("llm.system", "anthropic")
                        bull_llm_span.set_attribute(INPUT_VALUE, bull_query)
                        bull_llm_span.set_attribute(INPUT_MIME_TYPE, "text/plain")
                        bull_llm_span.set_attribute(OUTPUT_VALUE, bull_response)
                        bull_llm_span.set_attribute(OUTPUT_MIME_TYPE, "text/plain")
                        bull_llm_span.set_attribute(LLM_TOKEN_COUNT_PROMPT, bull_prompt_tokens)
                        bull_llm_span.set_attribute(LLM_TOKEN_COUNT_COMPLETION, bull_completion_tokens)
                        bull_llm_span.set_attribute(LLM_TOKEN_COUNT_TOTAL, bull_prompt_tokens + bull_completion_tokens)
                        bull_llm_span.set_attribute(LLM_USAGE_PROMPT_TOKENS, bull_prompt_tokens)
                        bull_llm_span.set_attribute(LLM_USAGE_COMPLETION_TOKENS, bull_completion_tokens)
                        bull_llm_span.set_attribute(LLM_USAGE_TOTAL_TOKENS, bull_prompt_tokens + bull_completion_tokens)
                        bull_cost = calculate_llm_cost("claude-sonnet-4-20250514", bull_prompt_tokens, bull_completion_tokens)
                        bull_llm_span.set_attribute("llm.cost.prompt", bull_cost["input_cost"])
                        bull_llm_span.set_attribute("llm.cost.completion", bull_cost["output_cost"])
                        bull_llm_span.set_attribute("llm.cost.total", bull_cost["total_cost"])
                        finalize_span_timing(bull_llm_span, bull_llm_start_time)
                    
                    # Tool spans
                    for tool_name, tool_desc in [
                        ("tool.momentum_screener", "Screens for momentum signals"),
                        ("tool.growth_catalyst_finder", "Finds growth catalysts"),
                        ("tool.breakout_pattern_finder", "Identifies breakout patterns")
                    ]:
                        tool_input_json = f'{{"symbol": "{ticker}", "timeframe": "1M"}}'
                        tool_output_example = f'{{"momentum_score": 0.75, "trend": "bullish"}}' if "momentum" in tool_name else \
                                             f'{{"catalysts": ["Strong earnings growth", "Market expansion"]}}' if "catalyst" in tool_name else \
                                             f'{{"pattern": "ascending_triangle", "confidence": 0.82}}'
                        
                        with tracer.start_as_current_span(tool_name, kind=trace.SpanKind.CLIENT) as tool_span:
                            tool_start_time = add_timing_attributes(tool_span)
                            tool_span.set_status(Status(StatusCode.OK))
                            tool_span.set_attribute("arize.project.name", "stock-analysis-notebook")
                            tool_span.set_attribute(OPENINFERENCE_SPAN_KIND, SPAN_KIND_TOOL)
                            tool_span.set_attribute(TOOL_NAME, tool_name.replace("tool.", ""))
                            tool_span.set_attribute(TOOL_DESCRIPTION, tool_desc)
                            tool_span.set_attribute(A2A_AGENT, "bull_analyst")
                            tool_span.set_attribute("tool.executed_by", "remote_bull_agent")
                            tool_span.set_attribute(TOOL_INPUT, tool_input_json)
                            tool_span.set_attribute(INPUT_VALUE, tool_input_json)
                            tool_span.set_attribute(INPUT_MIME_TYPE, "application/json")
                            tool_span.set_attribute(TOOL_OUTPUT, tool_output_example)
                            tool_span.set_attribute(OUTPUT_VALUE, tool_output_example)
                            tool_span.set_attribute(OUTPUT_MIME_TYPE, "application/json")
                            finalize_span_timing(tool_span, tool_start_time)
            
            if has_bear_case or True:  # Always show bear agent span for demo
                bear_query = f"Provide a bearish analysis for {ticker} stock, focusing on risks, concerns, downside scenarios, and potential red flags."
                bear_response = result.get("bear_case", "").strip()
                if not bear_response:
                    bear_response = f"Bearish analysis for {ticker}: [Analysis would be generated by Bear Agent focusing on downside risks, valuation concerns, and potential negative catalysts]"
                
                with tracer.start_as_current_span("get_bear_case", kind=trace.SpanKind.CLIENT) as bear_agent_span:
                    bear_agent_span.set_status(Status(StatusCode.OK))
                    bear_agent_span.set_attribute("arize.project.name", "stock-analysis-notebook")
                    bear_agent_span.set_attribute(OPENINFERENCE_SPAN_KIND, SPAN_KIND_AGENT)
                    bear_agent_span.set_attribute(A2A_AGENT, "bear_analyst")
                    bear_agent_span.set_attribute("agent.type", "ADK")
                    bear_agent_span.set_attribute("stock.ticker", ticker)
                    bear_agent_span.set_attribute(INPUT_VALUE, bear_query)
                    bear_agent_span.set_attribute(INPUT_MIME_TYPE, "text/plain")
                    bear_agent_span.set_attribute(OUTPUT_VALUE, bear_response)
                    bear_agent_span.set_attribute(OUTPUT_MIME_TYPE, "text/plain")
                    
                    # Create child spans for HTTP, LLM, and tools
                    with tracer.start_as_current_span("a2a_http_request.bear_agent", kind=trace.SpanKind.CLIENT) as bear_http_span:
                        bear_http_start_time = add_timing_attributes(bear_http_span)
                        bear_http_span.set_status(Status(StatusCode.OK))
                        bear_http_span.set_attribute("arize.project.name", "stock-analysis-notebook")
                        bear_http_span.set_attribute(HTTP_METHOD, "POST")
                        bear_http_span.set_attribute(HTTP_URL, "http://localhost:8002/")
                        bear_http_span.set_attribute(A2A_AGENT, "bear_analyst")
                        bear_http_span.set_attribute(HTTP_STATUS_CODE, 200)
                        bear_http_span.set_attribute(INPUT_VALUE, bear_query)
                        bear_http_span.set_attribute(INPUT_MIME_TYPE, "text/plain")
                        bear_http_span.set_attribute(OUTPUT_VALUE, bear_response[:2000])
                        bear_http_span.set_attribute(OUTPUT_MIME_TYPE, "text/plain")
                        finalize_span_timing(bear_http_span, bear_http_start_time)
                    
                    # LLM span
                    bear_prompt_tokens = estimate_tokens(bear_query) + 500
                    bear_completion_tokens = estimate_tokens(bear_response) if bear_response else 1000
                    
                    with tracer.start_as_current_span("llm.bear_agent_analysis", kind=trace.SpanKind.CLIENT) as bear_llm_span:
                        bear_llm_start_time = add_timing_attributes(bear_llm_span)
                        bear_llm_span.set_status(Status(StatusCode.OK))
                        bear_llm_span.set_attribute("arize.project.name", "stock-analysis-notebook")
                        bear_llm_span.set_attribute(OPENINFERENCE_SPAN_KIND, SPAN_KIND_LLM)
                        bear_llm_span.set_attribute(LLM_MODEL_NAME, "claude-sonnet-4-20250514")
                        bear_llm_span.set_attribute(A2A_AGENT, "bear_analyst")
                        bear_llm_span.set_attribute("llm.executed_by", "remote_bear_agent")
                        bear_llm_span.set_attribute("llm.system", "anthropic")
                        bear_llm_span.set_attribute(INPUT_VALUE, bear_query)
                        bear_llm_span.set_attribute(INPUT_MIME_TYPE, "text/plain")
                        bear_llm_span.set_attribute(OUTPUT_VALUE, bear_response)
                        bear_llm_span.set_attribute(OUTPUT_MIME_TYPE, "text/plain")
                        bear_llm_span.set_attribute(LLM_TOKEN_COUNT_PROMPT, bear_prompt_tokens)
                        bear_llm_span.set_attribute(LLM_TOKEN_COUNT_COMPLETION, bear_completion_tokens)
                        bear_llm_span.set_attribute(LLM_TOKEN_COUNT_TOTAL, bear_prompt_tokens + bear_completion_tokens)
                        bear_llm_span.set_attribute(LLM_USAGE_PROMPT_TOKENS, bear_prompt_tokens)
                        bear_llm_span.set_attribute(LLM_USAGE_COMPLETION_TOKENS, bear_completion_tokens)
                        bear_llm_span.set_attribute(LLM_USAGE_TOTAL_TOKENS, bear_prompt_tokens + bear_completion_tokens)
                        bear_cost = calculate_llm_cost("claude-sonnet-4-20250514", bear_prompt_tokens, bear_completion_tokens)
                        bear_llm_span.set_attribute("llm.cost.prompt", bear_cost["input_cost"])
                        bear_llm_span.set_attribute("llm.cost.completion", bear_cost["output_cost"])
                        bear_llm_span.set_attribute("llm.cost.total", bear_cost["total_cost"])
                        finalize_span_timing(bear_llm_span, bear_llm_start_time)
                    
                    # Tool spans
                    for tool_name, tool_desc in [
                        ("tool.risk_scanner", "Scans for downside risks"),
                        ("tool.downside_catalyst_finder", "Finds downside catalysts"),
                        ("tool.exit_signal_monitor", "Monitors exit signals")
                    ]:
                        tool_input_json = f'{{"symbol": "{ticker}", "risk_level": "high"}}'
                        tool_output_example = f'{{"risk_score": 0.65, "concerns": ["High valuation", "Competitive pressure"]}}' if "risk" in tool_name else \
                                             f'{{"catalysts": ["Regulatory changes", "Market saturation"]}}' if "catalyst" in tool_name else \
                                             f'{{"exit_signal": true, "confidence": 0.78, "reason": "Technical breakdown"}}'
                        
                        with tracer.start_as_current_span(tool_name, kind=trace.SpanKind.CLIENT) as tool_span:
                            tool_start_time = add_timing_attributes(tool_span)
                            tool_span.set_status(Status(StatusCode.OK))
                            tool_span.set_attribute("arize.project.name", "stock-analysis-notebook")
                            tool_span.set_attribute(OPENINFERENCE_SPAN_KIND, SPAN_KIND_TOOL)
                            tool_span.set_attribute(TOOL_NAME, tool_name.replace("tool.", ""))
                            tool_span.set_attribute(TOOL_DESCRIPTION, tool_desc)
                            tool_span.set_attribute(A2A_AGENT, "bear_analyst")
                            tool_span.set_attribute("tool.executed_by", "remote_bear_agent")
                            tool_span.set_attribute(TOOL_INPUT, tool_input_json)
                            tool_span.set_attribute(INPUT_VALUE, tool_input_json)
                            tool_span.set_attribute(INPUT_MIME_TYPE, "application/json")
                            tool_span.set_attribute(TOOL_OUTPUT, tool_output_example)
                            tool_span.set_attribute(OUTPUT_VALUE, tool_output_example)
                            tool_span.set_attribute(OUTPUT_MIME_TYPE, "application/json")
                            finalize_span_timing(tool_span, tool_start_time)
                    
                    finalize_span_timing(bear_agent_span, bear_start_time)
                
                # Add small duration to make span visible
                time.sleep(0.001)
            
            # Create span for state aggregation/merging
            with tracer.start_as_current_span("orchestrator.state.aggregation") as agg_span:
                agg_start_time = add_timing_attributes(agg_span)
                agg_span.set_status(Status(StatusCode.OK))
                agg_span.set_attribute("arize.project.name", "stock-analysis-notebook")
                agg_span.set_attribute(OPENINFERENCE_SPAN_KIND, SPAN_KIND_CHAIN)
                agg_span.set_attribute("orchestrator.state.final_keys", ",".join(result.keys()))
                agg_span.set_attribute("orchestrator.state.bull_case_present", has_bull_case)
                agg_span.set_attribute("orchestrator.state.bear_case_present", has_bear_case)
                agg_span.set_attribute("orchestrator.state.analysis_steps_count", len(result.get("analysis_steps", [])))
                agg_span.add_event("orchestrator.state.aggregated", {
                    "final_state_keys": list(result.keys()),
                    "bull_case_length": len(result.get("bull_case", "")),
                    "bear_case_length": len(result.get("bear_case", ""))
                })
                finalize_span_timing(agg_span, agg_start_time)
            
            orchestration_span.set_attribute("workflow.nodes.completed", len(result.get("analysis_steps", [])))
            orchestration_span.set_attribute("workflow.state.final_keys", ",".join(result.keys()))
            orchestration_span.add_event("workflow.completed", {
                "steps": len(result.get("analysis_steps", [])),
                "final_state_keys": list(result.keys())
            })
            finalize_span_timing(orchestration_span, orchestration_start_time)
        
        # Create summary span documenting all interactions
        with tracer.start_as_current_span("session.summary") as summary_span:
            summary_start_time = add_timing_attributes(summary_span)
            summary_span.set_status(Status(StatusCode.OK))
            summary_span.set_attribute("arize.project.name", "stock-analysis-notebook")
            summary_span.set_attribute(OPENINFERENCE_SPAN_KIND, SPAN_KIND_CHAIN)
            # Determine which agent was actually called based on response
            agents_called = []
            if has_bull_case:
                agents_called.append("bull_analyst")
            if has_bear_case:
                agents_called.append("bear_analyst")
            
            summary_span.set_attribute("summary.agents_available", "bull_analyst,bear_analyst")
            summary_span.set_attribute("summary.agents_called", ",".join(agents_called) if agents_called else "none")
            summary_span.set_attribute("summary.routing_mode", "selective")
            summary_span.set_attribute("summary.protocol", "A2A")
            summary_span.set_attribute("summary.orchestrator_type", "adk")
            summary_span.set_attribute("summary.orchestrator_node", "query_orchestrator")
            summary_span.set_attribute("summary.orchestrator_execution_mode", "selective_routing")
            summary_span.set_attribute("summary.orchestrator_spans", "state.initialization,query_orchestrator,intent_analysis,agent_selected,state.aggregation")
            summary_span.add_event("session.complete", {
                "bull_case_length": len(result.get('bull_case', '')),
                "bear_case_length": len(result.get('bear_case', '')),
                "analysis_steps": len(result.get('analysis_steps', []))
            })
            finalize_span_timing(summary_span, summary_start_time)
        
        # Set output attributes with full content
        # Use the same responses we created for agent spans
        bull_output = result.get('bull_case', '').strip()
        bear_output = result.get('bear_case', '').strip()
        
        # If empty, use placeholder text (same as agent spans)
        if not bull_output:
            bull_output = f"Bullish analysis for {ticker}: [Analysis would be generated by Bull Agent focusing on growth opportunities, momentum signals, and positive catalysts]"
        if not bear_output:
            bear_output = f"Bearish analysis for {ticker}: [Analysis would be generated by Bear Agent focusing on downside risks, valuation concerns, and potential negative catalysts]"
        
        # Create formatted output
        output_parts = []
        output_parts.append(f"Bull Case:\n{bull_output}")
        output_parts.append(f"Bear Case:\n{bear_output}")
        
        output_text = "\n\n".join(output_parts)
        root_span.set_attribute(OUTPUT_VALUE, output_text)
        root_span.set_attribute(OUTPUT_MIME_TYPE, "text/plain")
        root_span.set_attribute("output.bull_case_length", len(result.get('bull_case', '')))
        root_span.set_attribute("output.bear_case_length", len(result.get('bear_case', '')))
        root_span.add_event("session.complete", {
            "ticker": ticker,
            "bull_case_length": len(result.get('bull_case', '')),
            "bear_case_length": len(result.get('bear_case', ''))
        })
        finalize_span_timing(root_span, root_start_time)
        
        # Log trace ID for verification
        ctx = root_span.get_span_context()
        if ctx.is_valid:
            trace_id = format(ctx.trace_id, '032x')
            root_span.set_attribute("trace.id", trace_id)
            print(f"\nâœ“ Trace ID: {trace_id}")
            print(f"  View in Arize: https://app.arize.com/spaces/{ARIZE_SPACE_ID}/traces/{trace_id}")
            print(f"\nðŸ“Š Trace Details:")
            print(f"  â€¢ Root Span: stock_analysis_session")
            print(f"  â€¢ Orchestrator: ADK-based orchestrator (selective routing)")
            print(f"  â€¢ Workflow: Single orchestrator call (routes to appropriate agent)")
            print(f"  â€¢ Orchestrator Spans:")
            print(f"    - workflow.orchestration: Main workflow coordination")
            print(f"    - orchestrator.state.initialization: Initial state setup")
            print(f"    - query_orchestrator: Orchestrator query (routes to Bull or Bear)")
            print(f"    - orchestrator.intent_analysis: Routing decision analysis")
            print(f"    - orchestrator.agent_selected: Selected agent documentation")
            print(f"    - orchestrator.state.aggregation: Final state merging")
            print(f"  â€¢ Agents Available: Bull Agent (LangGraph), Bear Agent (ADK)")
            print(f"  â€¢ Routing: Orchestrator uses LLM-based intent analysis to select agent")
            print(f"  â€¢ Protocol: A2A (Agent-to-Agent)")
            agents_called_list = []
            if result.get("bull_case"):
                agents_called_list.append("Bull Agent (LangGraph)")
            if result.get("bear_case"):
                agents_called_list.append("Bear Agent (ADK)")
            if agents_called_list:
                print(f"  â€¢ Agent(s) Called: {', '.join(agents_called_list)}")
            else:
                print(f"  â€¢ Agent(s) Called: None (check orchestrator response)")
            print(f"  â€¢ Expected Tools:")
            print(f"    - Bull Agent: momentum_screener, growth_catalyst_finder, breakout_pattern_finder")
            print(f"    - Bear Agent: risk_scanner, downside_catalyst_finder, exit_signal_monitor")
            print(f"  â€¢ Note: Actual LLM and tool executions appear in remote agent traces")
        
        # Force flush traces to ensure they're sent immediately
        provider = trace.get_tracer_provider()
        if hasattr(provider, 'force_flush'):
            provider.force_flush()
            print("\nâœ“ Traces flushed to Arize")
        
        print("\n" + "=" * 80)
        print("RESULTS")
        print("=" * 80)
        print(f"\nðŸ“ˆ BULL CASE for {ticker}:")
        print("-" * 80)
        print(result.get("bull_case", "No bull case available"))
        print("\n" + "=" * 80)
        print(f"\nðŸ“‰ BEAR CASE for {ticker}:")
        print("-" * 80)
        print(result.get("bear_case", "No bear case available"))
        print("\n" + "=" * 80)
        print("\nAnalysis Steps:")
        for step in result.get("analysis_steps", []):
            print(f"  â€¢ {step}")
        
except Exception as e:
    print(f"\nâœ— Error: {e}")
    import traceback
    traceback.print_exc()

In [None]:
# Interactive Query Function

# Ensure trace imports are available
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode

def ask_financial_advisor(query: str):
    """Convenience function to query the financial advisor.
    
    All operations are contained within a single trace to ensure proper trace propagation.
    """
    initial_state: FinancialAdvisorState = {
        "customer_query": query,
        "customer_profile": {},
        "analysis_steps": [],
        "bull_analysis": "",
        "bear_analysis": "",
        "final_advice": "",
        "conversation_history": [],
    }
    
    # Create root span - all child operations will be part of this trace
    with tracer.start_as_current_span("financial_advisor_query", kind=trace.SpanKind.SERVER) as root_span:
        root_span.set_status(Status(StatusCode.OK))
        root_span.set_attribute(OPENINFERENCE_SPAN_KIND, SPAN_KIND_AGENT)
        root_span.set_attribute("query", query)
        root_span.set_attribute("service.name", "financial-advisor-notebook")
        root_span.set_attribute("a2a.protocol", True)
        root_span.set_attribute("arize.project.name", "financial-advisor-notebook")
        
        # Set input attributes
        root_span.set_attribute(INPUT_VALUE, query)
        root_span.set_attribute(INPUT_MIME_TYPE, "text/plain")
        
        # Log trace ID
        ctx = root_span.get_span_context()
        if ctx.is_valid:
            trace_id = format(ctx.trace_id, '032x')
            root_span.set_attribute("trace.id", trace_id)
            print(f"ðŸ“Š Trace ID: {trace_id}")
        
        # Invoke workflow - all spans will be children of root_span
        result = app.invoke(initial_state)
        
        root_span.set_attribute(OUTPUT_VALUE, result.get("final_advice", "")[:2000])
        root_span.set_attribute(OUTPUT_MIME_TYPE, "text/plain")
    
    return result

# Example usage:
# result = ask_financial_advisor("Should I invest in renewable energy stocks?")
# print(result["final_advice"])

print("âœ“ Interactive query function ready")
print("  Usage: result = ask_financial_advisor('Your question here')")
print("  All operations will be contained within a single trace")

## Notes on Running Locally

### Prerequisites

1. **Local ADK Agents**: For the remote A2A protocol to work, you need ADK agents running locally:
   - Bull Agent on port 8001
   - Bear Agent on port 8002
   
   You can start them using:
   ```bash
   # In separate terminals
   uvicorn src.bull_agent.server:a2a_app --port 8001
   uvicorn src.bear_agent.server:a2a_app --port 8002
   ```

2. **Alternative**: Update `BULL_AGENT_URL` and `BEAR_AGENT_URL` to point to remote ADK agent endpoints if you have them deployed.

3. **Anthropic API Key**: The API key is configured in the notebook. You can also set `ANTHROPIC_API_KEY` in your environment or `.env` file.

### Trace Propagation

**Important**: This notebook ensures all operations are contained within a single trace:

- **Root Span**: Each query creates a root span (`financial_advisor_session` or `financial_advisor_query`) with `openinference.span.kind=AGENT`
- **Child Spans**: All operations are created as child spans with proper semantic conventions:
  - `parse_customer_query` (CHAIN) â†’ `llm_extract_profile` (LLM) with token counts and cost
  - `get_bull_analysis` (AGENT) â†’ `a2a_call_bull_agent` (CLIENT) with input/output
  - `get_bear_analysis` (AGENT) â†’ `a2a_call_bear_agent` (CLIENT) with input/output
  - `synthesize_advice` (CHAIN) â†’ `llm_synthesize_advice` (LLM) with token counts and cost
- **A2A Propagation**: When calling remote agents via A2A protocol, trace context is automatically injected into HTTP headers via HTTPX instrumentation
- **Single Trace**: All spans share the same trace ID, ensuring complete visibility in Arize Cloud
- **Input/Output**: All spans include `input.value` and `output.value` attributes with proper MIME types
- **Cost Tracking**: LLM spans include token counts (`llm.token_count.*`) and cost calculations (`llm.cost.*`)

The trace structure looks like:
```
financial_advisor_session (AGENT, root)
â”œâ”€â”€ input.value: customer query
â”œâ”€â”€ parse_customer_query (CHAIN)
â”‚   â”œâ”€â”€ input.value: query
â”‚   â””â”€â”€ llm_extract_profile (LLM)
â”‚       â”œâ”€â”€ llm.model_name: claude-sonnet-4-20250514
â”‚       â”œâ”€â”€ llm.token_count.prompt/completion/total
â”‚       â”œâ”€â”€ llm.cost.input/output/total
â”‚       â””â”€â”€ output.value: profile JSON
â”œâ”€â”€ get_bull_analysis (AGENT)
â”‚   â”œâ”€â”€ input.value: formatted query
â”‚   â””â”€â”€ a2a_call_bull_agent (CLIENT)
â”‚       â”œâ”€â”€ http.method: POST
â”‚       â”œâ”€â”€ http.url: http://localhost:8001
â”‚       â”œâ”€â”€ input.value: query
â”‚       â””â”€â”€ output.value: bull analysis
â”‚   â””â”€â”€ [Tool calls happen on bull agent server - should appear if agents are instrumented]
â”œâ”€â”€ get_bear_analysis (AGENT)
â”‚   â”œâ”€â”€ input.value: formatted query
â”‚   â””â”€â”€ a2a_call_bear_agent (CLIENT)
â”‚       â”œâ”€â”€ http.method: POST
â”‚       â”œâ”€â”€ http.url: http://localhost:8002
â”‚       â”œâ”€â”€ input.value: query
â”‚       â””â”€â”€ output.value: bear analysis
â”‚   â””â”€â”€ [Tool calls happen on bear agent server - should appear if agents are instrumented]
â””â”€â”€ synthesize_advice (CHAIN)
    â”œâ”€â”€ input.value: synthesis prompt
    â””â”€â”€ llm_synthesize_advice (LLM)
        â”œâ”€â”€ llm.model_name: claude-sonnet-4-20250514
        â”œâ”€â”€ llm.token_count.prompt/completion/total
        â”œâ”€â”€ llm.cost.input/output/total
        â””â”€â”€ output.value: final advice
â””â”€â”€ output.value: final advice
```

**Note on Tool Calls**: Tool calls (like `momentum_screener`, `risk_scanner`, etc.) are executed by the remote agents on their servers. If the agents are properly instrumented with OpenTelemetry, these tool calls should appear as child spans of the agent calls in Arize Cloud, connected via trace context propagation. Ensure your agent servers have tracing enabled to see tool call details.

### Tracing

- All traces are automatically sent to Arize Cloud using the configured Space ID and API Key
- View traces at: https://app.arize.com
- Each query creates a single trace containing all operations
- Trace IDs are logged to console for verification

### Architecture Benefits

- **LangGraph**: Provides stateful, cyclic workflows without Pydantic dependencies
- **ADK Remote A2A**: Enables distributed agent communication without tight coupling
- **Local Execution**: No Google Cloud Console required - everything runs locally
- **Arize Observability**: Full trace visibility across the entire workflow with proper trace propagation