# Example DeepResearchAgent

In [3]:
import os
import time
from os.path import join, exists
from os import listdir, makedirs
from datetime import datetime
from google import genai
from google.genai import types
from openai import OpenAI
import requests
import json
from pydantic import BaseModel, Field
from crawl4ai import *
from pydantic_ai import Agent, RunContext
from pydantic_ai.models.gemini import GeminiModel
from dataclasses import dataclass

import asyncio
import nest_asyncio 
# Add this line to allow nested event loops
nest_asyncio.apply()

### Perplexity Search API Function

In [52]:

def get_perplexity_search_results(query):
    """Ask Perplexity's ``sonar-pro`` model a question and return the answer with its sources.

    Args:
        query: The user question; it is sent unchanged as the user message.

    Returns:
        A ``(message, citations)`` tuple. ``message`` is the answer text,
        a blank line, and then one ``[n] <citation>`` line per citation.
        ``citations`` is the list of citations found on the response
        (empty when the response carries none).

    Raises:
        ValueError: If ``PERPLEXITY_API_KEY`` is not set in the environment.
    """
    api_key = os.environ.get("PERPLEXITY_API_KEY")

    if not api_key:
        # Fail fast instead of only printing: constructing the client with
        # api_key=None would not use a Perplexity key at all.
        # NOTE(review): the OpenAI client is documented to fall back to
        # OPENAI_API_KEY in that case, which would then be sent to Perplexity.
        raise ValueError("PERPLEXITY_API_KEY not found in environment variables.")

    messages = [
        {
            "role": "system",
            "content": (
                "You are an artificial intelligence assistant and you need to "
                "engage in a helpful, detailed, polite conversation with a user."
            ),
        },
        {
            "role": "user",
            "content": query,
        },
    ]

    # Perplexity is reached through the OpenAI client by overriding base_url
    client = OpenAI(api_key=api_key, base_url="https://api.perplexity.ai")

    # chat completion without streaming
    response = client.chat.completions.create(
        model="sonar-pro",
        messages=messages,
    )

    # `citations` is read defensively so an answer without sources still returns
    citations = list(getattr(response, "citations", None) or [])
    answer = response.choices[0].message.content or ""

    numbered_sources = "".join(
        f"[{index}] {citation}\n" for index, citation in enumerate(citations, start=1)
    )
    message = answer + "\n\n" + numbered_sources

    return message, citations


# Demo: send one question to Perplexity (reads PERPLEXITY_API_KEY from the
# environment) and show the formatted answer followed by the raw citation list.
message, citations = get_perplexity_search_results("How many user queries can be done with the H100 und a 13B parameter LLM model?")
print(message)
print(citations)

### Google Search API Function

