# Test OpenAI Agents SDK
- basic usage
- use langfuse for prompt repository, evaluation, observability, user feedback
- run batches of prompt async against lists and dataframes
- implement a workflow to write a daily AI newsletter

In [1]:
import os
import yaml
import dotenv
import logging
import json
import yaml
from datetime import datetime
import time
import random
import glob

from pathlib import Path

import asyncio
import nest_asyncio

import pydantic
from pydantic import BaseModel, Field, RootModel
from typing import Dict, TypedDict, Type, List, Optional, Any, Iterable
from dataclasses import dataclass, field
from enum import Enum

import numpy as np
import pandas as pd

import langfuse
from langfuse import get_client
from langfuse import Langfuse
from langfuse.openai import openai
# from langfuse.openai import AsyncOpenAI
import logfire

from openai import AsyncOpenAI

import agents
from agents.exceptions import InputGuardrailTripwireTriggered
from agents import (Agent, Runner, Tool, OpenAIResponsesModel, 
                    ModelSettings, FunctionTool, InputGuardrail, GuardrailFunctionOutput,
                    SQLiteSession, set_default_openai_api, set_default_openai_client
                   )


import tenacity
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

from IPython.display import HTML, Image, Markdown, display

from prompt_loader import PromptLoader
from log_handler import SQLiteLogHandler, setup_sqlite_logging, sanitize_error_for_logging
from config import LOGDB
from llm import LLMagent  # methods to apply prompts async to large batches
from fetch import Fetcher # fetch news urls



In [2]:
print(f"OpenAI:            {openai.__version__}")
print(f"OpenAI Agents SDK  {agents.__version__}")
print(f"Pydantic           {pydantic.__version__}")
print(f"LangFuse           {langfuse.version.__version__}")
print(f"Logfire            {logfire.__version__}")


OpenAI:            1.107.0
OpenAI Agents SDK  0.2.11
Pydantic           2.11.7
LangFuse           3.3.4
Logfire            4.7.0


# Basic usage
- Run a prompt using agents
- Sessions
- Route through Portkey for observability
- Save logs
- Link to openai for traces and evals


In [3]:
dotenv.load_dotenv()

# to run async in jupyter notebook
nest_asyncio.apply()

# verbose OpenAI console logging if something doesn't work
# logging.basicConfig(level=logging.DEBUG)
# openai_logger = logging.getLogger("openai")
# openai_logger.setLevel(logging.DEBUG)


In [4]:
# for portkey proxy, but we will use langfuse instead for observability, prompt repository, prompt evals

# load environment variables including OPENAI_API_KEY
# OPENAI_BASE_URL="http://localhost:8787/v1"
# OPENAI_DEFAULT_HEADERS='{"x-portkey-provider": "openai"}'
# launch proxy service https://portkey.ai/docs/product/enterprise-offering/components
# npx @portkey-ai/gateway
# could point to a database with a portkey_config.yaml
# logging:
#   sink: sql
#   database_url: postgres://user:password@localhost:5432/portkey
# npx @portkey-ai/gateway --portkey_config.yaml
# print("OPENAI_BASE_URL =", os.getenv("OPENAI_BASE_URL"))
# print("OPENAI_DEFAULT_HEADERS =", os.getenv("OPENAI_DEFAULT_HEADERS"))

# needed for portkey - responses API is persistent connection-oriented and seeems to not work
# set_default_openai_api("chat_completions")


In [5]:
# modules are written to create a default logger, or pass a logger

def setup_logging(session_id: str = "default", db_path: str = "agent_logs.db") -> logging.Logger:
    """Set up logging to console and SQLite database."""

    # Create logger
    logging.basicConfig(level=logging.INFO)

    logger = logging.getLogger(f"NewsletterAgent.{session_id}")
    logger.setLevel(logging.INFO)

    # Clear any existing handlers
    logger.handlers.clear()

    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_formatter = logging.Formatter(
        '%(asctime)s | %(name)s | %(levelname)s | %(message)s',
        datefmt='%H:%M:%S'
    )
    console_handler.setFormatter(console_formatter)

    # SQLite handler
    sqlite_handler = SQLiteLogHandler(db_path)
    sqlite_handler.setLevel(logging.INFO)
    sqlite_formatter = logging.Formatter('%(message)s')
    sqlite_handler.setFormatter(sqlite_formatter)

    # Add handlers to logger
    logger.addHandler(console_handler)
    logger.addHandler(sqlite_handler)

    # Prevent propagation to root logger
    logger.propagate = False

    return logger

logger = setup_logging("newsletter_agent", "test_logs.db")

# Log some test messages
logger.info("Test info message", extra={
    'step_name': 'test_step',
    'agent_session': 'demo_session'
})

logger.warning("Test warning message", extra={
    'step_name': 'test_step',
    'agent_session': 'demo_session'
})

logger.error("Test error message", extra={
    'step_name': 'error_step',
    'agent_session': 'demo_session'
})

sanitize_error_for_logging("log with some bad stuff for the filter: sk-proj-123456789012345678901234567890123456789012345678")

13:00:54 | NewsletterAgent.newsletter_agent | INFO | Test info message
13:00:54 | NewsletterAgent.newsletter_agent | ERROR | Test error message


'log with some bad stuff for the filter: [API_KEY_REDACTED]'

In [6]:
# for portkey
# client = AsyncOpenAI(
#     base_url=os.getenv("OPENAI_BASE_URL"),
#     api_key=os.getenv("OPENAI_API_KEY"),
#     default_headers=json.loads(os.getenv("OPENAI_DEFAULT_HEADERS")),
# )

# # set the client globally
# set_default_openai_client(client)


In [7]:
# Configure logfire instrumentation.
# OpenAI used lofgfire from Pydantic AI and we need this to handle some traces sent to OpenAI
logfire.configure(service_name="my_agent_service", send_to_logfire=False)
logfire.instrument_openai_agents()  


In [8]:
# initialize langfuse for observability
# git clone https://github.com/langfuse/langfuse.git
# cd langfuse
# go to localhost:3000
# set up an org, project, get API keys and put in .env

lf_client = get_client()
 
# Verify connection
if lf_client.auth_check():
    print("Langfuse client is authenticated and ready!")
else:
    print("Authentication failed. Please check your credentials and host.")# Get production prompts
prompt = lf_client.get_prompt("newsagent/headline_classifier")

# Get by label
# You can use as many labels as you'd like to identify different deployment targets
prompt = lf_client.get_prompt("newsagent/headline_classifier", label="production")
print(prompt.prompt, "\n")
prompt = lf_client.get_prompt("newsagent/headline_classifier", label="latest")
print(prompt.prompt, "\n")

# Get by version number, usually not recommended as it requires code changes to deploy new prompt versions
prompt = lf_client.get_prompt("newsagent/headline_classifier", version=1)
print(prompt.prompt, "\n")


