# ResponsesAgent with OpenAI Responses API

This notebook demonstrates how to use OpenAI's Responses API with MLflow's `ResponsesAgent`. Because the two share the same request and response format, the integration needs very little conversion code.

## Table of Contents
1. OpenAI Responses API Overview
2. Direct Responses API Integration
3. Streaming with Responses API
4. Tool Calling with Responses API
5. Complete Production Agent
6. Best Practices

## Setup

In [None]:
import os
from dotenv import load_dotenv
from typing import Generator

import mlflow
from mlflow.entities.span import SpanType
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
)
from openai import OpenAI

# Load environment
load_dotenv()
assert "OPENAI_API_KEY" in os.environ, "Please set OPENAI_API_KEY in .env file"

# Set experiment
mlflow.set_experiment("ResponsesAPI_Integration")

print(f"MLflow version: {mlflow.__version__}")
print("✅ Setup complete!")

## 1. OpenAI Responses API Overview

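OpenAI's Responses API is a newer alternative to the ChatCompletions API. Instead of a single assistant message, it returns a list of typed output items (text messages, tool calls, reasoning).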

### Key Features:

| Feature | Description |
|---------|-------------|
| **Structured Output** | Rich output types (text, tool calls, reasoning) |
| **Native Tools** | Built-in function calling support |
| **Multi-modal** | Supports various input/output types |
| **Streaming** | Real-time streaming with structured events |
| **Annotations** | Link citations and references |
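
To make the structured output concrete, the sketch below models a response's `output` list as plain dictionaries and pulls the text out of it. The item shapes follow the `message` and `function_call` item types used later in this notebook; the `collect_text` helper and the sample values are illustrative assumptions, not part of either library.

```python
# Illustrative sample of a Responses API `output` list, written as plain dicts.
# The values are made up; only the item shapes matter here.
sample_output = [
    {
        "type": "function_call",
        "call_id": "call_123",
        "name": "get_weather",
        "arguments": '{"location": "Boston"}',  # arguments arrive as a JSON string
    },
    {
        "type": "message",
        "role": "assistant",
        "content": [
            {"type": "output_text", "text": "It is sunny in Boston.", "annotations": []}
        ],
    },
]


def collect_text(output: list) -> str:
    """Concatenate the text parts of every message item (hypothetical helper)."""
    parts = []
    for item in output:
        if item.get("type") != "message":
            continue  # skip tool calls, reasoning items, and other non-message items
        for part in item.get("content", []):
            if part.get("type") == "output_text":
                parts.append(part.get("text", ""))
    return "".join(parts)


print(collect_text(sample_output))
```

Only `message` items contribute text, so a response that consists solely of tool calls yields an empty string.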

### Why Use Responses API with ResponsesAgent?

MLflow's ResponsesAgent is designed around the Responses API format, providing:

1. **Native compatibility**: No format conversion needed
2. **Direct passthrough**: Response can be returned as-is
3. **Full feature support**: All Responses API features work naturally
4. **Simpler code**: Less boilerplate for format handling

### API Comparison:

```python
# ChatCompletions API
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[...]
)

# Responses API
response = client.responses.create(
    model="gpt-4o",
    input=[...]
)
```

## 2. Direct Responses API Integration

The simplest integration is a direct passthrough: the request input goes to the Responses API unchanged, and the result is wrapped in a `ResponsesAgentResponse`.

In [None]:
class SimpleResponsesAPIAgent(ResponsesAgent):
    """
    Simplest ResponsesAgent using OpenAI Responses API.
    
    Key insight: ResponsesAgent format matches OpenAI Responses API,
    so we can use direct passthrough with minimal conversion.
    """
    
    def __init__(self, model: str = "gpt-4o"):
        self.model = model
        self.client = OpenAI()
    
    @mlflow.trace(span_type=SpanType.AGENT)
    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        """
        Non-streaming prediction using Responses API.
        
        The response from OpenAI Responses API can be converted
        directly to ResponsesAgentResponse using to_dict().
        """
        # Call Responses API directly
        response = self.client.responses.create(
            model=self.model,
            input=request.input,  # Direct passthrough!
        )
        
        # Convert to ResponsesAgentResponse
        return ResponsesAgentResponse(**response.to_dict())
    
    @mlflow.trace(span_type=SpanType.AGENT)
    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        """
        Streaming prediction using Responses API.
        
        OpenAI Responses API streaming events can be converted
        directly to ResponsesAgentStreamEvent.
        """
        # Stream from Responses API
        for event in self.client.responses.create(
            model=self.model,
            input=request.input,
            stream=True,
        ):
            # Direct conversion to ResponsesAgentStreamEvent
            yield ResponsesAgentStreamEvent(**event.to_dict())


# Enable autologging
mlflow.openai.autolog()

# Test the agent
print("Testing Simple Responses API Agent...\n")
agent = SimpleResponsesAPIAgent(model="gpt-4o")

response = agent.predict({
    "input": [{"role": "user", "content": "What is MLflow in one sentence?"}]
})

print(f"Response type: {type(response)}")
print(f"Output: {response.output}")
print("\n✅ Simple Responses API agent working!")

## 3. Streaming with Responses API

Streaming needs no translation layer: each event the Responses API emits can be wrapped in a `ResponsesAgentStreamEvent` and yielded as-is.
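
The event-handling logic can be exercised without a network call. The sketch below feeds a hand-written sequence of event dictionaries, shaped like the `response.output_text.delta` and `response.output_item.done` events handled in this section, through a small accumulator. `fake_stream` and `accumulate_text` are illustrative names, and real streams carry more event types and fields than shown here.

```python
from typing import Iterable, Iterator


def fake_stream() -> Iterator[dict]:
    """Stand-in for a streaming response; the events are hand-written samples."""
    for chunk in ["Decorators ", "wrap ", "functions."]:
        yield {"type": "response.output_text.delta", "delta": chunk}
    yield {
        "type": "response.output_item.done",
        "item": {
            "type": "message",
            "role": "assistant",
            "content": [{"type": "output_text", "text": "Decorators wrap functions."}],
        },
    }


def accumulate_text(events: Iterable[dict]) -> str:
    """Join text deltas in arrival order, ignoring every other event type."""
    chunks = []
    for event in events:
        if event.get("type") == "response.output_text.delta":
            chunks.append(event.get("delta") or "")
    return "".join(chunks)


text = accumulate_text(fake_stream())
print(text)
```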

In [None]:
class StreamingResponsesAgent(ResponsesAgent):
    """
    Streaming agent using OpenAI Responses API.
    
    Demonstrates:
    - Direct streaming passthrough
    - Event type handling
    - System prompt integration
    """
    
    def __init__(self, model: str = "gpt-4o", system_prompt: str = None):
        self.model = model
        self.client = OpenAI()
        self.system_prompt = system_prompt
    
    def _prepare_input(self, request: ResponsesAgentRequest) -> list:
        """Prepare input with optional system prompt."""
        input_messages = []
        
        # Add system prompt if provided
        if self.system_prompt:
            input_messages.append({
                "role": "system",
                "content": self.system_prompt
            })
        
        # Add user messages
        input_messages.extend([i.model_dump() for i in request.input])
        
        return input_messages
    
    @mlflow.trace(span_type=SpanType.AGENT)
    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        """Non-streaming prediction."""
        input_messages = self._prepare_input(request)
        
        response = self.client.responses.create(
            model=self.model,
            input=input_messages,
        )
        
        return ResponsesAgentResponse(**response.to_dict())
    
    @mlflow.trace(span_type=SpanType.AGENT)
    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        """Streaming prediction with direct passthrough."""
        input_messages = self._prepare_input(request)
        
        for event in self.client.responses.create(
            model=self.model,
            input=input_messages,
            stream=True,
        ):
            yield ResponsesAgentStreamEvent(**event.to_dict())


# Test streaming
print("Testing Streaming Responses API Agent...\n")
streaming_agent = StreamingResponsesAgent(
    model="gpt-4o",
    system_prompt="You are a concise assistant. Keep responses under 50 words."
)

print("Streaming response:")
print("-" * 50)

for event in streaming_agent.predict_stream({
    "input": [{"role": "user", "content": "Explain Python decorators."}]
}):
    # Handle different event types
    if event.type == "response.output_text.delta":
        if hasattr(event, 'delta') and event.delta:
            print(event.delta, end="", flush=True)
    elif event.type == "response.output_item.done":
        print("\n[Complete]")

print("-" * 50)
print("\n✅ Streaming complete!")

## 4. Tool Calling with Responses API

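The Responses API supports function tools natively. Tool specs are passed as flat JSON objects, and each tool call comes back as a `function_call` output item that the agent executes and answers with a `function_call_output` item.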
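
A single tool round trip can be sketched with plain dictionaries: a `function_call` item comes in, the matching function runs, and a `function_call_output` item carrying the same `call_id` goes back to the model. Because a calculator tool receives model-generated strings, the sketch uses a small AST-based arithmetic evaluator in place of the `eval` call used by the demo `calculate` tool. `safe_calculate`, `run_tool_call`, and the sample item are illustrative assumptions rather than library APIs.

```python
import ast
import json
import operator

# Only these arithmetic operators are allowed; anything else is rejected.
_BINARY_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}


def safe_calculate(expression: str) -> float:
    """Evaluate +, -, *, / on numeric literals without calling eval()."""

    def _eval(node):
        if isinstance(node, ast.Constant) and type(node.value) in (int, float):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BINARY_OPS:
            return _BINARY_OPS[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](_eval(node.operand))
        raise ValueError(f"Unsupported expression: {expression!r}")

    return _eval(ast.parse(expression, mode="eval").body)


def run_tool_call(item: dict, registry: dict) -> dict:
    """Execute one function_call item and build its function_call_output item."""
    args = json.loads(item["arguments"])
    result = registry[item["name"]](**args)
    return {
        "type": "function_call_output",
        "call_id": item["call_id"],  # must match the originating call
        "output": json.dumps(result),
    }


call = {
    "type": "function_call",
    "call_id": "call_1",
    "name": "calculate",
    "arguments": '{"expression": "15 * 23"}',
}
registry = {"calculate": lambda expression: {"result": safe_calculate(expression)}}
tool_output = run_tool_call(call, registry)
print(tool_output)
```

Anything other than numeric literals and the four listed operators, including function calls and attribute access, raises `ValueError` instead of being executed.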

In [None]:
import json
from uuid import uuid4


class ResponsesAPIToolAgent(ResponsesAgent):
    """
    Tool-calling agent using OpenAI Responses API.
    
    Demonstrates:
    - Native Responses API tool support
    - Tool execution loop
    - Streaming tool calls
    """
    
    def __init__(self, model: str = "gpt-4o", tools: list = None):
        self.model = model
        self.client = OpenAI()
        # Copy so that register_tool() does not mutate the caller's list
        self.tools = list(tools) if tools else []
        self._tool_functions = {}
    
    def register_tool(self, name: str, spec: dict, func):
        """Register a tool with its specification and implementation."""
        self.tools.append(spec)
        self._tool_functions[name] = func
    
    @mlflow.trace(span_type=SpanType.TOOL)
    def execute_tool(self, name: str, arguments: str) -> str:
        """Execute a tool and return its result."""
        args = json.loads(arguments)
        result = self._tool_functions[name](**args)
        return json.dumps(result) if isinstance(result, dict) else str(result)
    
    @mlflow.trace(span_type=SpanType.AGENT)
    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        """Non-streaming prediction with tool calling."""
        outputs = [
            event.item
            for event in self.predict_stream(request)
            if event.type == "response.output_item.done"
        ]
        return ResponsesAgentResponse(
            output=outputs,
            custom_outputs=getattr(request, 'custom_inputs', None)
        )
    
    @mlflow.trace(span_type=SpanType.AGENT)
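    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        """Tool execution loop that emits one event per completed output item.

        The underlying API calls are not streamed; each finished item is
        yielded as a `response.output_item.done` event.
        """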
        input_messages = [i.model_dump() for i in request.input]
        max_iterations = 10
        
        for _ in range(max_iterations):
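            # Call Responses API, passing `tools` only when some are registered
            # (omitting the argument is safer than sending tools=None)
            api_kwargs = {"model": self.model, "input": input_messages}
            if self.tools:
                api_kwargs["tools"] = self.tools
            response = self.client.responses.create(**api_kwargs)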
            
            # Process each output item
            has_tool_call = False
            for output_item in response.output:
                item_dict = output_item.model_dump(exclude_none=True)
                
                # Yield the output item
                yield ResponsesAgentStreamEvent(
                    type="response.output_item.done",
                    item=item_dict,
                )
                
                # Check if it's a tool call
                if output_item.type == "function_call":
                    has_tool_call = True
                    
                    # Execute the tool
                    result = self.execute_tool(
                        output_item.name,
                        output_item.arguments
                    )
                    
                    # Create tool output
                    tool_output = {
                        "type": "function_call_output",
                        "call_id": output_item.call_id,
                        "output": result,
                    }
                    
                    # Yield tool output
                    yield ResponsesAgentStreamEvent(
                        type="response.output_item.done",
                        item=tool_output,
                    )
                    
                    # Add to conversation for next iteration
                    input_messages.append(item_dict)
                    input_messages.append(tool_output)
            
            # If no tool calls, we're done
            if not has_tool_call:
                break


# Create agent with tools
print("Creating Responses API Tool Agent...\n")
tool_agent = ResponsesAPIToolAgent(model="gpt-4o")

# Register tools
tool_agent.register_tool(
    name="get_weather",
    spec={
        "type": "function",
        "name": "get_weather",
        "description": "Get the current weather for a location.",
        "parameters": {
            "type": "object",
            "properties": {
                "location": {
                    "type": "string",
                    "description": "City name"
                }
            },
            "required": ["location"]
        }
    },
    func=lambda location: {"temp": 72, "condition": "sunny", "location": location}
)

tool_agent.register_tool(
    name="calculate",
    spec={
        "type": "function",
        "name": "calculate",
        "description": "Perform a calculation.",
        "parameters": {
            "type": "object",
            "properties": {
                "expression": {
                    "type": "string",
                    "description": "Math expression to evaluate"
                }
            },
            "required": ["expression"]
        }
    },
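    # WARNING: eval() runs arbitrary Python on model-generated input. It is
    # acceptable only in a throwaway demo; use a restricted parser in real code.
    func=lambda expression: {"result": eval(expression)}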
)

print(f"Registered {len(tool_agent.tools)} tools")
print("✅ Tool agent ready!")

In [None]:
# Test tool calling
print("Testing tool calling...\n")
print("Query: What's the weather in Boston and what is 15 * 23?")
print("=" * 60)

for event in tool_agent.predict_stream({
    "input": [{"role": "user", "content": "What's the weather in Boston and what is 15 * 23?"}]
}):
    if event.type == "response.output_item.done":
        item = event.item
        item_type = item.get("type") if isinstance(item, dict) else getattr(item, "type", None)
        
        if item_type == "function_call":
            name = item.get("name") if isinstance(item, dict) else getattr(item, "name", "")
            args = item.get("arguments") if isinstance(item, dict) else getattr(item, "arguments", "")
            print(f"\n🔧 TOOL CALL: {name}")
            print(f"   Args: {args}")
        elif item_type == "function_call_output":
            output = item.get("output") if isinstance(item, dict) else getattr(item, "output", "")
            print(f"📤 RESULT: {output}")
        elif item_type == "message":
            content = item.get("content") if isinstance(item, dict) else getattr(item, "content", [])
            if content and len(content) > 0:
                text = content[0].get("text") if isinstance(content[0], dict) else getattr(content[0], "text", "")
                print(f"\n💬 RESPONSE: {text}")

print("\n" + "=" * 60)
print("✅ Tool calling complete!")

## 5. Complete Production Agent

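A more complete agent that combines a system prompt, configurable sampling parameters, tracing, and custom outputs. It is written to a file so that it can be logged with MLflow's models-from-code workflow.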

In [None]:
%%writefile responses_api_agent.py
"""Production Responses API agent with full features."""

import os
from typing import Generator

import mlflow
from mlflow.entities.span import SpanType
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
)
from openai import OpenAI


class ProductionResponsesAgent(ResponsesAgent):
    """
    Production-ready agent using OpenAI Responses API.
    
    Features:
    - Direct Responses API integration
    - Streaming with passthrough
    - Configurable model and parameters
    - Full MLflow tracing
    - Custom outputs support
    """
    
    def __init__(
        self,
        model: str = "gpt-4o",
        system_prompt: str = None,
        temperature: float = 1.0,
        max_output_tokens: int = None,
    ):
        self.model = model
        self.client = OpenAI()
        self.system_prompt = system_prompt
        self.temperature = temperature
        self.max_output_tokens = max_output_tokens
    
    def _prepare_input(self, request: ResponsesAgentRequest) -> list:
        """Prepare input messages with optional system prompt."""
        messages = []
        
        if self.system_prompt:
            messages.append({
                "role": "system",
                "content": self.system_prompt
            })
        
        messages.extend([i.model_dump() for i in request.input])
        return messages
    
    def _build_api_kwargs(self, input_messages: list, stream: bool = False) -> dict:
        """Build API call kwargs."""
        kwargs = {
            "model": self.model,
            "input": input_messages,
            "temperature": self.temperature,
        }
        
        if stream:
            kwargs["stream"] = True
        
        # Compare against None so the check states its intent explicitly
        if self.max_output_tokens is not None:
            kwargs["max_output_tokens"] = self.max_output_tokens
        
        return kwargs
    
    @mlflow.trace(span_type=SpanType.AGENT)
    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        """Non-streaming prediction."""
        input_messages = self._prepare_input(request)
        api_kwargs = self._build_api_kwargs(input_messages)
        
        response = self.client.responses.create(**api_kwargs)
        
        # Convert response and add custom outputs
        result = ResponsesAgentResponse(**response.to_dict())
        
        # Add custom outputs if present in request
        if hasattr(request, 'custom_inputs') and request.custom_inputs:
            result.custom_outputs = request.custom_inputs
        
        return result
    
    @mlflow.trace(span_type=SpanType.AGENT)
    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        """Streaming prediction with direct passthrough."""
        input_messages = self._prepare_input(request)
        api_kwargs = self._build_api_kwargs(input_messages, stream=True)
        
        for event in self.client.responses.create(**api_kwargs):
            yield ResponsesAgentStreamEvent(**event.to_dict())


# Enable tracing
mlflow.openai.autolog()

# Create and set model
agent = ProductionResponsesAgent(
    model="gpt-4o",
    system_prompt="You are a helpful, accurate, and concise AI assistant.",
    temperature=0.7,
)
mlflow.models.set_model(agent)

In [None]:
# Log the agent
with mlflow.start_run(run_name="responses_api_agent") as run:
    model_info = mlflow.pyfunc.log_model(
        python_model="responses_api_agent.py",
        name="agent",  # MLflow 3 parameter; older releases used artifact_path
        pip_requirements=[
            "mlflow",
            "openai",
            "pydantic>=2.0.0",
        ],
    )
    
    print("✅ Responses API agent logged!")
    print(f"Model URI: {model_info.model_uri}")

In [None]:
# Test the logged model
loaded_model = mlflow.pyfunc.load_model(model_info.model_uri)

response = loaded_model.predict({
    "input": [{"role": "user", "content": "What is the capital of France?"}]
})

print("Loaded model response:")
print(response)
print("\n✅ Model loaded and tested!")

## 6. Best Practices

### When to Use Responses API vs ChatCompletions

| Use Case | Recommended API |
|----------|----------------|
| Simple chat | Either works |
| Tool calling | Responses API |
| Multi-agent | Responses API |
| Annotations | Responses API |
| Legacy integration | ChatCompletions |
| Library compatibility | ChatCompletions |

### Code Organization Tips

1. **Use direct passthrough** when possible - less code, fewer bugs
2. **Handle all event types** in streaming for robustness
3. **Add custom_outputs** for metadata tracking
4. **Enable autologging** for full observability
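
Tip 2 can be sketched as a dispatch table with a fallback, so that event types the code does not know about are counted rather than silently dropped or allowed to raise. The handler layout and sample events are illustrative assumptions; the first two event type strings are the ones used earlier in this notebook, and `example.unknown_event` is a made-up type standing in for anything unrecognized.

```python
from collections import Counter


def handle_events(events):
    """Route each event by type; collect text, finished items, and unknown types."""
    text_chunks, items, unhandled = [], [], Counter()

    handlers = {
        "response.output_text.delta": lambda e: text_chunks.append(e.get("delta", "")),
        "response.output_item.done": lambda e: items.append(e["item"]),
    }

    for event in events:
        handler = handlers.get(event.get("type"))
        if handler is None:
            unhandled[event.get("type")] += 1  # record it rather than fail
        else:
            handler(event)

    return "".join(text_chunks), items, dict(unhandled)


events = [
    {"type": "response.output_text.delta", "delta": "Hi"},
    {"type": "response.output_text.delta", "delta": " there"},
    {"type": "response.output_item.done", "item": {"type": "message"}},
    {"type": "example.unknown_event"},
]

text, items, unhandled = handle_events(events)
print(text, items, unhandled)
```

Logging the `unhandled` counts during development makes it easier to notice when a stream starts carrying event types the agent ignores.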

### Error Handling
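
The agent in this section retries rate-limit and timeout errors with the third-party `backoff` package. For readers who prefer not to add a dependency, the retry pattern can be approximated with the standard library. This is a simplified sketch, not a reimplementation of `backoff`: the `retry_with_backoff` name and its parameters are assumptions, and it omits the random jitter that retry libraries typically add.

```python
import time


def retry_with_backoff(func, retry_on, max_tries=3, base_delay=1.0, sleep=time.sleep):
    """Call func(); on a retryable exception wait base_delay * 2**attempt, then retry."""
    for attempt in range(max_tries):
        try:
            return func()
        except retry_on:
            if attempt == max_tries - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * (2 ** attempt))


# Simulated flaky call: fails twice with a timeout, then succeeds.
attempts = {"count": 0}


def flaky():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise TimeoutError("simulated timeout")
    return "ok"


delays = []  # record the requested delays instead of actually sleeping
result = retry_with_backoff(flaky, TimeoutError, sleep=delays.append)
print(result, delays)
```

Injecting `sleep` keeps the example fast and makes the delay schedule easy to check in tests.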

In [None]:
import openai
import backoff


class RobustResponsesAgent(ResponsesAgent):
    """
    Agent with proper error handling and retries.
    """
    
    def __init__(self, model: str = "gpt-4o"):
        self.model = model
        self.client = OpenAI()
    
    @backoff.on_exception(
        backoff.expo,
        (openai.RateLimitError, openai.APITimeoutError),
        max_tries=3
    )
    def _call_api(self, input_messages: list):
        """Call API with retry logic."""
        return self.client.responses.create(
            model=self.model,
            input=input_messages,
        )
    
    @mlflow.trace(span_type=SpanType.AGENT)
    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        try:
            response = self._call_api([i.model_dump() for i in request.input])
            return ResponsesAgentResponse(**response.to_dict())
        
        except openai.AuthenticationError:
            # Return error response
            return ResponsesAgentResponse(
                output=[self.create_text_output_item(
                    text="Authentication failed. Please check your API key.",
                    id="error_1"
                )]
            )
        
        except openai.APIError as e:
            return ResponsesAgentResponse(
                output=[self.create_text_output_item(
                    text=f"API error occurred: {str(e)}",
                    id="error_1"
                )]
            )
    
    @mlflow.trace(span_type=SpanType.AGENT)
    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        try:
            for event in self.client.responses.create(
                model=self.model,
                input=[i.model_dump() for i in request.input],
                stream=True,
            ):
                yield ResponsesAgentStreamEvent(**event.to_dict())
        
        except Exception as e:
            # Yield error as output item
            yield ResponsesAgentStreamEvent(
                type="response.output_item.done",
                item=self.create_text_output_item(
                    text=f"Error during streaming: {str(e)}",
                    id="error_1"
                )
            )


print("✅ Robust agent pattern demonstrated!")

## Summary

### What We Learned:

1. ✅ **Responses API Overview**: Newer, richer API from OpenAI
2. ✅ **Direct Integration**: Minimal code with passthrough
3. ✅ **Streaming**: Stream events wrapped and yielded without conversion
4. ✅ **Tool Calling**: Native tool support
5. ✅ **Production Agent**: Complete deployable implementation
6. ✅ **Best Practices**: Error handling and when to use each API

### Key Advantages of Responses API:

| Advantage | Description |
|-----------|-------------|
| **Native Format** | ResponsesAgent uses same format |
| **Direct Passthrough** | `ResponsesAgentResponse(**response.to_dict())` |
| **Rich Output** | Multiple output types supported |
| **Tool Support** | Built-in function calling |

### Next Steps:
- Learn about deployment options
- Explore multi-agent patterns
- Build production applications