The API key is stored in the environment variable `SERPER_API_KEY`.  
An account can be created [here](https://serper.dev/).


In [61]:
def get_google_search_results(query, num_results=10, timeout=30):
    """Run a Google search through the Serper API.

    Args:
        query: Search string sent as the ``q`` field.
        num_results: Number of results requested (the ``num`` field).
        timeout: Seconds to wait for the HTTP request before giving up.
            Without a timeout a stalled connection would block the cell forever.

    Returns:
        The raw ``requests`` response (call ``.json()`` on it to get the
        payload), or ``None`` when ``SERPER_API_KEY`` is not set.
    """
    api_key = os.environ.get("SERPER_API_KEY")

    if not api_key:
        print("SERPER_API_KEY not found in environment variables.")
        return None

    url = "https://google.serper.dev/search"
    payload = json.dumps({
        "q": query,
        "num": num_results,
    })
    headers = {
        'X-API-KEY': api_key,
        'Content-Type': 'application/json',
    }

    # The HTTP status is deliberately not checked here; callers inspect the response
    response = requests.request("POST", url, headers=headers, data=payload, timeout=timeout)

    return response
    
# Topic used for the search and, in later cells, for rating the crawled documents
topic = "Test-time compute and test-time training for Large Language Models."
response = get_google_search_results(topic)
# get_google_search_results returns None when the API key is missing; stop with a
# clear message instead of failing on `None.json()`.
if response is None:
    raise RuntimeError("Google search was skipped because SERPER_API_KEY is not set.")
# Surface HTTP errors here rather than as a missing 'organic' key further down
response.raise_for_status()
# Convert to JSON
json_response = response.json()
print(json_response)

### Crawl Webpages

In [55]:
async def crawl_website_async(url_webpage):
    """Crawl a single page and hand back its markdown.

    Args:
        url_webpage: Address of the page to fetch.

    Returns:
        The ``markdown`` attribute of the crawl result.
    """
    # A fresh crawler is opened for this one request and closed on exit
    async with AsyncWebCrawler() as web_crawler:
        page = await web_crawler.arun(url=url_webpage)
        return page.markdown

def crawl_website(url_webpage):
    """Synchronous wrapper: run ``crawl_website_async`` to completion and return its result."""
    crawl_job = crawl_website_async(url_webpage)
    return asyncio.run(crawl_job)

In [58]:
# Demo: crawl one arXiv HTML page; `res` holds the markdown returned by the crawler
res = crawl_website("https://arxiv.org/html/2501.12895")

[INIT].... → Crawl4AI 0.4.247
[FETCH]... ↓ https://arxiv.org/html/2501.12895... | Status: True | Time: 1.77s
[SCRAPE].. ◆ Processed https://arxiv.org/html/2501.12895... | Time: 300ms
[COMPLETE] ● https://arxiv.org/html/2501.12895... | Status: True | Total: 2.07s


In [None]:
query_folder = "test-time-compute"
# Create the output folder; exist_ok keeps the cell safe to re-run
makedirs(query_folder, exist_ok=True)

# Maps result title -> metadata plus crawled markdown, consumed by the rating cells
document_data = {}

for result in json_response.get('organic', []):
    title = result['title']
    # Fall back to an empty string so a failed crawl does not break f.write()
    markdown = crawl_website(result['link']) or ""

    # Titles can contain characters that are invalid in file names (e.g. "/", ":"),
    # so anything other than letters, digits and a few safe symbols becomes "_".
    safe_title = "".join(
        ch if ch.isalnum() or ch in " -_." else "_" for ch in title
    ).strip()
    filename = (safe_title[:150] or f"result_{result.get('position', 'x')}") + ".md"

    document_data[title] = {
        'topic': topic,
        'link': result['link'],
        # Not every search result carries a snippet or a date
        'snippet': result.get('snippet', ''),
        'date': result.get('date', 'unknown'),
        'position': result.get('position'),
        'markdown': markdown,
        'filename': filename
    }

    # Explicit encoding: crawled pages routinely contain non-ASCII characters
    with open(join(query_folder, filename), "w", encoding="utf-8") as f:
        f.write(markdown)

In [67]:
# Only run this block for Gemini Developer API
# Client is created without arguments.
# NOTE(review): presumably the API key is picked up from the environment — confirm.
client = genai.Client()

# Model identifiers referenced by the generate_content calls in later cells
flash_thinking_model = "gemini-2.0-flash-thinking-exp-01-21"
flash2_model = "gemini-2.0-flash-exp"
flash1_model = "gemini-1.5-flash"

# pydantic_ai wrapper around the 1.5 flash model; not referenced by the cells visible below
model = GeminiModel(flash1_model)

### Document Quality Assessment

In [28]:
# Structured-output schema for the document rating: passed as `response_schema`
# to the model, so the field descriptions below are part of the prompt.
# All numeric ratings are constrained to the inclusive range 0..10.
class DocumentQuality(BaseModel):
    filename: str = Field(description="The name of the file")
    document_length: int = Field(description="The length of the document (0 to 10). Is the document short or long.", ge=0, le=10)
    relevance: int = Field(description="How relevant is the document with the main topic (0 to 10).", ge=0, le=10)
    document_quality: int = Field(description="Guess the quality of the document (0 to 10).", ge=0, le=10)
    # Description fixed: the age is measured against the current *date*, not "data"
    document_age: int = Field(description="The age of the document relative to the current date (0 to 10).", ge=0, le=10)
    additional_observations: str = Field(description="If you noticed something strange about the document. Write it in form of an instruction for other LLM agents.")

def rate_document(document, model_name="gemini-1.5-flash"):
    """Ask Gemini to rate one crawled document against its research topic.

    Args:
        document: Dict with at least the keys ``topic``, ``date``, ``link``
            and ``markdown`` (the crawled text that gets rated).
        model_name: Gemini model id used for the rating call.

    Returns:
        The same ``document`` dict, with the raw model response stored under
        the ``response`` key. The response is requested as JSON following the
        ``DocumentQuality`` schema.
    """

    system_instruction = f"""
    You are an professional scientific journalist. 

    You will receive research related documents (markdown format). 
    Your goal is to estimate the relevance and quality of this document (based on a given topic).
    The document will later be used for writing a reasearch report/document.
    If the quality of the text isn't good, this will lead to an overall bad outcome of the report. 

    Topic: {document['topic']}    
    Current Date: {datetime.now().date()}
    Document Date: {document['date']}
    Link: {document['link']}
    """

    # Rate this document's own text; a notebook-global `markdown` would be
    # whatever page happened to be crawled last.
    markdown_content = document['markdown']

    response = client.models.generate_content(
        model=model_name,
        contents=markdown_content,
        config=types.GenerateContentConfig(
            response_mime_type="application/json",
            response_schema=DocumentQuality,
            system_instruction=system_instruction,
            temperature=0.3,
        ),
    )
    document['response'] = response
    return document

### Process Files (Quality Assessment)

In [None]:
# List the files already stored for this query (only when the folder exists)
if exists(query_folder):
    # Get all files in folder
    files = listdir(query_folder)

for title, document in document_data.items():
    # `document` is a plain dict, so the filename is read by key
    print(f"File: {document['filename']}")
    document = rate_document(document)
    # Print the response attached to this document, not a global from another cell
    print(f"Reponse Text: {document['response'].text}")
    # Re-assigning an existing key is safe while iterating over items()
    document_data[title] = document
    time.sleep(5) # sleep for 5 seconds (rate limit is 10 RPM)

In [None]:
# System prompt asking the model to format its answer with `rich` markup tags.
# Note: this rebinds the notebook-global names `system_instruction` and `response`.
system_instruction = """You are an AI assistant designed to produce output that is visually appealing and easily readable in a terminal. When formatting your responses, utilize the syntax of the Python `rich` library. This involves using square brackets to enclose formatting tags.
        Here are some examples of how to apply formatting:

        * **Emphasis:** Instead of "This is important", output "[bold]This is important[/]".
        * **Headers/Titles:** Instead of "Section Title:", output "[bold blue]Section Title:[/]".
        * **Warnings:** Instead of "Warning!", output "[bold red]Warning![/]".
        * **Success Messages:** Instead of "Operation successful.", output "[green]Operation successful.[/]".
        * **Lists:** You can use colors for list items like "[cyan]*[/] Item 1".

        Always use the `rich` library's syntax for formatting terminal output to enhance readability."""

# Single request to the flash-thinking model with the prompt above
response = client.models.generate_content(
    model=flash_thinking_model,
    contents="Show me the proof for the euler identity?",
    config=types.GenerateContentConfig(
        system_instruction=system_instruction,
        temperature=0.3,
    ),
)
# Printed as plain text; the rich tags are not rendered by print()
print(response.text)

### File Upload

In [None]:
# Upload a local file, then ask the model to summarise it.
# NOTE(review): newer google-genai releases may expect `file=` instead of `path=`
# here — confirm against the installed SDK version.
file = client.files.upload(path="a11.text")
response = client.models.generate_content(
    # `model_name` was never defined in this notebook; use one of the configured model ids
    model=flash2_model, contents=["Summarize this file", file]
)
print(response.text)

In [26]:
# Stub tool for the function-calling demo: always answers "sunny", ignoring `location`.
# NOTE(review): the signature and docstring are presumably what the SDK turns into
# the tool declaration — keep them descriptive and change them deliberately.
def get_current_weather(location: str) -> str:
    """Returns the current weather.

    Args:
      location: The city and state, e.g. San Francisco, CA
    """
    return "sunny"


# Function-calling demo: the Python function itself is passed in `tools`,
# and the final text answer is printed.
response = client.models.generate_content(
    model="gemini-2.0-flash-exp",
    contents="What is the weather like in Boston?",
    config=types.GenerateContentConfig(tools=[get_current_weather]),
)

print(response.text)

  Expected `enum` but got `str` with value `'STRING'` - serialized value may not be as expected
  return self.__pydantic_serializer__.to_python(


The weather in Boston is sunny.



### Use Pydantic

In [34]:
# Schema for the structured-output demo; passed as `response_schema` in the next call.
# Field names and types define the JSON object the model is asked to return.
class CountryInfo(BaseModel):
    name: str
    population: int
    capital: str
    continent: str
    gdp: int
    official_language: str
    total_area_sq_mi: int


# Request a JSON answer that follows the CountryInfo schema and print the raw JSON text
response = client.models.generate_content(
    model="gemini-2.0-flash-exp",
    contents="Give me information for the United States.",
    config=types.GenerateContentConfig(
        response_mime_type="application/json",
        response_schema=CountryInfo,
    ),
)
print(response.text)

{
"capital": "Washington, D.C.",
"continent": "North America",
"gdp": 25460000000000,
"name": "United States",
"official_language": "English",
"population": 331002651,
"total_area_sq_mi": 3796742
}


### Check if URL is a PDF

In [112]:
import requests

def is_pdf_url(url, timeout=5):
    """Report whether the server declares the resource at ``url`` to be a PDF.

    Args:
        url: Address to probe.
        timeout: Seconds allowed for each HTTP request.

    Returns:
        ``True`` when the response's ``Content-Type`` header contains
        ``application/pdf`` (case-insensitive); ``False`` otherwise, including
        when the request fails.
    """
    try:
        # Try HEAD first to avoid downloading content
        response = requests.head(url, allow_redirects=True, timeout=timeout)

        # Fallback to GET if HEAD not allowed
        if response.status_code == 405:
            response.close()
            # stream=True defers the body download; only the headers are needed
            response = requests.get(url, stream=True, timeout=timeout)

        try:
            content_type = response.headers.get('Content-Type', '').lower()
        finally:
            # A streamed response keeps its connection open until closed explicitly
            response.close()

        return 'application/pdf' in content_type

    except requests.exceptions.RequestException:
        return False

# Example usage (performs a live HTTP request against arxiv.org)
print(is_pdf_url("https://arxiv.org/pdf/2501.12895"))  # True for PDFs

True


### Use PapersWithCode API

In [None]:
import requests

url = "https://paperswithcode.com/api/v1/search/"
params = {
    "page": 1,
    "items_per_page": 200,
    "q": "test-time compute"  # requests URL-encodes the space when building the query string
}
# The hardcoded X-CSRFToken header was removed: tokens do not belong in a notebook.
# NOTE(review): a read-only GET is assumed not to need it — confirm against the API.
headers = {
    "accept": "application/json",
}

# Defined up front so later cells see None, not a stale or missing name, if the request fails
data = None

try:
    # Send GET request (with a timeout so a stalled connection cannot hang the cell)
    response = requests.get(url, params=params, headers=headers, timeout=30)
    response.raise_for_status()  # Raise exception for HTTP errors (e.g., 4xx/5xx)

    # Parse JSON response
    data = response.json()
    print("API Response:")
    print(data)

except requests.exceptions.RequestException as e:
    print(f"Request failed: {e}")
except ValueError as e:
    print(f"Failed to parse JSON: {e}")

In [None]:
from pprint import pprint
# Pretty-print the whole parsed API response from the previous cell
pprint(data)
# Last expression of the cell: shows the Python type of the parsed payload
type(data)

In [14]:
# Show which fields the first search hit's `paper` record provides
data['results'][0]['paper'].keys()

dict_keys(['id', 'arxiv_id', 'nips_id', 'url_abs', 'url_pdf', 'title', 'abstract', 'authors', 'published', 'conference', 'conference_url_abs', 'conference_url_pdf', 'proceeding'])