Langfuse client is authenticated and ready!
[{'type': 'message', 'role': 'system', 'content': 'You are a content-classification assistant that labels news headlines as AI-related or not.\nReturn JSON that matches the provided schema\n\nA headline is AI-related if it mentions (explicitly or implicitly):\n- Core AI models: machine learning, neural / deep / transformer networks\n- AI Applications: computer vision, NLP, robotics, autonomous driving, generative media\n- AI hardware, GPU chip supply, AI data centers and infrastructure\n- Companies or labs known for AI: OpenAI, DeepMind, Anthropic, xAI, NVIDIA, etc.\n- AI models & products: GPT-5, Gemini, Claude, Midjourney, DeepSeek, etc.\n- New AI products and AI integration into existing products/services\n- AI policy / ethics / safety / regulation / analysis\n- Research results related to AI\n- AI industry figures (Sam Altman, Demis Hassabis, Dario Amodei, etc.)\n- AI market and business developments, funding rounds, partnerships centered

In [9]:
# Get current `production` version of a text prompt
system_prompt = lf_client.get_prompt("swallow/system")
 
# Insert variables into prompt template
# compiled_prompt = prompt.compile(criticlevel="expert", movie="Dune 2")

system_prompt.prompt


'You are an expert on airspeed velocities of swallows. Answer questions about swallow flight speeds with authority and humor when appropriate.'

In [10]:
# delete this db 
[os.remove(f) for f in glob.glob('swallow.db*') if os.path.exists(f)]


[None]

In [11]:
user_prompt = lf_client.get_prompt("swallow/user1").compile()
print(user_prompt)

openai_client = AsyncOpenAI()

# async def main():
agent = Agent(
    name="Assistant",
    instructions=system_prompt.prompt,
    model=OpenAIResponsesModel(model="gpt-4.1", openai_client=openai_client),
)

# 1) Create (or reuse) a session. Use a durable DB path if you want persistence.
session = SQLiteSession("test_swallow_chat", "swallow.db")

result = await Runner.run(agent, user_prompt, session=session)
display(Markdown(result.final_output))

# loop = asyncio.get_running_loop()
# await loop.create_task(main())


What is the airspeed velocity of an unladen swallow?

13:00:54.949 OpenAI Agents trace: Agent workflow
13:00:54.950   Agent run: 'Assistant'
13:00:54.951     Responses API with 'gpt-4.1'
13:00:54.951       OpenAI-generation


Ah, the classic question! As any aviary aficionado (or "Monty Python and the Holy Grail" fan) knows, "What is the airspeed velocity of an unladen swallow?" is not as simple as it sounds. 

**Assuming you‚Äôre referring to the European Swallow (Hirundo rustica):**  
The average cruising airspeed velocity is roughly **11 meters per second**, which is about **24 miles per hour (38 km/h)**.

But let‚Äôs not forget the all-important follow-up:  
*"African or European swallow?"*  
‚Äî because **African swallows** (actually a group of several species) may differ in size and flight style!

To sum up:  
- **European Swallow:** ~11 m/s (24 mph, 38 km/h)
- **African Swallow:** Data is less specific, but likely similar to slightly less, depending on the species.

And lest you attempt to carry a coconut by the husk‚Äîremember, ‚Äúit‚Äôs a simple matter of weight ratios!‚Äù ü••üê¶

In [12]:
# 3) Next turns ‚Äî just keep reusing the same session
result = await Runner.run(agent, "explain how that number was measured / computed", session=session)

display(Markdown(result.final_output))

13:01:01.308 OpenAI Agents trace: Agent workflow
13:01:01.308   Agent run: 'Assistant'
13:01:01.309     Responses API with 'gpt-4.1'
13:01:01.309       OpenAI-generation


Delighted to flap into the details! Measuring (or calculating) the airspeed velocity of an unladen swallow isn‚Äôt as simple as just clocking one with a speed gun on the motorway‚Äîbut ornithologists are a clever bunch. Here‚Äôs how it‚Äôs generally done:

### **Direct Measurement:**
1. **Time-and-Distance Observation**
   - **Researchers** observe swallows flying between two points at a known distance.
   - **Stopwatches or high-speed cameras** are used to time how long it takes for the swallow to traverse the distance.
   - **Speed** = Distance / Time.
   - Example: A 50-meter stretch, crossed in 4.5 seconds = 11.1 m/s.

2. **Radar Gun or Doppler Radar**
   - Similar to equipment used by the police for measuring car speeds.
   - Radar is pointed at birds in flight, providing an instant velocity reading.
   - Used mainly with bigger birds, but has been applied to swallows in some studies.

3. **Photogrammetry**
   - Using **high-speed synchronized cameras** to record the bird‚Äôs position over time.
   - Plotting the bird‚Äôs position frame-by-frame to calculate speed.

---

### **Theoretical/Mathematical Computation:**
For the *truly pedantic* (and lovers of Monty Python references), we can nerd out even more:

Researchers like C.J.O. Harrison (e.g., in *‚ÄúThe Flight of Birds: The Significance of Cirulation and Angle of Attack‚Äù*, Nature, 1953) developed formulas using:
- **Wingbeat frequency:** Swallows flap about 15 times per second.
- **Wingspan and body mass:** Known averages for Hirundo rustica.
- **Aerodynamic equations:** Lift, drag, and thrust calculations using bird physiology data.

When Michael Rowan-Robinson, a physicist, took a swing at it for fun (inspired by Python), he considered:
- **Typical wingspan:** 0.25 m
- **Wingbeat frequency:** 15 Hz
- **Lift/drag coefficients** and air density.
- **Result:** Computed an average cruising speed in the ballpark of **11 m/s**.

---

### **What This All Means**
So, the oft-quoted ‚Äú11 meters per second‚Äù (24 mph) is based on both direct field measurement (time and distance, sometimes radar) **and** reinforced by aerodynamic theory‚Äîplus a dash of whimsy and public fascination.

**In short:** Swallow airspeed velocity is measured by clever ornithologists with a mix of stopwatches, high-speed cameras, and math‚Äîand once in a while, by a passing king on a quest for the Holy Grail.

# More advanced usage
- Prompt Management
- Structured JSON outputs, enables validation and safe passing downstream over long pipelines
- Map prompts to larger data sets asynchronously (e.g. send parallel batches of 50)


In [13]:
# code to get stuff from prompfoo directory

# get prompts from the prompt repository (the promptfoo yaml files)
# langfuse probably a better enterprise option
# prompt repository solution allows us to run evals, version prompts, improving performance over time

# logger.info("Show available prompts")
# my_prompt_loader = PromptLoader()
# my_prompt_loader.list_available_prompts()

# prompt_name = 'headline_classifier_v1'
# prompt_dict = my_prompt_loader.load_prompt_by_name(prompt_name)
# time.sleep(1)

# logger.info("Load a prompt")
# print(prompt_dict.get('system'), "")
# print(prompt_dict.get('user'), "")
# time.sleep(1)

# logger.info("Show prompt metadata")
# prompt_metadata = my_prompt_loader.get_prompt_metadata(prompt_name)
# print(prompt_metadata)
# time.sleep(1)

# logger.info("Format a prompt with input")
# print(my_prompt_loader.format_prompt(prompt_name, input_str="AI Is Replacing Online Moderators, But It's Bad at the Job"))


In [14]:
# output class for classifying headlines
class ClassificationResult(BaseModel):
    """A single headline classification result"""
    input_str: str = Field(description="The original headline text")
    output: bool = Field(description="Whether the headline is AI-related")

class ClassificationResultList(BaseModel):
    """List of ClassificationResult for batch processing"""
    results_list: list[ClassificationResult] = Field(description="List of classification results")


In [15]:

prompt_name = 'newsagent/headline_classifier'
lf_prompt = lf_client.get_prompt(prompt_name)
print(lf_prompt.prompt, end="\n")
print()

system_prompt = lf_prompt.prompt[0]['content']
print('system prompt\n', system_prompt, end="\n")

print()

user_prompt = lf_prompt.prompt[1]['content']
print('user prompt\n', user_prompt, end="\n")

config = lf_prompt.config if hasattr(lf_prompt, 'config') else {}
print ('config\n', config, end="\n")

model = config.get("model", 'gpt-5')
print('model\n', model, end="\n")


[{'type': 'message', 'role': 'system', 'content': 'You are a content-classification assistant that labels news headlines as AI-related or not.\nReturn JSON that matches the provided schema\n\nA headline is AI-related if it mentions (explicitly or implicitly):\n- Core AI models: machine learning, neural / deep / transformer networks\n- AI Applications: computer vision, NLP, robotics, autonomous driving, generative media\n- AI hardware, GPU chip supply, AI data centers and infrastructure\n- Companies or labs known for AI: OpenAI, DeepMind, Anthropic, xAI, NVIDIA, etc.\n- AI models & products: GPT-5, Gemini, Claude, Midjourney, DeepSeek, etc.\n- New AI products and AI integration into existing products/services\n- AI policy / ethics / safety / regulation / analysis\n- Research results related to AI\n- AI industry figures (Sam Altman, Demis Hassabis, Dario Amodei, etc.)\n- AI market and business developments, funding rounds, partnerships centered on AI\n- Any other news with a significant 

In [16]:
# send single prompts via LLMAgent asking for ClassificationResult structured output
classifier = LLMagent(
    system_prompt=system_prompt,
    user_prompt=user_prompt,
    output_type=ClassificationResult,
    model=model,
    verbose=True,
    logger=logger
)

test_headlines = [
    "AI Is Replacing Online Moderators, But It's Bad at the Job",
    "Baby Trapped in Refrigerator Eats Own Foot",
    "Machine Learning Breakthrough in Medical Diagnosis",
    "Local Restaurant Opens New Location",
    "ChatGPT Usage Soars in Educational Settings"
]

prompt_name = 'headline_classifier_v1'
prompt_dict = PromptLoader().load_prompt_by_name(prompt_name)

result = await classifier.prompt_dict({'input_str': test_headlines[0]})
print(result)
result = await classifier.prompt_dict({'input_str': test_headlines[1]})
print(result)


13:01:16 | NewsletterAgent.newsletter_agent | INFO | Initialized LLMagent:
system_prompt: You are a content-classification assistant that labels news headlines as AI-related or not.
Return JSON that matches the provided schema

A headline is AI-related if it mentions (explicitly or implicitly):
- Core AI models: machine learning, neural / deep / transformer networks
- AI Applications: computer vision, NLP, robotics, autonomous driving, generative media
- AI hardware, GPU chip supply, AI data centers and infrastructure
- Companies or labs known for AI: OpenAI, DeepMind, Anthropic, xAI, NVIDIA, etc.
- AI models & products: GPT-5, Gemini, Claude, Midjourney, DeepSeek, etc.
- New AI products and AI integration into existing products/services
- AI policy / ethics / safety / regulation / analysis
- Research results related to AI
- AI industry figures (Sam Altman, Demis Hassabis, Dario Amodei, etc.)
- AI market and business developments, funding rounds, partnerships centered on AI
- Any other

13:01:16.336 OpenAI Agents trace: Agent workflow
13:01:16.336   Agent run: 'LLMagent'
13:01:16.353     Responses API with 'gpt-5-mini'
13:01:16.354       OpenAI-generation


13:01:18 | NewsletterAgent.newsletter_agent | INFO | Result: RunResult:
- Last agent: Agent(name="LLMagent", ...)
- Final output (ClassificationResult):
    {
      "input_str": "AI Is Replacing Online Moderators, But It's Bad at the Job",
      "output": true
    }
- 2 new item(s)
- 1 raw response(s)
- 0 input guardrail result(s)
- 0 output guardrail result(s)
(See `RunResult` for more details)
13:01:18 | NewsletterAgent.newsletter_agent | INFO | User message: Classify the following headline(s): 
Baby Trapped in Refrigerator Eats Own Foot


input_str="AI Is Replacing Online Moderators, But It's Bad at the Job" output=True
13:01:18.779 OpenAI Agents trace: Agent workflow
13:01:18.780   Agent run: 'LLMagent'
13:01:18.780     Responses API with 'gpt-5-mini'
13:01:18.781       OpenAI-generation


13:01:21 | NewsletterAgent.newsletter_agent | INFO | Result: RunResult:
- Last agent: Agent(name="LLMagent", ...)
- Final output (ClassificationResult):
    {
      "input_str": "Baby Trapped in Refrigerator Eats Own Foot",
      "output": false
    }
- 2 new item(s)
- 1 raw response(s)
- 0 input guardrail result(s)
- 0 output guardrail result(s)
(See `RunResult` for more details)


input_str='Baby Trapped in Refrigerator Eats Own Foot' output=False


In [17]:
# send a single batch, asking for ClassificationResultList
# note different output type
classifier = LLMagent(
    system_prompt=system_prompt,
    user_prompt=user_prompt,
    output_type=ClassificationResultList,
    model=model,
    verbose=True,
    logger=logger
)

# Format headlines as a single string for batch processing
headlines_str = str(test_headlines)
result = await classifier.prompt_dict({'input_str': headlines_str})
print(f"Batch result: {result}")



13:01:21 | NewsletterAgent.newsletter_agent | INFO | Initialized LLMagent:
system_prompt: You are a content-classification assistant that labels news headlines as AI-related or not.
Return JSON that matches the provided schema

A headline is AI-related if it mentions (explicitly or implicitly):
- Core AI models: machine learning, neural / deep / transformer networks
- AI Applications: computer vision, NLP, robotics, autonomous driving, generative media
- AI hardware, GPU chip supply, AI data centers and infrastructure
- Companies or labs known for AI: OpenAI, DeepMind, Anthropic, xAI, NVIDIA, etc.
- AI models & products: GPT-5, Gemini, Claude, Midjourney, DeepSeek, etc.
- New AI products and AI integration into existing products/services
- AI policy / ethics / safety / regulation / analysis
- Research results related to AI
- AI industry figures (Sam Altman, Demis Hassabis, Dario Amodei, etc.)
- AI market and business developments, funding rounds, partnerships centered on AI
- Any other

13:01:21.966 OpenAI Agents trace: Agent workflow
13:01:21.968   Agent run: 'LLMagent'
13:01:21.969     Responses API with 'gpt-5-mini'
13:01:21.969       OpenAI-generation


13:01:28 | NewsletterAgent.newsletter_agent | INFO | Result: RunResult:
- Last agent: Agent(name="LLMagent", ...)
- Final output (ClassificationResultList):
    {
      "results_list": [
        {
          "input_str": "AI Is Replacing Online Moderators, But It's Bad at the Job",
          "output": true
        },
        {
          "input_str": "Baby Trapped in Refrigerator Eats Own Foot",
          "output": false
        },
        {
          "input_str": "Machine Learning Breakthrough in Medical Diagnosis",
          "output": true
        },
        {
          "input_str": "Local Restaurant Opens New Location",
          "output": false
        },
        {
          "input_str": "ChatGPT Usage Soars in Educational Settings",
          "output": true
        }
      ]
    }
- 2 new item(s)
- 1 raw response(s)
- 0 input guardrail result(s)
- 0 output guardrail result(s)
(See `RunResult` for more details)


Batch result: results_list=[ClassificationResult(input_str="AI Is Replacing Online Moderators, But It's Bad at the Job", output=True), ClassificationResult(input_str='Baby Trapped in Refrigerator Eats Own Foot', output=False), ClassificationResult(input_str='Machine Learning Breakthrough in Medical Diagnosis', output=True), ClassificationResult(input_str='Local Restaurant Opens New Location', output=False), ClassificationResult(input_str='ChatGPT Usage Soars in Educational Settings', output=True)]


In [18]:
# make batches and send multiple in parallel
headlines_df = pd.read_csv("test_headlines.csv")
headlines_df


Unnamed: 0.1,Unnamed: 0,id,src,title,url
0,71,0,Ars Technica,GitHub will be folded into Microsoft proper as...,https://arstechnica.com/gadgets/2025/08/github...
1,137,10,Ars Technica,"With new in-house models, Microsoft lays the g...",https://arstechnica.com/ai/2025/08/with-new-in...
2,46,16,Ars Technica,Google improves Gemini AI image editing with ‚Äú...,https://arstechnica.com/ai/2025/08/google-impr...
3,228,20,Ars Technica,Google warns that mass data theft hitting Sale...,https://arstechnica.com/security/2025/08/googl...
4,181,23,Bloomberg,AI Wants More Data. More Chips. More Real Esta...,https://www.bloomberg.com/news/features/2024-1...
...,...,...,...,...,...
245,73,758,NewsAPI,"Reframing Jensen‚Äôs Law: ‚ÄòBuy more, make more‚Äô ...",https://siliconangle.com/2025/08/30/reframing-...
246,176,759,NewsAPI,Zeta Global (ZETA) Target Raised by Goldman as...,https://finance.yahoo.com/news/zeta-global-zet...
247,8,763,NewsAPI,Luis Enrique names his squad to face Toulouse,https://onefootball.com/en/news/luis-enrique-n...
248,24,773,NewsAPI,CorelDRAW Graphics Suite 2025 v26.2.0.170,https://post.rlsbb.cc/coreldraw-graphics-suite...


In [39]:
headlines_df

Unnamed: 0.1,Unnamed: 0,id,src,title,url
0,71,0,Ars Technica,GitHub will be folded into Microsoft proper as...,https://arstechnica.com/gadgets/2025/08/github...
1,137,10,Ars Technica,"With new in-house models, Microsoft lays the g...",https://arstechnica.com/ai/2025/08/with-new-in...
2,46,16,Ars Technica,Google improves Gemini AI image editing with ‚Äú...,https://arstechnica.com/ai/2025/08/google-impr...
3,228,20,Ars Technica,Google warns that mass data theft hitting Sale...,https://arstechnica.com/security/2025/08/googl...
4,181,23,Bloomberg,AI Wants More Data. More Chips. More Real Esta...,https://www.bloomberg.com/news/features/2024-1...
...,...,...,...,...,...
245,73,758,NewsAPI,"Reframing Jensen‚Äôs Law: ‚ÄòBuy more, make more‚Äô ...",https://siliconangle.com/2025/08/30/reframing-...
246,176,759,NewsAPI,Zeta Global (ZETA) Target Raised by Goldman as...,https://finance.yahoo.com/news/zeta-global-zet...
247,8,763,NewsAPI,Luis Enrique names his squad to face Toulouse,https://onefootball.com/en/news/luis-enrique-n...
248,24,773,NewsAPI,CorelDRAW Graphics Suite 2025 v26.2.0.170,https://post.rlsbb.cc/coreldraw-graphics-suite...


In [55]:
#!/usr/bin/env python3
"""
General-purpose LLM calling module with flexible prompt templating and batch processing.

This module provides the LLMagent class for making structured LLM calls with:
- Flexible prompt templates with variable substitution
- Single and batch processing modes
- Retry logic with exponential backoff
- Pydantic output validation
- Async batch processing with concurrency control
"""

import asyncio
import json
import logging
from typing import Any, Dict, List, Type, Union, Optional
from pydantic import BaseModel
import pandas as pd

import openai
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential
)

from agents import Agent, Runner
from log_handler import sanitize_error_for_logging

_logger = logging.getLogger(__name__)

async def paginate_df_async(df: pd.DataFrame, chunk_size: int = 25):
    """Async generator for DataFrame pagination."""
    for i in range(0, len(df), chunk_size):
        yield df.iloc[i:i + chunk_size]
        await asyncio.sleep(0)  # Allow other tasks to run


class LLMagent(Agent):
    """
    General-purpose LLM agent for making structured calls with flexible prompt templating.

    Supports:
    - Multiple variable substitution in prompt templates
    - Single prompt calls with keyword arguments or dictionaries
    - Batch processing with async concurrency control
    - Retry logic with exponential backoff
    - Pydantic output validation
    """

    def __init__(self,
                 system_prompt: str,
                 user_prompt: str,
                 output_type: Type[BaseModel],
                 model: str,
                 verbose: bool = False,
                 logger: Optional[logging.Logger] = None):
        """
        Initialize the LLMagent

        Args:
            system_prompt: The system prompt template with variable placeholders (e.g., "You are a {role} assistant")
            user_prompt: The user prompt template with variable placeholders (e.g., "Analyze this {content_type}: {input}")
            output_type: Pydantic model class for structured output
            model: Model string (e.g., "gpt-4o")
            verbose: Enable verbose logging
            logger: Optional logger instance to use instead of module logger
        """
        super().__init__(
            name="LLMagent",
            model=model,
            instructions=system_prompt,
            output_type=output_type
        )
        self.system_prompt = system_prompt
        self.user_prompt = user_prompt
        self.output_type = output_type
        self.verbose = verbose
        self.logger = logger or _logger

        if self.verbose:
            self.logger.info(f"""Initialized LLMagent:
system_prompt: {self.system_prompt}
user_prompt: {self.user_prompt}
output_type: {output_type.__name__}
model: {self.model}
schema: {json.dumps(output_type.model_json_schema(), indent=2)}
""")

    def _format_prompts(self, variables: Dict[str, Any]) -> str:
        """
        Format user prompt with variable substitution

        Args:
            variables: Dictionary of variables to substitute in user prompt template

        Returns:
            Formatted user prompt
        """
        try:
            formatted_user = self.user_prompt.format(**variables)
            return formatted_user
        except KeyError as e:
            raise ValueError(f"Missing required variable in prompt template: {e}")
        except Exception as e:
            raise ValueError(f"Error formatting prompts: {e}")

    @retry(
        retry=retry_if_exception_type((
            openai.APIConnectionError,
            openai.APITimeoutError,
            openai.InternalServerError
        )),
        stop=stop_after_attempt(5),
        wait=wait_exponential(multiplier=1, min=1, max=30),
    )
    async def prompt_dict(self, variables: Dict[str, Any]) -> Any:
        """
        Make a single LLM call with dictionary-based variable substitution

        Args:
            variables: Dictionary of variables to substitute in prompt templates

        Returns:
            Single result of the specified output type
        """
        user_message = self._format_prompts(variables)

        if self.verbose:
            self.logger.info(f"User message: {user_message}")

        results = await Runner.run(self, user_message)

        if self.verbose:
            self.logger.info(f"Result: {results}")

        return results.final_output if hasattr(results, 'final_output') else results

    async def prompt_batch(self,
                          variables_list: List[Dict[str, Any]],
                          batch_size: int = 25,
                          max_concurrency: int = 16,
                          retries: int = 3,
                          item_list_field: str = 'results_list',
                          item_id_field: str = '') -> List[Any]:
        """
        Process a list of variable dictionaries using true batch calls.

        Note: This method assumes the prompt template expects a single 'input_str' parameter.
        All items from each batch will be converted to string and processed in a single API call
        per batch, dramatically reducing cost and improving performance.

        Args:
            variables_list: List of variable dictionaries for prompt substitution
            batch_size: Number of items to process in each batch
            max_concurrency: Maximum number of concurrent requests
            retries: Number of retry attempts for failed requests
            item_id_field: Optional ID field name for validation. If provided, validates that each sent ID matches a received ID

        Returns:
            List of results maintaining original input order
        """
        if not variables_list:
            return []

        # Split into batches
        batches = [variables_list[i:i+batch_size]
                  for i in range(0, len(variables_list), batch_size)]

        sem = asyncio.Semaphore(max_concurrency)
        self.logger.info(f"Processing {len(batches)} batches with concurrency {max_concurrency}")

        async def _process_batch(batch_idx: int, batch_variables: List[Dict[str, Any]]) -> tuple[int, List[Any]]:
            """Process a single batch with retry logic"""
            last_exc = None

            for attempt in range(retries):
                try:
                    async with sem:
                        # Process the entire batch in a single API call
                        result = await self.prompt_dict({'input_str': str(batch_variables)})
                        batch_results = result

                        # Validate IDs if item_id_field is specified
                        if item_id_field:
                            sent_ids = [var.get(item_id_field) for var in batch_variables]
                            received_ids = []

                            for result in batch_results:
                                if hasattr(result, item_id_field):
                                    received_ids.append(getattr(result, item_id_field))
                                elif isinstance(result, dict) and item_id_field in result:
                                    received_ids.append(result[item_id_field])
                                else:
                                    raise ValueError(f"Result missing required ID field '{item_id_field}': {result}")

                            # Check if all sent IDs have corresponding received IDs
                            sent_set = set(sent_ids)
                            received_set = set(received_ids)

                            if sent_set != received_set:
                                missing_ids = sent_set - received_set
                                extra_ids = received_set - sent_set
                                error_msg = f"ID mismatch in batch {batch_idx}:"
                                if missing_ids:
                                    error_msg += f" Missing IDs: {missing_ids}"
                                if extra_ids:
                                    error_msg += f" Extra IDs: {extra_ids}"
                                raise ValueError(error_msg)

                        return batch_idx, batch_results

                except Exception as e:
                    last_exc = e
                    self.logger.warning(f"Batch {batch_idx} attempt {attempt + 1}/{retries} failed: {e}")
                    if attempt < retries - 1:
                        await asyncio.sleep(2 ** attempt)  # Exponential backoff

            # If all retries failed, raise the last exception
            raise last_exc or RuntimeError(f"Unknown error processing batch {batch_idx}")

        # Create tasks for all batches
        tasks = [
            asyncio.create_task(_process_batch(i, batch))
            for i, batch in enumerate(batches)
        ]

        # Wait for all batches to complete
        batch_results = await asyncio.gather(*tasks)

        if item_list_field:
            # Reassemble results in original order
            flattened_results = []
            flattened_success = False
            for batch_idx, results in sorted(batch_results, key=lambda x: x[0]):
                if hasattr(results, item_list_field):
                    flattened_results.extend(getattr(results, item_list_field))
                else:
                    break
                flattened_success = True

            if flattened_success:
                # Validate final result count
                if len(flattened_results) != len(variables_list):
                    raise ValueError(
                        f"Result count mismatch: expected {len(variables_list)}, got {len(flattened_results)}"
                    )
                else:
                    return flattened_results
            else: # return unflattened results
                return batch_results
        else: # return unflattened results
            return batch_results


    async def filter_dataframe(self,
                               input_df: pd.DataFrame,
                               input_vars: Optional[Dict[str, Any]] = None,
                               item_list_field: str = 'results_list',
                               item_id_field: str = 'id',
                               retries: int = 3
                              ) -> Any:
        """
        Process a single DataFrame asynchronously using Agent SDK.

        Applies the configured system and user prompts to a DataFrame converted to JSON,
        with configurable delimiters and additional input variables.

        Note: This method expects the user_prompt template to contain an {input_text} placeholder
        where the DataFrame JSON will be substituted.

        Args:
            input_df: The DataFrame to process
            input_vars: Optional additional variables for prompt substitution
            item_list_field: Name of the field in the response that contains the list of results
            item_id_field: Name of the ID field to validate matches between sent and received data
            retries: Number of retry attempts for validation failures

        Returns:
            Single result of the configured output_type (structured Pydantic object)
        """
        expected_count = len(input_df)
        last_exc = None

        for attempt in range(retries):
            try:
                # Convert DataFrame to JSON
                input_text = input_df.to_json(orient='records', indent=2)

                # Prepare the input dictionary
                input_dict = {"input_text": input_text}
                # add input_vars if provided
                if input_vars is not None:
                    input_dict.update(input_vars)
                # Use existing prompt_dict method with retry logic
                result = await self.prompt_dict(input_dict)
                # Validate item count and IDs if item_list_field is specified
                if item_list_field:
                    if hasattr(result, item_list_field):
                        result_list = getattr(result, item_list_field)
                        if isinstance(result_list, list):
                            received_count = len(result_list)
                            if received_count != expected_count:
                                error_msg = f"Item count mismatch: expected {expected_count}, got {received_count}"
                                self.logger.warning(f"Attempt {attempt + 1}/{retries}: {error_msg}")
                                if attempt < retries - 1:
                                    await asyncio.sleep(2 ** attempt)  # Exponential backoff
                                    continue
                                else:
                                    raise ValueError(error_msg)

                            # Validate IDs if item_id_field is specified and exists in DataFrame
                            if item_id_field and item_id_field in input_df.columns:
                                sent_ids = input_df[item_id_field].tolist()
                                received_ids = []

                                for item in result_list:
                                    if hasattr(item, item_id_field):
                                        received_ids.append(getattr(item, item_id_field))
                                    elif isinstance(item, dict) and item_id_field in item:
                                        received_ids.append(item[item_id_field])
                                    else:
                                        error_msg = f"Result item missing required ID field '{item_id_field}': {item}"
                                        self.logger.warning(f"Attempt {attempt + 1}/{retries}: {error_msg}")
                                        if attempt < retries - 1:
                                            await asyncio.sleep(2 ** attempt)
                                            continue
                                        else:
                                            raise ValueError(error_msg)

                                # Check if all sent IDs have corresponding received IDs
                                sent_set = set(sent_ids)
                                received_set = set(received_ids)

                                if sent_set != received_set:
                                    missing_ids = sent_set - received_set
                                    extra_ids = received_set - sent_set
                                    error_msg = f"ID mismatch:"
                                    if missing_ids:
                                        error_msg += f" Missing IDs: {missing_ids}"
                                    if extra_ids:
                                        error_msg += f" Extra IDs: {extra_ids}"

                                    self.logger.warning(f"Attempt {attempt + 1}/{retries}: {error_msg}")
                                    if attempt < retries - 1:
                                        await asyncio.sleep(2 ** attempt)  # Exponential backoff
                                        continue
                                    else:
                                        raise ValueError(error_msg)

                        else:
                            raise ValueError(f"Field '{item_list_field}' is not a list: {type(result_list)}")
                    else:
                        raise ValueError(f"Result missing required field '{item_list_field}': {result}")

                return result

            except asyncio.TimeoutError as e:
                last_exc = e
                self.logger.error(f"Timeout error in filter_dataframe_async: {str(e)}")
                if attempt < retries - 1:
                    await asyncio.sleep(2 ** attempt)
                    continue
                raise
            except (ConnectionError, TimeoutError) as e:
                last_exc = e
                self.logger.error(f"Network/timeout error in filter_dataframe_async: {str(e)}")
                if attempt < retries - 1:
                    await asyncio.sleep(2 ** attempt)
                    continue
                raise
            except ValueError as e:
                last_exc = e
                self.logger.error(f"Invalid data in filter_dataframe_async: {str(e)}")
                if attempt < retries - 1:
                    await asyncio.sleep(2 ** attempt)
                    continue
                raise
            except Exception as e:
                last_exc = e
                self.logger.error(f"Unexpected error in filter_dataframe_async: {str(e)}")
                if attempt < retries - 1:
                    await asyncio.sleep(2 ** attempt)
                    continue
                raise

        # If we get here, all retries failed
        raise last_exc or RuntimeError(f"Unknown error after {retries} attempts")

    async def filter_dataframe_batch(self,
                                   input_df: pd.DataFrame,
                                   input_vars: Optional[Dict[str, Any]] = None,
                                   item_list_field: str = 'results_list',
                                   item_id_field: str = 'id',
                                   retries: int = 3,
                                   chunk_size: int = 25
                                   ) -> Any:
        """
        Process a DataFrame in chunks asynchronously using concurrent calls to filter_dataframe.

        Chunks the input DataFrame using paginate_df_async and processes each chunk
        simultaneously with filter_dataframe. If item_list_field is specified and valid,
        concatenates all result lists into a single object. Otherwise returns a list of results.

        Args:
            input_df: The DataFrame to process
            input_vars: Optional additional variables for prompt substitution
            item_list_field: Name of the field in the response that contains the list of results
            item_id_field: Name of the ID field to validate matches between sent and received data
            retries: Number of retry attempts for validation failures per chunk
            chunk_size: Number of rows per chunk (default: 25)

        Returns:
            Single concatenated result object (if item_list_field specified) or list of results
        """
        if input_df.empty:
            return []

        # Create chunks using the async generator
        chunks = []
        async for chunk in paginate_df_async(input_df, chunk_size):
            chunks.append(chunk)

        if not chunks:
            return []

        # Process all chunks concurrently
        tasks = [
            self.filter_dataframe(
                chunk,
                input_vars=input_vars,
                item_list_field=item_list_field,
                item_id_field=item_id_field,
                retries=retries
            )
            for chunk in chunks
        ]

        try:
            results = await asyncio.gather(*tasks)
        except Exception as e:
            self.logger.error(f"Error in filter_dataframe_batch: {e}")
            raise

        # If item_list_field is specified, concatenate all result lists
        if item_list_field:
            try:
                # Validate that all results have the expected field
                all_items = []
                for result in results:
                    if hasattr(result, item_list_field):
                        result_list = getattr(result, item_list_field)
                        if isinstance(result_list, list):
                            all_items.extend(result_list)
                        else:
                            self.logger.error(f"Field '{item_list_field}' is not a list: {type(result_list)}")
                            return results  # Fall back to returning raw results
                    else:
                        self.logger.error(f"Result missing field '{item_list_field}': {result}")
                        return results  # Fall back to returning raw results

                # Create a new result object with concatenated items
                # Use the structure of the first result as template
                if results and hasattr(results[0], item_list_field):
                    # Create a copy of the first result and replace the list field
                    concatenated_result = results[0].__class__(**{
                        **{k: v for k, v in results[0].__dict__.items() if k != item_list_field},
                        item_list_field: all_items
                    })
                    return concatenated_result
                else:
                    # If we can't create proper structure, return the items directly
                    return all_items

            except Exception as e:
                self.logger.error(f"Error concatenating results: {e}")
                return results  # Fall back to returning raw results
        else:
            # No item_list_field specified, return list of results
            return results


In [35]:
logger.info("classify headlines as AI-related or not")

classifier = LLMagent(
    system_prompt=system_prompt,
    user_prompt=user_prompt,
    output_type=ClassificationResultList,  
    model='gpt-4o-mini',  # Use a valid model
    verbose=False,
    logger=logger
)

# Prepare variables for batch processing
headlines_list = headlines_df['title'].to_list()
variables_list = [{'input_str': headline} for headline in headlines_list]

# Use prompt_batch for efficient processing
classification_result = await classifier.prompt_batch(
    variables_list,
    batch_size=25,
    max_concurrency=16,
    retries=3
)

logger.info(f"Classified {len(classification_result)} headlines")
classification_result

13:15:43 | NewsletterAgent.newsletter_agent | INFO | classify headlines as AI-related or not
13:15:43 | NewsletterAgent.newsletter_agent | INFO | Processing 10 batches with concurrency 16


13:15:43.090 OpenAI Agents trace: Agent workflow
13:15:43.093   Agent run: 'LLMagent'
13:15:43.094 OpenAI Agents trace: Agent workflow
13:15:43.094   Agent run: 'LLMagent'
13:15:43.095 OpenAI Agents trace: Agent workflow
13:15:43.095   Agent run: 'LLMagent'
13:15:43.096 OpenAI Agents trace: Agent workflow
13:15:43.096   Agent run: 'LLMagent'
13:15:43.096 OpenAI Agents trace: Agent workflow
13:15:43.097   Agent run: 'LLMagent'
13:15:43.097 OpenAI Agents trace: Agent workflow
13:15:43.098   Agent run: 'LLMagent'
13:15:43.098 OpenAI Agents trace: Agent workflow
13:15:43.099   Agent run: 'LLMagent'
13:15:43.099 OpenAI Agents trace: Agent workflow
13:15:43.099   Agent run: 'LLMagent'
13:15:43.100 OpenAI Agents trace: Agent workflow
13:15:43.100   Agent run: 'LLMagent'
13:15:43.101 OpenAI Agents trace: Agent workflow
13:15:43.101   Agent run: 'LLMagent'
             OpenAI Agents trace: Agent workflow
               Agent run: 'LLMagent'
13:15:43.108     Responses API with 'gpt-4o-mini'
13:1

13:16:07 | NewsletterAgent.newsletter_agent | INFO | Classified 250 headlines


[ClassificationResult(input_str='GitHub will be folded into Microsoft proper as CEO steps down', output=False),
 ClassificationResult(input_str='With new in-house models, Microsoft lays the groundwork for independence from OpenAI', output=True),
 ClassificationResult(input_str='Google improves Gemini AI image editing with ‚Äúnano banana‚Äù model', output=True),
 ClassificationResult(input_str='Google warns that mass data theft hitting Salesloft AI agent has grown bigger', output=True),
 ClassificationResult(input_str='AI Wants More Data. More Chips. More Real Estate. More Power. More Water. More Everything', output=True),
 ClassificationResult(input_str='Opinion: China Just Got a Big Leg Up in the AI Race', output=True),
 ClassificationResult(input_str='Jack Ma-Backed Ant‚Äôs Profit Dives 60% After AI, Global Expansion', output=True),
 ClassificationResult(input_str='Dell Falls After Reporting Tighter Profit Margins on Servers', output=False),
 ClassificationResult(input_str='AI Billio

In [38]:
# see results, true and false
zdf = pd.DataFrame([(z.input_str, z.output) for z in classification_result], columns=["input", "output"])
display(zdf.loc[zdf["output"]])
zdf.loc[~zdf["output"]]


Unnamed: 0,input,output
1,"With new in-house models, Microsoft lays the g...",True
2,Google improves Gemini AI image editing with ‚Äú...,True
3,Google warns that mass data theft hitting Sale...,True
4,AI Wants More Data. More Chips. More Real Esta...,True
5,Opinion: China Just Got a Big Leg Up in the AI...,True
...,...,...
241,How we tested AI search toolsDetails on the me...,True
242,AI stethoscope could detect major heart condit...,True
243,Robinhood CEO Says AI Will Make Investing 'Muc...,True
244,Trade Vector AI: How Trade Vector Artificial I...,True


Unnamed: 0,input,output
0,GitHub will be folded into Microsoft proper as...,False
7,Dell Falls After Reporting Tighter Profit Marg...,False
10,Vercel Triples Valuation to $9 Billion With Ac...,False
11,Bain Is Said to Draw Chinese Bidders for $4 Bi...,False
14,The best college laptops of 2025: Top models f...,False
...,...,...
239,Why so few Americans read for pleasureThe decl...,False
246,Zeta Global (ZETA) Target Raised by Goldman as...,False
247,Luis Enrique names his squad to face Toulouse,False
248,CorelDRAW Graphics Suite 2025 v26.2.0.170,False


In [None]:
# zdf.to_csv('headline_classifier_ground_truth.csv', index=False)

In [43]:
test_df = headlines_df[:5][["id", "title"]].copy()
test_df.to_json(orient='records')



'[{"id":0,"title":"GitHub will be folded into Microsoft proper as CEO steps down"},{"id":10,"title":"With new in-house models, Microsoft lays the groundwork for independence from OpenAI"},{"id":16,"title":"Google improves Gemini AI image editing with \\u201cnano banana\\u201d model"},{"id":20,"title":"Google warns that mass data theft hitting Salesloft AI agent has grown bigger"},{"id":23,"title":"AI Wants More Data. More Chips. More Real Estate. More Power. More Water. More Everything"}]'

In [48]:
test_df 


Unnamed: 0,id,title
0,0,GitHub will be folded into Microsoft proper as...
1,10,"With new in-house models, Microsoft lays the g..."
2,16,Google improves Gemini AI image editing with ‚Äú...
3,20,Google warns that mass data theft hitting Sale...
4,23,AI Wants More Data. More Chips. More Real Esta...


In [None]:
[ClassificationResult(input_str='OpenAI launches new API for developers', output=True), 
 ClassificationResult(input_str='Stock market hits all-time high', output=False), 
 ClassificationResult(input_str='NVIDIA announces new GPU for AI applications', output=True), 
 ClassificationResult(input_str='Health benefits of walking daily', output=False), 
 ClassificationResult(input_str='Google patents a new machine learning model', output=True), 
 ClassificationResult(input_str='Apple releases the latest iPhone model', output=False), 
 ClassificationResult(input_str='AI ethics in research: A growing concern', output=True), 
 ClassificationResult(input_str='Local sports team wins championship', output=False), 
 ClassificationResult(input_str='DeepMind achieves breakthrough in protein folding', output=True), 
 ClassificationResult(input_str='New smartphone features include high-definition camera', output=False), 
 ClassificationResult(input_str='Claude AI introduces advanced text generation capabilities', output=True), 
 ClassificationResult(input_str='The rise of electric vehicles in the automotive market', output=False), 
 ClassificationResult(input_str='Generative art project utilizes AI tools', output=True), 
 ClassificationResult(input_str='Latest trends in cloud computing', output=False), 
 ClassificationResult(input_str='AI-driven analysis shows climate impact predictions', output=True), 
 ClassificationResult(input_str='Annual tech conference highlights latest innovations', output=False)]


In [56]:
FILTER_SYSTEM_PROMPT = """
You are a content-classification assistant that labels news headlines as AI-related or not.
You will receive a list of JSON object with fields "id" and "title"
Return **only** a JSON object that satisfies the provided schema.
For each headline provided, you MUST return one element with the same id, and a boolean value; do not skip any items.
No markdown, no markdown fences, no extra keys, no comments.
"""

FILTER_USER_PROMPT = """
Classify every headline below.

AI-related if the title mentions (explicitly or implicitly):
- Core AI technologies: machine learning, neural / deep / transformer networks
- AI Applications: computer vision, NLP, robotics, autonomous driving, generative media
- AI hardware, GPU chip supply, AI data centers and infrastructure
- Companies or labs known for AI: OpenAI, DeepMind, Anthropic, xAI, NVIDIA, etc.
- AI models & products: ChatGPT, Gemini, Claude, Sora, Midjourney, DeepSeek, etc.
- New AI products and AI integration into existing products/services
- AI policy / ethics / safety / regulation / analysis
- Research results related to AI
- AI industry figures (Sam Altman, Demis Hassabis, etc.)
- AI market and business developments, funding rounds, partnerships centered on AI
- Any other news with a significant AI component

Non-AI examples: crypto, ordinary software, non-AI gadgets and medical devices, and anything else.
Input:
{input_text}
"""

# output class for classifying headlines
class ClassificationResultId(BaseModel):
    """A single headline classification result"""
    id: int = Field("The news item id")
    input_str: str = Field(description="The original headline title")
    output: bool = Field(description="Whether the headline title is AI-related")

class ClassificationResultIdList(BaseModel):
    """List of ClassificationResult for batch processing"""
    results_list: list[ClassificationResultId] = Field(description="List of classification results")


classifier = LLMagent(
    system_prompt=FILTER_SYSTEM_PROMPT,
    user_prompt=FILTER_USER_PROMPT,
    output_type=ClassificationResultIdList,  
    model='gpt-4o-mini',  # Use a valid model
    verbose=False,
    logger=logger
)

z = await classifier.filter_dataframe(test_df)
z 



13:42:19.552 OpenAI Agents trace: Agent workflow
13:42:19.557   Agent run: 'LLMagent'
13:42:19.560     Responses API with 'gpt-4o-mini'
13:42:19.560       OpenAI-generation


ClassificationResultIdList(results_list=[ClassificationResultId(id=0, input_str='GitHub will be folded into Microsoft proper as CEO steps down', output=False), ClassificationResultId(id=10, input_str='With new in-house models, Microsoft lays the groundwork for independence from OpenAI', output=True), ClassificationResultId(id=16, input_str='Google improves Gemini AI image editing with ‚Äúnano banana‚Äù model', output=True), ClassificationResultId(id=20, input_str='Google warns that mass data theft hitting Salesloft AI agent has grown bigger', output=True), ClassificationResultId(id=23, input_str='AI Wants More Data. More Chips. More Real Estate. More Power. More Water. More Everything', output=True)])

In [57]:
z = await classifier.filter_dataframe_batch(headlines_df[["id", "title"]])
z


13:42:34.570 OpenAI Agents trace: Agent workflow
13:42:34.572   Agent run: 'LLMagent'
13:42:34.573 OpenAI Agents trace: Agent workflow
13:42:34.574   Agent run: 'LLMagent'
13:42:34.574 OpenAI Agents trace: Agent workflow
13:42:34.575   Agent run: 'LLMagent'
13:42:34.575 OpenAI Agents trace: Agent workflow
13:42:34.575   Agent run: 'LLMagent'
13:42:34.576 OpenAI Agents trace: Agent workflow
13:42:34.580   Agent run: 'LLMagent'
13:42:34.580 OpenAI Agents trace: Agent workflow
13:42:34.581   Agent run: 'LLMagent'
13:42:34.581 OpenAI Agents trace: Agent workflow
13:42:34.582   Agent run: 'LLMagent'
13:42:34.583 OpenAI Agents trace: Agent workflow
13:42:34.583   Agent run: 'LLMagent'
13:42:34.583 OpenAI Agents trace: Agent workflow
13:42:34.584   Agent run: 'LLMagent'
13:42:34.584 OpenAI Agents trace: Agent workflow
13:42:34.585   Agent run: 'LLMagent'
             OpenAI Agents trace: Agent workflow
               Agent run: 'LLMagent'
13:42:34.628     Responses API with 'gpt-4o-mini'
13:4



In [None]:
correct_df = pd.read_csv("headline_classifier_ground_truth.csv")
correct_df


In [None]:
# test various models for speed, accuracy, cost
# see costs under tracing
# http://localhost:3000/
# 'gpt-5-mini' 9.5cents, 'gpt-4.1' 16.8cents, 'gpt-4.1-mini'  3.2cents
models = ['gpt-5-mini', 'gpt-4.1', 'gpt-4.1-mini',]
result_tuples = []

for m in models:
    print(f"Starting evaluation for {m}...")
    
    # Start timing
    start_time = time.time()
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S%f")
    
    with lf_client.start_as_current_span(name=f"batch_classification_{m}_{timestamp}") as span:
    
        classifier = ClassifierAgent(system_prompt,
                                     user_prompt,
                                     ClassificationResultList,
                                     m,
                                     verbose=False )
    
        # Run classification with tracing
        classification_result = await classifier.classify_batch(
            list(headlines_df['title'].to_list())
        )
        
        # Add span metadata
        span.update(
            input={"headlines_count": len(headlines_df)},
            metadata={"model": m}
        )
        
    # Calculate elapsed time
    end_time = time.time()
    elapsed_time = end_time - start_time
    
    # analyze results
    result_df = pd.DataFrame([(z.input_str, z.output) 
                              for z in classification_result.results_list], 
                             columns=["input", "output"])
    
    
    # Merge with ground truth to compare results
    comparison_df = result_df.merge(correct_df, on='input', suffixes=('_predicted', '_correct'))
    
    # Calculate accuracy
    comparison_df['is_correct'] = comparison_df['output_predicted'] == comparison_df['output_correct']
    accuracy = comparison_df['is_correct'].mean()
    correct_count = comparison_df['is_correct'].sum()
    total_count = len(comparison_df)
    
    # Find differences
    differences_df = comparison_df[~comparison_df['is_correct']].copy()
    
    print(f"Completed {m} in {elapsed_time:.2f} seconds")
    print(f"Accuracy: {correct_count}/{total_count} = {accuracy:.3f} ({accuracy*100:.1f}%)")

    if len(differences_df) > 0:
        print(f"Found {len(differences_df)} incorrect predictions:")
        print("-" * 80)
        for idx, row in differences_df.iterrows():
            print(f"Input: {row['input']}")
            print(f"Predicted: {row['output_predicted']}")
            print(f"Correct:   {row['output_correct']}")
            print("-" * 40)
    else:
        print("üéâ Perfect accuracy! No incorrect predictions.")
    
    print()  # Empty line for readability
    
    # Create tuple with (model_name, df, elapsed_time, accuracy, differences_df)
    result_tuples.append((m, result_df, elapsed_time, accuracy, differences_df))

# Summary comparison
print("=" * 60)
print("SUMMARY COMPARISON")
print("=" * 60)
summary_data = []
for model_name, df, elapsed_time, accuracy, differences_df in result_tuples:
    rate = len(df) / elapsed_time if elapsed_time > 0 else 0
    summary_data.append({
        'Model': model_name,
        'Accuracy': f"{accuracy:.3f}",
        'Accuracy %': f"{accuracy*100:.1f}%", 
        'Correct': f"{int(accuracy * len(df))}/{len(df)}",
        'Time (s)': f"{elapsed_time:.2f}",
        'Rate (pred/s)': f"{rate:.1f}",
        'Errors': len(differences_df)
    })

summary_df = pd.DataFrame(summary_data)
print(summary_df.to_string(index=False))

# Find most common errors across all models
print("\n" + "=" * 60)
print("MOST COMMON ERRORS ACROSS ALL MODELS")
print("=" * 60)
all_errors = []
for model_name, _, _, _, differences_df in result_tuples:
    for _, row in differences_df.iterrows():
        all_errors.append({
            'input': row['input'],
            'predicted': row['output_predicted'],
            'correct': row['output_correct'],
            'model': model_name
        })

if all_errors:
    error_df = pd.DataFrame(all_errors)
    error_counts = error_df.groupby('input').size().sort_values(ascending=False)
    
    print("Headlines that multiple models got wrong:")
    for headline, count in error_counts.head(10).items():
        if count > 1:  # Only show errors made by multiple models
            models_wrong = error_df[error_df['input'] == headline]['model'].tolist()
            predicted_values = error_df[error_df['input'] == headline]['predicted'].unique()
            correct_value = error_df[error_df['input'] == headline]['correct'].iloc[0]
            
            print(f"\n‚ùå Error in {count}/{len(models)} models: {', '.join(models_wrong)}")
            print(f"   Headline: {headline}")
            print(f"   Predicted: {predicted_values}")
            print(f"   Correct: {correct_value}")

lf_client.flush()



In [None]:
# Now you can access results like:
for model_name, df, elapsed_time in result_tuples:
    print(f"{model_name}: {len(df)} results in {elapsed_time:.2f}s")

# Run Agent Worfklow

In [None]:
# class to store agent state from step to step
from newsletter_state import NewsletterAgentState


In [None]:
import sys 

# if 'fetch' in sys.modules:
#     del sys.modules['fetch']
#     # Delete the reference
#     del Fetcher
from fetch import Fetcher

# should probably do this in the initialization based on parameters --nofetch
destination = "download/sources/"
for file in os.listdir(destination):
    file_path = os.path.join(destination, file)
    if os.path.isfile(file_path):
        os.remove(file_path)
        logger.info(f"Removed existing file: {file_path}")

In [None]:
async with Fetcher() as f:
     z = await f.fetch_all()
z 


In [None]:
len(z)

In [None]:
for src in z:
    print(src['source'])
    print(src['status'])
    print(src['metadata'])
    print(len(src['results']))
          

In [None]:
z[0]

In [None]:
sources_results = z
successful_sources = []
failed_sources = []
all_articles = []

for result in sources_results:
    if result['status'] == 'success' and result['results']:
        # Add source info to each article
        successful_sources.append(result['source'])
        all_articles.extend(result['results'])
    else:
        failed_sources.append(result['source'])
        
headline_data = all_articles

In [None]:
# TODO
len(all_articles)
headline_df = pd.DataFrame(all_articles)
display(headline_df[["source", "url"]].groupby("source") \
    .count() \
    .reset_index() \
    .rename({'url': 'count'}))


In [None]:
f.sources.get('Ars Technica')


In [None]:
from news_agent import WorkflowStatusTool


In [None]:
news_agent = NewsletterAgent(session_id=f"newsletter_{random.randint(10000000, 99999999)}", verbose=True)


In [None]:
user_prompt = "Run all the workflow steps in order and create the newsletter"

start_time = time.time()
result = await news_agent.run_step(user_prompt)
duration = time.time() - start_time

print("=" * 80)
print(f"‚è±Ô∏è  Total execution time: {duration:.2f}s")
print(f"üìä Final result:")
print(result)



In [None]:
# Create mock context
class MockContext:
    def __init__(self):
        self.context = news_agent.default_state

ctx = MockContext()
current_state = ctx.context  # From your previous run, or reload it
df = current_state.headline_df
df



In [None]:
try:
    current_state = news_agent.session.get_state()
except:
    current_state = news_agent.default_state

print(current_state)
print()

print(f"Current Step: {current_state.current_step}/9")
print(f"Workflow Complete: {current_state.workflow_complete}")
print(f"Progress: {(current_state.current_step/9)*100:.1f}%")
print(f"Total articles: {len(current_state.headline_data)}")

if current_state.headline_data:
    ai_related = sum(1 for a in current_state.headline_data if a.get('ai_related') is True)
    print(f"AI-related articles: {ai_related}")
    print(f"Summaries: {len(current_state.article_summaries)}")
    print(f"Clusters: {len(current_state.topic_clusters)}")
    print(f"Sections: {len(current_state.newsletter_sections)}")

In [None]:
# review slides

# review workflow status, move to a moadule
# all prints should be logs
# section writing and composition will have the critic /optimizer loop
# add batch with async


In [None]:
def create_news_dataframe():
    """
    Creates an empty DataFrame to support headline/article analysis
    - URLs, source tracking and metadata
    - Topic classification and clustering
    - Content quality ratings and rankings

    Returns:
        pd.DataFrame: Empty DataFrame with predefined column structure
    """

    # column structure
    column_dict = {
        # Core identifiers and source info
        'article_id': 'object',              # Unique identifier for each article
        'source':     'object',              # Source category
        'headline_title': 'object',          # Article headline/title
        'original_url': 'object',            # Initial URL before redirects
        'final_url': 'object',               # URL after following redirects
        'domain_name': 'category',           # Website domain
        'site_name': 'category',             # Human-readable site name
        'site_reputation_score': 'float32',  # Reputation/trustworthiness score for the site
        'keep_flag': 'boolean',

        # File paths and storage
        'html_file_path': 'object',          # Path to stored HTML content
        'text_file_path': 'object',          # Path to extracted text content

        # Time information
        'last_updated_timestamp': 'datetime64[ns]',  # When article was last updated
        'article_age_days': 'int32',         # Age of article in days
        'recency_score': 'float32',          # Calculated recency score (higher = more recent)

        # Content analysis
        'content_summary': 'object',         # Generated summary of article content
        'bullet_points': 'object',           # Key points extracted as bullets
        'article_length_chars': 'int32',     # Character count of article content

        # Rating flags (LLM-generated probabilities)
        'is_high_quality': 'float32',        # LLM probability for low-quality content
        'is_off_topic': 'float32',           # LLM probability for off-topic content
        'is_low_importance': 'float32',      # 1-LLM probability for high-importance content

        # Other ratings
        'bradley_terry_score': 'float32',    # Bradley-Terry rating from pairwise article comparisons
        'bradley_terry_rank': 'int32',       # Ordinal rank based on Bradley-Terry scores (1 = highest rated)
        'adjusted_length_score': 'float32',  # Length-adjusted quality score
        'final_composite_rating': 'float32', # Final weighted rating combining multiple factors

        # Topic classification
        'topic_string': 'object',            # Topic labels as comma-separated string
        'topic_list': 'object',              # Topic labels as list/array structure (same topics, different format)

        # Organization and clustering (HDBSCAN-based)
        'display_order': 'int32',            # Order for display/presentation
        'cluster_id': 'int32',               # HDBSCAN cluster identifier (-1 = noise/outlier)
        'cluster_label': 'category'          # Human-readable cluster name/description
    }

    # Create empty DataFrame from column dictionary
    df = pd.DataFrame(columns=list(column_dict.keys())).astype(column_dict)

    return df



In [None]:
@dataclass
class NewsletterState:
    """
    Maintains session state for the OpenAI Agents SDK workflow.

    Attributes:
        headline_df: DataFrame containing headline data for processing
        sources_file: Path to YAML file containing source configurations
        sources: Dictionary of source configurations loaded from YAML
        cluster_topics: List of clean topic names for headline categorization
        max_edits: Maximum number of critic optimizer editing iterations allowed
        edit_complete: Boolean flag indicating if editing process is finished
        n_browsers: Number of concurrent Playwright browser instances for downloads
    """

    status: WorkflowStatus = WorkflowStatus()
    headline_df: pd.DataFrame = field(default_factory=create_news_dataframe)
    sources_file: str = field(default="sources.yaml")
    sources: Dict[str, Any] = field(default_factory=dict)
    cluster_topics: List[str] = field(default_factory=list)
    max_edits: int = field(default=3)
    edit_complete: bool = field(default=False)
    n_browsers: int = field(default=8)
    verbose: bool = field(default=True)


    def __post_init__(self):
        """
        Post-initialization validation and setup.

        Validates that the configuration makes sense and performs
        any necessary initialization steps.
        """
        # Validate max_edits is reasonable
        if self.max_edits < 1 or self.max_edits > 10:
            raise ValueError(f"max_edits should be between 1-10, got {self.max_edits}")

        # Validate n_browsers is reasonable
        if self.n_browsers < 1 or self.n_browsers > 32:
            raise ValueError(f"n_browsers should be between 1-32, got {self.n_browsers}")

        # Validate sources_file exists and load sources from file automatically
        try:
            sources_path = Path(self.sources_file)
            with open(sources_path, 'r', encoding='utf-8') as file:
                self.sources = yaml.safe_load(file) or {}
            if self.verbose:
                print(f"Loaded {len(self.sources)} sources from {self.sources_file}")
        except FileNotFoundError:
            raise FileNotFoundError(f"Sources file not found: {self.sources_file}")
        except yaml.YAMLError as e:
            raise ValueError(f"Error parsing YAML file {self.sources_file}: {e}")


In [None]:
state = NewsletterState()
state


In [None]:
from agents import Agent, Runner, SQLiteSession, function_tool, RunContextWrapper


In [None]:
class NewsletterAgent(Agent[NewsletterState]):
    """AI newsletter writing agent with structured workflow"""

    def __init__(self, session_id: str = "newsletter_agent"):
        self.session = SQLiteSession(session_id, "newsletter.db")
        self.state = NewsletterState()

        super().__init__(
            name="AINewsletterAgent",
            instructions="""
            You are an AI newsletter writing agent. Your role is to:
            1. Scrape headlines and URLs from various sources
            2. Filter the headlines to ones that are about AI
            3. Fetch the URLs and save them as plain text
            4. Summarize each article to 3 bullet points containing the key facts
            5. Extract topics from each article and cluster articles by topic
            6. Rate each article according to the provided rubric
            7. Identify 6-15 thematic sections + "Other News", assign articles to sections and deduplicate
            8. Write each section
            9. Combine sections and polish

            Use the tools available to accomplish these tasks in order.
            Always maintain context about workflow progress and data.
            Guide users through the workflow steps systematically.
            """,
            tools=[
                self.step1_scrape_headlines,
                self.step2_filter_ai_headlines,
                self.step3_fetch_article_texts,
                self.step4_summarize_articles,
                self.step5_extract_and_cluster_topics,
                self.step6_rate_articles,
                self.step7_organize_sections,
                self.step8_write_sections,
                self.step9_finalize_newsletter,
                self.get_workflow_status,
                self.run_complete_workflow,
                self.reset_workflow
            ]
        )

    @function_tool
    async def step1_scrape_headlines(
        self,
        wrapper: RunContextWrapper[NewsletterState],
        sources: List[str] = None,
        max_articles_per_source: int = 50
    ) -> str:
        """Step 1: Scrape headlines and URLs from various sources"""
        if sources is None:
            sources = ["techcrunch", "arstechnica", "theverge", "wired", "venturebeat"]

        scraped_data = []

        # Mock scraping implementation (replace with real RSS/API scraping)
        for source in sources:
            for i in range(max_articles_per_source):
                article = {
                    'title': f"{source} AI Article {i+1}: Latest developments in machine learning",
                    'url': f"https://{source}.com/ai-article-{i+1}",
                    'source': source,
                    'published_at': (datetime.now() - timedelta(hours=i)).isoformat(),
                    'description': f"AI-related content from {source}"
                }
                scraped_data.append(article)

        wrapper.context.raw_headlines = scraped_data
        wrapper.context.scraped_urls = [article['url'] for article in scraped_data]
        wrapper.context.current_step = 1

        return f"‚úÖ Step 1 Complete: Scraped {len(scraped_data)} headlines from {len(sources)} sources"


    @function_tool
    async def step2_filter_ai_content(
        self,
        wrapper: RunContextWrapper[NewsletterState],
        ai_keywords: List[str] = None
    ) -> str:
        """Step 2: Filter headlines to AI-related content only"""
        if not wrapper.context.raw_headlines:
            return "‚ùå No headlines to filter. Run step 1 first."

        if ai_keywords is None:
            ai_keywords = [
                'ai', 'artificial intelligence', 'machine learning', 'deep learning',
                'neural network', 'llm', 'gpt', 'transformer', 'chatbot', 'automation',
                'computer vision', 'nlp', 'natural language', 'algorithm', 'model'
            ]

        ai_articles = []
        for article in wrapper.context.raw_headlines:
            title_lower = article['title'].lower()
            desc_lower = article['description'].lower()

            # Check if any AI keywords are present
            if any(keyword in title_lower or keyword in desc_lower for keyword in ai_keywords):
                ai_articles.append(article)

        wrapper.context.ai_headlines = pd.DataFrame(ai_articles)
        wrapper.context.current_step = 2

        return f"‚úÖ Step 2 Complete: Filtered to {len(ai_articles)} AI-related headlines from {len(wrapper.context.raw_headlines)} total"

    @function_tool
    async def step3_fetch_article_texts(
        self,
        wrapper: RunContextWrapper[NewsletterState]
    ) -> str:
        """Step 3: Fetch full article texts from URLs"""
        if wrapper.context.ai_headlines.empty:
            return "‚ùå No AI headlines to fetch. Complete steps 1-2 first."

        # Mock article fetching (replace with actual web scraping)
        article_texts = {}

        for _, row in wrapper.context.ai_headlines.iterrows():
            url = row['url']
            # Mock article content
            article_texts[url] = f"""
            {row['title']}

            This is a mock article about AI developments. In a real implementation,
            you would use libraries like requests + BeautifulSoup or newspaper3k
            to extract the full article text from the URL.

            Key points about this AI story:
            - Advancement in machine learning techniques
            - Impact on industry applications
            - Future implications for AI development

            This content would be much longer in practice, containing the full
            article text that needs to be summarized and analyzed.
            """

        wrapper.context.article_texts = article_texts
        wrapper.context.current_step = 3

        return f"‚úÖ Step 3 Complete: Fetched full text for {len(article_texts)} articles"

    @function_tool
    async def step4_summarize_articles(
        self,
        wrapper: RunContextWrapper[NewsletterState]
    ) -> str:
        """Step 4: Summarize each article to 3 key bullet points"""
        if not wrapper.context.article_texts:
            return "‚ùå No article texts to summarize. Complete steps 1-3 first."

        summaries = {}

        for url, text in wrapper.context.article_texts.items():
            # Mock summarization (replace with actual LLM summarization)
            summaries[url] = [
                "‚Ä¢ Key development in AI technology or research",
                "‚Ä¢ Practical implications for businesses or developers",
                "‚Ä¢ Future outlook or next steps in this area"
            ]

        wrapper.context.article_summaries = summaries
        wrapper.context.current_step = 4

        return f"‚úÖ Step 4 Complete: Generated 3-point summaries for {len(summaries)} articles"

    @function_tool
    async def step5_extract_and_cluster_topics(
        self,
        wrapper: RunContextWrapper[NewsletterState],
        max_clusters: int = 8
    ) -> str:
        """Step 5: Extract topics and cluster articles"""
        if not wrapper.context.article_texts:
            return "‚ùå No articles to analyze. Complete steps 1-4 first."

        # Extract topics from each article (mock implementation)
        article_topics = {}
        all_topics = []

        for url, text in wrapper.context.article_texts.items():
            # Mock topic extraction (replace with NLP)
            topics = ['machine learning', 'business applications', 'research', 'ethics']
            article_topics[url] = topics
            all_topics.extend(topics)

        # Cluster articles by common topics
        topic_counts = Counter(all_topics)
        main_topics = [topic for topic, count in topic_counts.most_common(max_clusters)]

        topic_clusters = {}
        for topic in main_topics:
            topic_clusters[topic] = [
                url for url, topics in article_topics.items()
                if topic in topics
            ]

        wrapper.context.article_topics = article_topics
        wrapper.context.topic_clusters = topic_clusters
        wrapper.context.current_step = 5

        return f"‚úÖ Step 5 Complete: Extracted topics and created {len(topic_clusters)} clusters"

    @function_tool
    async def step6_rate_articles(
        self,
        wrapper: RunContextWrapper[NewsletterState],
        custom_rubric: Dict[str, str] = None
    ) -> str:
        """Step 6: Rate articles according to rubric"""
        if not wrapper.context.article_texts:
            return "‚ùå No articles to rate. Complete previous steps first."

        if custom_rubric:
            wrapper.context.rating_rubric.update(custom_rubric)

        # Mock rating (replace with actual evaluation)
        ratings = {}
        for url in wrapper.context.article_texts.keys():
            # Mock scoring based on rubric criteria
            relevance_score = 0.8
            novelty_score = 0.7
            impact_score = 0.9
            credibility_score = 0.8

            overall_rating = (relevance_score + novelty_score + impact_score + credibility_score) / 4
            ratings[url] = overall_rating

        wrapper.context.article_ratings = ratings
        wrapper.context.current_step = 6

        avg_rating = sum(ratings.values()) / len(ratings)
        return f"‚úÖ Step 6 Complete: Rated {len(ratings)} articles. Average rating: {avg_rating:.2f}"

    @function_tool
    async def step7_organize_sections(
        self,
        wrapper: RunContextWrapper[NewsletterState],
        target_sections: int = 10
    ) -> str:
        """Step 7: Organize articles into thematic sections"""
        if not wrapper.context.topic_clusters:
            return "‚ùå No topic clusters available. Complete steps 1-6 first."

        # Create thematic sections based on clusters and ratings
        sections = {}

        # Main thematic sections from top clusters
        top_clusters = sorted(
            wrapper.context.topic_clusters.items(),
            key=lambda x: len(x[1]),  # Sort by cluster size
            reverse=True
        )[:target_sections-1]  # Reserve space for "Other News"

        for topic, urls in top_clusters:
            # Only include high-rated articles
            high_rated_urls = [
                url for url in urls
                if wrapper.context.article_ratings.get(url, 0) >= 0.6
            ]
            if high_rated_urls:
                section_name = topic.title().replace('_', ' ')
                sections[section_name] = high_rated_urls

        # "Other News" section for remaining articles
        assigned_urls = set()
        for urls in sections.values():
            assigned_urls.update(urls)

        other_urls = [
            url for url in wrapper.context.article_texts.keys()
            if url not in assigned_urls and wrapper.context.article_ratings.get(url, 0) >= 0.5
        ]

        if other_urls:
            sections["Other News"] = other_urls

        wrapper.context.thematic_sections = sections
        wrapper.context.section_names = list(sections.keys())
        wrapper.context.current_step = 7

        section_summary = "\n".join([
            f"‚Ä¢ {name}: {len(urls)} articles"
            for name, urls in sections.items()
        ])

        return f"‚úÖ Step 7 Complete: Organized into {len(sections)} sections:\n{section_summary}"

    @function_tool
    async def step8_write_sections(
        self,
        wrapper: RunContextWrapper[NewsletterState]
    ) -> str:
        """Step 8: Write content for each thematic section"""
        if not wrapper.context.thematic_sections:
            return "‚ùå No sections to write. Complete steps 1-7 first."

        section_drafts = {}

        for section_name, urls in wrapper.context.thematic_sections.items():
            # Gather content for this section
            section_articles = []

            for url in urls:
                summary = wrapper.context.article_summaries.get(url, [])
                rating = wrapper.context.article_ratings.get(url, 0)

                # Get article title from DataFrame
                article_row = wrapper.context.ai_headlines[
                    wrapper.context.ai_headlines['url'] == url
                ]
                title = article_row['title'].iloc[0] if not article_row.empty else "Unknown Title"

                section_articles.append({
                    'title': title,
                    'url': url,
                    'summary': summary,
                    'rating': rating
                })

            # Write section content (mock implementation)
            section_content = f"## {section_name}\n\n"

            for article in sorted(section_articles, key=lambda x: x['rating'], reverse=True):
                section_content += f"**{article['title']}**\n"
                for bullet in article['summary']:
                    section_content += f"{bullet}\n"
                section_content += f"[Read more]({article['url']})\n\n"

            section_drafts[section_name] = section_content

        wrapper.context.section_drafts = section_drafts
        wrapper.context.current_step = 8

        return f"‚úÖ Step 8 Complete: Wrote content for {len(section_drafts)} sections"

    @function_tool
    async def step9_finalize_newsletter(
        self,
        wrapper: RunContextWrapper[NewsletterState],
        newsletter_title: str = "AI Weekly Newsletter"
    ) -> str:
        """Step 9: Combine sections and polish final newsletter"""
        if not wrapper.context.section_drafts:
            return "‚ùå No section drafts available. Complete steps 1-8 first."

        # Combine all sections
        newsletter_content = f"# {newsletter_title}\n"
        newsletter_content += f"*Generated on {datetime.now().strftime('%B %d, %Y')}*\n\n"

        # Add introduction
        total_articles = len(wrapper.context.article_texts)
        newsletter_content += f"This week's AI newsletter covers {total_articles} key developments across {len(wrapper.context.section_drafts)} areas of AI.\n\n"

        # Add each section
        for section_name in wrapper.context.section_names:
            if section_name in wrapper.context.section_drafts:
                newsletter_content += wrapper.context.section_drafts[section_name]
                newsletter_content += "\n---\n\n"

        # Add footer
        newsletter_content += "*Thank you for reading! This newsletter was generated using AI curation and analysis.*"

        wrapper.context.final_newsletter = newsletter_content
        wrapper.context.workflow_complete = True
        wrapper.context.current_step = 9

        return f"‚úÖ Step 9 Complete: Finalized newsletter with {len(wrapper.context.section_drafts)} sections"

    @function_tool
    async def get_workflow_status(
        self,
        wrapper: RunContextWrapper[NewsletterState]
    ) -> str:
        """Get detailed workflow progress status"""
        state = wrapper.context

        status = {
            'current_step': state.current_step,
            'steps_completed': [
                f"1. Scraping: {len(state.raw_headlines)} headlines" if state.raw_headlines else "1. Scraping: Pending",
                f"2. AI Filtering: {len(state.ai_headlines)} AI articles" if not state.ai_headlines.empty else "2. AI Filtering: Pending",
                f"3. Text Fetching: {len(state.article_texts)} articles" if state.article_texts else "3. Text Fetching: Pending",
                f"4. Summarization: {len(state.article_summaries)} summaries" if state.article_summaries else "4. Summarization: Pending",
                f"5. Topic Clustering: {len(state.topic_clusters)} clusters" if state.topic_clusters else "5. Topic Clustering: Pending",
                f"6. Article Rating: {len(state.article_ratings)} rated" if state.article_ratings else "6. Article Rating: Pending",
                f"7. Section Organization: {len(state.thematic_sections)} sections" if state.thematic_sections else "7. Section Organization: Pending",
                f"8. Section Writing: {len(state.section_drafts)} drafts" if state.section_drafts else "8. Section Writing: Pending",
                f"9. Newsletter Finalization: {'Complete' if state.final_newsletter else 'Pending'}"
            ],
            'workflow_complete': state.workflow_complete
        }

        return f"Newsletter Workflow Status:\n\n" + "\n".join(status['steps_completed'])

    @function_tool
    async def run_complete_workflow(
        self,
        wrapper: RunContextWrapper[NewsletterState],
        sources: List[str] = None,
        ai_keywords: List[str] = None
    ) -> str:
        """Run the complete 9-step workflow automatically"""
        results = []

        # Execute each step in sequence
        result1 = await self.step1_scrape_headlines(wrapper, sources)
        results.append(result1)

        result2 = await self.step2_filter_ai_content(wrapper, ai_keywords)
        results.append(result2)

        result3 = await self.step3_fetch_article_texts(wrapper)
        results.append(result3)

        result4 = await self.step4_summarize_articles(wrapper)
        results.append(result4)

        result5 = await self.step5_extract_and_cluster_topics(wrapper)
        results.append(result5)

        result6 = await self.step6_rate_articles(wrapper)
        results.append(result6)

        result7 = await self.step7_organize_sections(wrapper)
        results.append(result7)

        result8 = await self.step8_write_sections(wrapper)
        results.append(result8)

        result9 = await self.step9_finalize_newsletter(wrapper)
        results.append(result9)

        newsletter_length = len(wrapper.context.final_newsletter)

        return "\n".join(results) + f"\n\nüéâ Complete workflow finished! Newsletter ready ({newsletter_length} characters)"

    @function_tool
    async def reset_workflow(
        self,
        wrapper: RunContextWrapper[NewsletterState]
    ) -> str:
        """Reset workflow to start fresh"""
        wrapper.context.__dict__.update(NewsletterState().__dict__)
        return "üîÑ Workflow reset. Ready to start step 1."

    @function_tool
    async def get_newsletter_preview(
        self,
        wrapper: RunContextWrapper[NewsletterState],
        max_chars: int = 500
    ) -> str:
        """Get a preview of the current newsletter"""
        if not wrapper.context.final_newsletter:
            return "Newsletter not ready yet. Complete the full workflow first."

        preview = wrapper.context.final_newsletter[:max_chars]
        if len(wrapper.context.final_newsletter) > max_chars:
            preview += "..."

        return f"Newsletter Preview:\n\n{preview}"

    async def run_step(self, user_input: str) -> str:
        """Run a workflow step with persistent state"""
        result = await Runner.run(
            self,
            user_input,
            session=self.session,
            context=self.state
        )
        return result.final_output

    def save_newsletter(self, filepath: str = None):
        """Save the final newsletter to file"""
        if not self.state.final_newsletter:
            print("No newsletter to save. Complete workflow first.")
            return

        if filepath is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filepath = f"ai_newsletter_{timestamp}.md"

        with open(filepath, 'w') as f:
            f.write(self.state.final_newsletter)

        print(f"Newsletter saved to {filepath}")




In [None]:
import openai

client = openai.OpenAI(
  base_url="http://localhost:8787/v1",
  api_key=os.getenv("OPENAI_API_KEY"),
  default_headers={"x-portkey-provider": "openai"}
)

response = client.chat.completions.create(
  model="gpt-4o-mini",
  messages=[{"role": "user", "content": "Hello"}]
)
print(response.choices[0].message.content)

In [None]:
from portkey_ai import Portkey

client = Portkey(
    provider="openai",
    Authorization=os.getenv("OPENAI_API_KEY")
)

# Example: Send a chat completion request
response = client.chat.completions.create(
    messages=[{"role": "user", "content": "Hello, how are you?"}],
    model="gpt-4o"
)

print(response.choices[0].message.content)

In [None]:
type(prompt_template)

In [None]:
class AgentState(TypedDict):
    """
    State of the LangGraph agent.
    Each node in the graph is a function that takes the current state and returns the updated state.
    """

    # the current working set of headlines (pandas dataframe not supported)
    AIdf: list[dict]
    # ignore stories before this date for deduplication (force reprocess since)
    model_low: str     # cheap fast model like gpt-4o-mini or flash
    model_medium: str  # medium model like gpt-4o or gemini-1.5-pro
    model_high: str    # slow expensive thinking model like o3-mini
    sources: dict  # sources to scrap
    sources_reverse: dict[str, str]  # map file names to sources

state = AgentState()


In [None]:
SOURCES_FILE = "sources.yaml"

def initialize(state, sources_file=SOURCES_FILE) -> Dict[str, Any]:
    """Read and parse the sources.yaml file."""
    try:
        with open(sources_file, 'r', encoding='utf-8') as file:
            state["sources"] =  yaml.safe_load(file)
        state["sources_reverse"] = {v["title"]+".html":k for k,v in state["sources"].items()}
    except FileNotFoundError:
        raise FileNotFoundError(f"Sources file '{self.sources_file}' not found")
    except yaml.YAMLError as e:
        raise ValueError(f"Error parsing YAML file: {e}")

    return state


In [None]:
state = initialize(state)
state
