# OpenAI's Latest Tools: Building Intelligent Agents

### Installing or upgrading the OpenAI SDK

In [4]:
pip install --upgrade -q openai

Note: you may need to restart the kernel to use updated packages.


### Load Environment Variables

In [7]:
import os
from dotenv import load_dotenv, find_dotenv

# Load variables from the nearest .env file, overriding any that are already set
load_dotenv(find_dotenv(), override=True)

# Report only whether the key is present; never print the key itself
print("API Key Loaded:", os.environ.get('OPENAI_API_KEY') is not None)

API Key Loaded: True


### Responses API

In [10]:
from openai import OpenAI
client = OpenAI()
response = client.responses.create(
    model="gpt-4o",
    input="Write a one-sentence bedtime story about a unicorn."
)
print(response.output_text)

In a lush, moonlit forest, a gentle unicorn named Luna softly sang the stars to sleep with her shimmering lullaby.


### Analyzing Image Inputs

In [13]:
from openai import OpenAI
client = OpenAI()

response = client.responses.create(
    model="gpt-4o",
    input=[
        {"role": "user", "content": "What teams are playing in this image?"},
        {
            "role": "user",
            "content": [
                {
                    "type": "input_image",
                    "image_url": "https://upload.wikimedia.org/wikipedia/commons/3/3b/LeBron_James_Layup_%28Cleveland_vs_Brooklyn_2018%29.jpg"
                }
            ]
        }
    ]
)

print(response.output_text)

The teams playing in the image are the Cleveland Cavaliers and the Brooklyn Nets.
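
The request above passes a public URL, but the Computer Use section later in this notebook sends images as base64 data URLs in the same `image_url` field. A minimal sketch of that encoding for a local file follows; the helper name `image_to_data_url` and the PNG fallback are assumptions made for illustration, not part of the SDK.

```python
import base64
import mimetypes
from pathlib import Path


def image_to_data_url(path: str) -> str:
    """Encode a local image file as a data URL suitable for an `image_url` field."""
    # Guess the MIME type from the file extension; fall back to PNG (an assumption).
    mime_type, _ = mimetypes.guess_type(path)
    if mime_type is None:
        mime_type = "image/png"
    # Read the raw bytes and base64-encode them as ASCII text.
    encoded = base64.b64encode(Path(path).read_bytes()).decode("utf-8")
    return f"data:{mime_type};base64,{encoded}"
```

The returned string can be used wherever the example above has the Wikimedia URL, for instance `{"type": "input_image", "image_url": image_to_data_url("game.jpg")}`, where `game.jpg` is a hypothetical local file.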


### Web Search Tool

In [16]:
from openai import OpenAI
client = OpenAI()

response = client.responses.create(
    model="gpt-4o",
    tools=[{"type": "web_search_preview"}],
    input="What was a positive news story from today?"
)

print(response.output_text)

As of March 16, 2025, here are some recent positive news stories:

**Gaza Ceasefire Agreement**

Israel and Hamas have agreed to a ceasefire and hostage release deal, aiming to halt a conflict that has resulted in significant loss of life. The ceasefire is set to last an initial six weeks, offering a glimmer of hope for peace in the region. ([positive.news](https://www.positive.news/society/good-news-stories-from-week-03-of-2025/?utm_source=openai))

**Indonesia's Free Meals Initiative**

Indonesia has launched an ambitious program to provide free meals to over 82 million people by 2029. The initiative focuses on combating malnutrition, with 190 kitchens already established to serve schoolchildren and pregnant women. ([positive.news](https://www.positive.news/society/good-news-stories-from-week-03-of-2025/?utm_source=openai))

**Revival of a 'Lost' Species in England**

A species previously thought to be lost has been thriving again in England, marking a significant success in conserva
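
The links in the output above come from citation annotations that the web search tool attaches to the message text. The sketch below shows one way to collect them from `response.output`; the helper name `extract_url_citations` is hypothetical, and the code assumes each citation is an annotation of type `url_citation` with `title` and `url` attributes.

```python
def extract_url_citations(response):
    """Return (title, url) pairs for every url_citation annotation in a response."""
    citations = []
    for item in response.output:
        # Skip tool-call items (e.g. the web search call); only messages carry text.
        if getattr(item, "type", None) != "message":
            continue
        for part in getattr(item, "content", None) or []:
            for annotation in getattr(part, "annotations", None) or []:
                if getattr(annotation, "type", None) == "url_citation":
                    citations.append((annotation.title, annotation.url))
    return citations
```

Calling `extract_url_citations(response)` on the web search response would give a list that can be rendered as a source list separately from the answer text.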

### File Search Tool

In [None]:
# File Search demonstration (conceptual example)
from openai import OpenAI

client = OpenAI()

# Note: In a real implementation, you would first:
# 1. Upload files to OpenAI's platform
# 2. Create a vector store from those files
# But for this tutorial, we're just showing the API structure

response = client.responses.create(
    model="gpt-4o",
    tools=[{
        "type": "file_search",
        # In a real implementation, you would provide your vector_store_id here
        "vector_store_ids": ["example_store_id"]
    }],
    input="Find information about authentication methods in our API documentation"
)

print("Response would include information retrieved from your files")
print("This lets the AI search through your project documentation")
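
The two preparation steps mentioned in the comments above (upload files, then create a vector store from them) might look like the sketch below. The function name `build_vector_store` and the store name are hypothetical. The `client.files.create` and `client.vector_stores` calls are assumed to be available at the top level of the client, as in recent SDK releases; older releases may expose vector stores under `client.beta` instead.

```python
def build_vector_store(client, paths, name="project-docs"):
    """Upload local files and attach them to a new vector store; return its ID."""
    # Create an empty vector store to hold the documents.
    store = client.vector_stores.create(name=name)
    for path in paths:
        # Upload each file, then attach it to the store by file ID.
        with open(path, "rb") as fh:
            uploaded = client.files.create(file=fh, purpose="assistants")
        client.vector_stores.files.create(
            vector_store_id=store.id,
            file_id=uploaded.id,
        )
    return store.id
```

Called as `build_vector_store(OpenAI(), ["api_docs.md"])`, where `api_docs.md` is a hypothetical file, the returned ID would replace `"example_store_id"` in the request above. Indexing happens on the server side, so a freshly attached file may not be searchable immediately.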

### Computer Use Capabilities

In [46]:
!playwright install

In [68]:
import base64
import asyncio
import nest_asyncio
from openai import OpenAI
from playwright.async_api import async_playwright

# Enable nested asyncio for Jupyter
nest_asyncio.apply()

# Initialize OpenAI client
client = OpenAI()

async def search_with_computer_use(query="camera under 500 USD"):
    """
    Use OpenAI's Computer Use capability to search on Bing
    """
    async with async_playwright() as p:
        # Launch browser with safety settings
        browser = await p.chromium.launch(
            headless=False,
            chromium_sandbox=True,
            env={},
            args=[
                "--disable-extensions",
                "--disable-file-system"
            ]
        )
        
        # Create a new page and set viewport
        page = await browser.new_page()
        await page.set_viewport_size({"width": 1024, "height": 768})
        
        try:
            # Initial navigation to Bing
            print("Navigating to Bing...")
            await page.goto("https://www.bing.com/")
            
            # Take screenshot of initial state
            screenshot = await page.screenshot()
            screenshot_base64 = base64.b64encode(screenshot).decode("utf-8")
            
            print(f"Initial screenshot taken. Sending to OpenAI with query: '{query}'")
            
            # Initial request to start the computer use session
            response = client.responses.create(
                model="computer-use-preview",
                tools=[{
                    "type": "computer_use_preview",
                    "display_width": 1024,
                    "display_height": 768,
                    "environment": "browser"
                }],
                input=[
                    {
                        "role": "user",
                        "content": f"Search for '{query}' on Bing and show me the results. Look for good options that match my search criteria."
                    },
                    {
                        "role": "user",
                        "content": [
                            {
                                "type": "input_image",
                                "image_url": f"data:image/png;base64,{screenshot_base64}"
                            }
                        ]
                    }
                ],
                truncation="auto"
            )
            
            print("Starting Computer Use loop...")
            action_count = 0
            
            # Computer use loop
            while True:
                action_count += 1
                print(f"\n--- Action {action_count} ---")
                
                # Find computer_call in response
                computer_calls = [item for item in response.output if getattr(item, 'type', None) == "computer_call"]
                
                # Check for reasoning/thought process
                reasoning_items = [item for item in response.output if getattr(item, 'type', None) == "reasoning"]
                for item in reasoning_items:
                    if hasattr(item, 'summary') and item.summary:
                        for summary_item in item.summary:
                            if hasattr(summary_item, 'text'):
                                print(f"AI reasoning: {summary_item.text}")
                
                if not computer_calls:
                    # No more computer actions requested
                    print("Computer use session complete.")
                    message_items = [item for item in response.output if getattr(item, 'type', None) == "message"]
                    for item in message_items:
                        if hasattr(item, 'content') and item.content:
                            for content_item in item.content:
                                if hasattr(content_item, 'text'):
                                    print(f"\nFinal result: {content_item.text}")
                    break
                
                # Get the computer call action
                computer_call = computer_calls[0]
                call_id = computer_call.call_id
                action = computer_call.action
                
                # Check if there are safety checks that need acknowledgment
                has_safety_checks = hasattr(computer_call, 'pending_safety_checks') and computer_call.pending_safety_checks
                acknowledged_checks = []
                
                if has_safety_checks:
                    print("Safety checks detected:", computer_call.pending_safety_checks)
                    # In a real application, you'd ask for user confirmation here
                    # For this example, we'll just acknowledge all checks
                    acknowledged_checks = computer_call.pending_safety_checks
                
                # Execute the action
                print(f"Executing action: {action.type}")
                
                if action.type == "click":
                    x, y = action.x, action.y
                    button = action.button if hasattr(action, 'button') else "left"
                    print(f"  Clicking at position ({x}, {y}) with {button} button")
                    await page.mouse.click(x, y, button=button)
                
                elif action.type == "type":
                    text = action.text
                    print(f"  Typing text: '{text}'")
                    await page.keyboard.type(text)
                
                elif action.type == "keypress":
                    keys = action.keys
                    print(f"  Pressing keys: {keys}")
                    for key in keys:
                        # Convert keys to proper Playwright format
                        if key == "ENTER":
                            playwright_key = "Enter"
                        elif key == "SPACE":
                            playwright_key = " "
                        elif key == "BACKSPACE":
                            playwright_key = "Backspace"
                        elif key == "TAB":
                            playwright_key = "Tab"
                        else:
                            playwright_key = key
                        await page.keyboard.press(playwright_key)
                
                elif action.type == "scroll":
                    x, y = action.x, action.y
                    scroll_x, scroll_y = action.scroll_x, action.scroll_y
                    print(f"  Scrolling at ({x}, {y}) with delta ({scroll_x}, {scroll_y})")
                    await page.mouse.move(x, y)
                    await page.evaluate(f"window.scrollBy({scroll_x}, {scroll_y})")
                
                elif action.type == "wait":
                    duration = getattr(action, 'duration', 2)
                    print(f"  Waiting for {duration} seconds...")
                    await asyncio.sleep(duration)
                
                else:
                    print(f"  Unsupported action: {action.type}")
                
                # Wait for any page changes to settle
                await asyncio.sleep(1)
                
                # Take screenshot after action
                screenshot = await page.screenshot()
                screenshot_base64 = base64.b64encode(screenshot).decode("utf-8")
                
                # Get current URL for safety checks
                current_url = page.url
                print(f"  Current URL: {current_url}")
                
                # Prepare the next request
                input_data = [{
                    "call_id": call_id,
                    "type": "computer_call_output",
                    "output": {
                        "type": "input_image",
                        "image_url": f"data:image/png;base64,{screenshot_base64}"
                    },
                    "current_url": current_url
                }]
                
                # Add safety check acknowledgments if needed
                if acknowledged_checks:
                    input_data[0]["acknowledged_safety_checks"] = acknowledged_checks
                
                print("  Sending updated screenshot to OpenAI...")
                
                # Send next request with the updated screenshot
                response = client.responses.create(
                    model="computer-use-preview",
                    previous_response_id=response.id,
                    tools=[{
                        "type": "computer_use_preview",
                        "display_width": 1024,
                        "display_height": 768,
                        "environment": "browser"
                    }],
                    input=input_data,
                    truncation="auto"
                )
                
        except Exception as e:
            print(f"Error during Computer Use session: {str(e)}")
        finally:
            # Ensure browser is closed
            print("Closing browser...")
            await browser.close()

# For Jupyter notebook usage
async def run_camera_search():
    await search_with_computer_use("camera under 500 USD")

# Usage in Jupyter: await run_camera_search()
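
The `keypress` branch above translates key names with an `if`/`elif` chain. A lookup table is one alternative that is easier to extend. In the sketch below, the first four entries mirror the chain in the function above; the remaining entries, the case-insensitive lookup, and the name `to_playwright_key` are assumptions added for illustration.

```python
# Maps key names received from the model to Playwright key names.
KEY_MAP = {
    # These four mirror the if/elif chain in search_with_computer_use.
    "ENTER": "Enter",
    "SPACE": " ",
    "BACKSPACE": "Backspace",
    "TAB": "Tab",
    # Assumed extras: the left-hand names are guesses at what the model may emit.
    "ESC": "Escape",
    "CTRL": "Control",
    "ARROWUP": "ArrowUp",
    "ARROWDOWN": "ArrowDown",
}


def to_playwright_key(key: str) -> str:
    """Translate a key name, passing unknown keys through unchanged."""
    return KEY_MAP.get(key.upper(), key)
```

Inside the loop, `await page.keyboard.press(to_playwright_key(key))` would then replace the chain. Unknown keys fall through unchanged, which matches the `else` branch of the original.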

In [74]:
await run_camera_search()

Navigating to Bing...
Initial screenshot taken. Sending to OpenAI with query: 'camera under 500 USD'
Starting Computer Use loop...

--- Action 1 ---
Executing action: click
  Clicking at position (365, 182) with left button
  Current URL: https://www.bing.com/
  Sending updated screenshot to OpenAI...

--- Action 2 ---
Executing action: type
  Typing text: 'camera under 500 USD'
  Current URL: https://www.bing.com/
  Sending updated screenshot to OpenAI...

--- Action 3 ---
Executing action: keypress
  Pressing keys: ['ENTER']
  Current URL: https://www.bing.com/search?q=camera+under+500+USD&form=QBLH&sp=-1&lq=0&pq=camera+under+500+usd&sc=0-20&qs=n&sk=&cvid=A899271443DF4EDF8121405421F92D9E&ghsh=0&ghacc=0&ghpl=
  Sending updated screenshot to OpenAI...

--- Action 4 ---
Executing action: click
  Clicking at position (390, 640) with left button
  Current URL: https://www.bing.com/search?q=camera+under+500+USD&form=QBLH&sp=-1&lq=0&pq=camera+under+500+usd&sc=0-20&qs=n&sk=&cvid=A899271443DF
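
The loop above acknowledges every pending safety check automatically, and its comment notes that a real application would ask the user first. A minimal sketch of such a confirmation step follows. The function name `confirm_safety_checks` and the choice to raise `PermissionError` on refusal are assumptions; each check is assumed to expose `id`, `code`, and `message` attributes.

```python
def confirm_safety_checks(pending_checks, ask=input):
    """Ask the user to approve each safety check; return the acknowledged ones."""
    acknowledged = []
    for check in pending_checks:
        answer = ask(f"Safety check [{check.code}]: {check.message} Proceed? [y/N] ")
        if answer.strip().lower() not in ("y", "yes"):
            # Stop the session instead of silently continuing.
            raise PermissionError(f"Safety check {check.id} was not acknowledged")
        acknowledged.append(
            {"id": check.id, "code": check.code, "message": check.message}
        )
    return acknowledged
```

With this helper, `acknowledged_checks = confirm_safety_checks(computer_call.pending_safety_checks)` could stand in for the direct assignment, and the existing `except` block would report a refusal before the browser closes. The `ask` parameter defaults to `input` and exists so the prompt can be swapped out in tests or other front ends.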

### Streaming

In [77]:
from openai import OpenAI
client = OpenAI()

stream = client.responses.create(
    model="gpt-4o",
    input=[
        {
            "role": "user",
            "content": "Say 'double bubble bath' ten times fast.",
        },
    ],
    stream=True,
)

for event in stream:
    print(event)

ResponseCreatedEvent(response=Response(id='resp_67d68ba050848191bd248af6a036c7ee0ea58e98c2fb8ac0', created_at=1742113696.0, error=None, incomplete_details=None, instructions=None, metadata={}, model='gpt-4o-2024-08-06', object='response', output=[], parallel_tool_calls=True, temperature=1.0, tool_choice='auto', tools=[], top_p=1.0, max_output_tokens=None, previous_response_id=None, reasoning=Reasoning(effort=None, generate_summary=None), status='in_progress', text=ResponseTextConfig(format=ResponseFormatText(type='text')), truncation='disabled', usage=None, user=None, store=True), type='response.created')
ResponseInProgressEvent(response=Response(id='resp_67d68ba050848191bd248af6a036c7ee0ea58e98c2fb8ac0', created_at=1742113696.0, error=None, incomplete_details=None, instructions=None, metadata={}, model='gpt-4o-2024-08-06', object='response', output=[], parallel_tool_calls=True, temperature=1.0, tool_choice='auto', tools=[], top_p=1.0, max_output_tokens=None, previous_response_id=None,
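
Printing every event, as above, is useful for inspecting the stream, but most applications only want the generated text. The sketch below filters for text delta events and joins their fragments. The helper name `collect_output_text` is hypothetical, and it assumes text arrives in events whose `type` is `response.output_text.delta` with the fragment in `delta`.

```python
def collect_output_text(events, on_delta=None):
    """Join the text fragments from a stream of Responses API events."""
    chunks = []
    for event in events:
        # Ignore lifecycle events such as response.created or response.in_progress.
        if getattr(event, "type", None) == "response.output_text.delta":
            chunks.append(event.delta)
            if on_delta is not None:
                on_delta(event.delta)  # e.g. print the fragment as it arrives
    return "".join(chunks)
```

For live output in a notebook, the stream could be consumed as `collect_output_text(stream, on_delta=lambda t: print(t, end="", flush=True))`.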

### OpenAI Agents SDK

In [82]:
!pip install openai-agents -q
# asyncio ships with Python's standard library, so it needs no separate install

### Code Review System - Multi-Agent System

In [85]:
import asyncio
from agents import Agent, Runner, function_tool

# Define our tools
@function_tool
def check_syntax(code: str) -> str:
    """Check Python code for syntax errors"""
    try:
        compile(code, "<string>", "exec")
        return "Code compiles without syntax errors."
    except SyntaxError as e:
        return f"Syntax error found: {str(e)}"

@function_tool
def generate_html(content: str) -> str:
    """Generate HTML documentation with PDF export capability"""
    html = f"""<!DOCTYPE html>
<html>
<head>
    <title>Code Documentation</title>
    <style>
        body {{ font-family: Arial, sans-serif; line-height: 1.6; }}
        pre {{ background: #f4f4f4; padding: 10px; border-radius: 5px; overflow-x: auto; }}
        .container {{ max-width: 800px; margin: 0 auto; padding: 20px; }}
        h1, h2, h3 {{ color: #333; }}
        code {{ background: #f4f4f4; padding: 2px 5px; border-radius: 3px; }}
        button {{ 
            background: #4CAF50; 
            color: white; 
            padding: 10px 15px; 
            border: none; 
            border-radius: 4px; 
            cursor: pointer; 
            margin-top: 20px; 
        }}
        @media print {{ 
            button.no-print {{ display: none; }} 
        }}
    </style>
</head>
<body>
    <div class="container">
        <h1>Code Documentation</h1>
        {content}
        <button class="no-print" onclick="window.print()">Export as PDF</button>
    </div>
</body>
</html>"""
    return html
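
Before wiring `check_syntax` into an agent, it can help to see what its logic reports on its own. The sketch below repeats the same `compile` check as a plain, undecorated function; the name `check_syntax_plain` is hypothetical. `compile` only parses the source and does not execute it, so checking untrusted code this way does not run it.

```python
def check_syntax_plain(code: str) -> str:
    """Same check as the check_syntax tool, without the @function_tool decorator."""
    try:
        # Parse the source into a code object; nothing is executed.
        compile(code, "<string>", "exec")
        return "Code compiles without syntax errors."
    except SyntaxError as e:
        return f"Syntax error found: {e}"


print(check_syntax_plain("x = 1"))
print(check_syntax_plain("def broken(:\n    pass"))
```

The second call reports a syntax error; the exact wording of the message depends on the Python version.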

In [87]:
code_checker = Agent(
    name="Code Checker",
    instructions="""You are a Python code checker. Analyze syntax, logic, and potential bugs.
    Format your report in markdown.""",
    tools=[check_syntax],
)

code_reviewer = Agent(
    name="Code Reviewer",
    instructions="""You are a code reviewer. Evaluate code quality, readability, and suggest improvements.
    Format your review in markdown.""",
)

documentation_generator = Agent(
    name="Documentation Generator",
    instructions="""You are a documentation generator. Create documentation for the provided code.

IMPORTANT: After creating the markdown content, you MUST call the generate_html tool
and return ONLY the HTML result as your final response.

DO NOT say anything about generating HTML. JUST return the HTML from the generate_html tool call.

Your documentation should include:
- Overview of what the code does
- Function parameters and return values
- Exception handling
- Usage examples
- Recommendations for improvement""",
    tools=[generate_html],
)


In [89]:
# Sequential approach
async def run_code_checker(code):
    """Run the code checker agent on the provided code"""
    print("Step 1: Running code checker...")
    try:
        result = await Runner.run(code_checker, input=f"Please check this code and provide a detailed analysis:\n\n```python\n{code}\n```")
        print("Code checker analysis complete!")
        return result.final_output
    except Exception as e:
        print(f"Error in code checker: {str(e)}")
        raise

async def run_code_reviewer(code, checker_report):
    """Run the code reviewer agent on the provided code"""
    print("\nStep 2: Running code reviewer...")
    try:
        result = await Runner.run(code_reviewer, 
                               input=f"""Review this code for quality and suggest improvements.
                               
Previous analysis from the code checker:
{checker_report}

Code to review:
```python
{code}
```""")
        print("Code review complete!")
        return result.final_output
    except Exception as e:
        print(f"Error in code reviewer: {str(e)}")
        raise

async def run_documentation_generator(code, checker_report, reviewer_report):
    """Run the documentation generator agent on the provided code"""
    print("\nStep 3: Running documentation generator...")
    try:
        result = await Runner.run(documentation_generator, 
                               input=f"""Create documentation for this code. Use the generate_html tool for the final output.
                               
Code checker analysis:
{checker_report}

Code reviewer feedback:
{reviewer_report}

Code to document:
```python
{code}
```""")
        print("Documentation generation complete!")
        return result.final_output
    except Exception as e:
        print(f"Error in documentation generator: {str(e)}")
        raise

In [91]:
async def run_complete_review(code):
    """Run the complete review process with all three agents sequentially"""
    try:
        # Step 1: Check code
        checker_report = await run_code_checker(code)
        
        # Step 2: Review code
        reviewer_report = await run_code_reviewer(code, checker_report)
        
        # Step 3: Generate documentation
        documentation = await run_documentation_generator(code, checker_report, reviewer_report)
        
        return {
            "checker_report": checker_report,
            "reviewer_report": reviewer_report,
            "documentation": documentation
        }
    except Exception as e:
        print(f"Error in review process: {str(e)}")
        return {
            "error": str(e)
        }

### Testing the Multi-Agent System

In [94]:
# Sample code to review
sample_code = """
def calculate_average(numbers):
    \"\"\"
    Calculate the average of a list of numbers.
    
    Args:
        numbers (list): A list of numeric values
        
    Returns:
        float: The average of all values
        
    Raises:
        ZeroDivisionError: If the list is empty
    \"\"\"
    if not numbers:
        raise ZeroDivisionError("Cannot calculate average of empty list")
        
    total = sum(numbers)
    return total / len(numbers)

# Example usage
if __name__ == "__main__":
    data = [10, 15, 20, 25, 30]
    avg = calculate_average(data)
    print(f"The average is: {avg}")
"""

async def run_code_review_system():
    print("Starting code review system...")
    print("This will process code through three specialized agents in sequence:")
    print("1. Code Checker: Checks for syntax errors and bugs")
    print("2. Code Reviewer: Evaluates code quality and suggests improvements")
    print("3. Documentation Generator: Creates HTML documentation with PDF export")
    print("\nProcessing sample code...")
    
    results = await run_complete_review(sample_code)
    
    if "error" in results:
        print(f"\nError occurred: {results['error']}")
        return results
    
    print("\nReview process completed successfully!")
    
    print("\n--- CODE CHECKER REPORT ---")
    print(results["checker_report"])
    
    print("\n--- CODE REVIEWER REPORT ---")
    print(results["reviewer_report"])
    
    documentation = results["documentation"]
    if documentation.startswith("<!DOCTYPE html>") or "<html>" in documentation:
        print("\n--- DOCUMENTATION GENERATED ---")
        print("HTML documentation generated successfully!")
        
        html_file = "code_documentation.html"
        with open(html_file, "w", encoding="utf-8") as f:
            f.write(documentation)
        
        print(f"\nHTML documentation saved to {html_file}")
        print("You can open this file in your browser to view the documentation")
        print("and use the 'Export as PDF' button")
    else:
        print("\n--- DOCUMENTATION TEXT ---")
        print("Did not receive HTML. Here's the documentation text:")
        print(documentation)
    
    return results

await run_code_review_system()

Starting code review system...
This will process code through three specialized agents in sequence:
1. Code Checker: Checks for syntax errors and bugs
2. Code Reviewer: Evaluates code quality and suggests improvements
3. Documentation Generator: Creates HTML documentation with PDF export

Processing sample code...
Step 1: Running code checker...
Code checker analysis complete!

Step 2: Running code reviewer...
Code review complete!

Step 3: Running documentation generator...
Documentation generation complete!

Review process completed successfully!

--- CODE CHECKER REPORT ---
## Code Analysis

### Syntax
The code contains no syntax errors and is written in valid Python.

### Functionality
The function `calculate_average` is designed to compute the average of a list of numbers. Here is a step-by-step analysis:

1. **Docstring**: The function is well-documented, including a clear explanation of its purpose, input arguments, return value, and potential exceptions. This improves code read

{'checker_report': '## Code Analysis\n\n### Syntax\nThe code contains no syntax errors and is written in valid Python.\n\n### Functionality\nThe function `calculate_average` is designed to compute the average of a list of numbers. Here is a step-by-step analysis:\n\n1. **Docstring**: The function is well-documented, including a clear explanation of its purpose, input arguments, return value, and potential exceptions. This improves code readability and maintainability.\n\n2. **Empty List Check**: \n   - The code correctly checks if the input list `numbers` is empty. If it is, a `ZeroDivisionError` is raised with an appropriate message. This prevents division by zero, which would occur when attempting to compute the average of an empty list.\n\n3. **Total and Average Calculation**:\n   - `total = sum(numbers)` efficiently computes the sum of the list.\n   - `return total / len(numbers)` computes and returns the average as a float.\n\n### Logic and Algorithm\n- The function accurately cal