In [64]:
import os
from kaggle_secrets import UserSecretsClient

try:
    GOOGLE_API_KEY = UserSecretsClient().get_secret("GOOGLE_API_KEY")
    os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY
    print("âœ… Gemini API key setup complete.")
except Exception as e:
    print(
        f"ðŸ”‘ Authentication Error: Please make sure you have added 'GOOGLE_API_KEY' to your Kaggle secrets. Details: {e}"
    )

âœ… Gemini API key setup complete.


In [65]:
! pip install pyalex beautifulsoup4 requests pypdf



In [66]:
import uuid 
import asyncio
import sys
import requests
import io
from typing import Any, Dict, List, Optional
from bs4 import BeautifulSoup
from pypdf import PdfReader

from google.adk.agents import LlmAgent, SequentialAgent
from google.adk.apps.app import App, ResumabilityConfig
from google.adk.models.google_llm import Gemini
from google.adk.sessions import DatabaseSessionService
from google.adk.runners import Runner
from google.adk.tools.tool_context import ToolContext
from google.genai import types

from google.adk.tools import FunctionTool
from pyalex import Works

print("âœ… ADK components imported successfully.")

âœ… ADK components imported successfully.


In [67]:
retry_config = types.HttpRetryOptions(
    attempts=5,  # Maximum retry attempts
    exp_base=2,  # Delay multiplier
    initial_delay=1,
    http_status_codes=[429, 500, 503, 504],  # Retry on these HTTP errors
)

In [68]:
gemini_flash = "gemini-2.5-flash"

In [69]:
db_url = "sqlite:///research_assistant_data.db"
session_service = DatabaseSessionService(db_url=db_url)
print(f"   - Database: research_assistant_data.db")

   - Database: research_assistant_data.db


research_agent tools

In [70]:
# --- TOOL 1: the "Scout" (Search & Stage) ---
def search_openalex(topic: str, tool_context: ToolContext) -> str:
    """Searches for open access papers and stages them for human review.

    This tool queries OpenAlex for peer-reviewed, open-access papers. It does NOT
    return the full papers immediately. Instead, it "stages" the top 20 results 
    into a temporary 'inbox' in the session state and returns a summary string.

    Args:
        topic: The scientific topic or keywords to search for.
        tool_context: (Injected by system) The context to access session state.

    Returns:
        String summary of the search results for the Agent to read.
        
        IMPORTANT: The Agent should immediately call 'review_next_paper' after receiving this.

        Example Success: 
            "Found and staged 20 papers (Batch 1234abcd). Please review them..."
        
        Example Error: 
            "No valid open-access papers found."
    """
    # CONTEXT RESET: generating new Event ID & Clearing the Inbox
    event_id = str(uuid.uuid4())[:8]  # unique ID for this search batch
    tool_context.session.state["inbox"] = [] # Clear leftover pending papers
    
    print(f"\n Searching OpenAlex for: '{topic}' (Batch ID: {event_id})...")

    # fetch Data (we are getting more than needed to allow filtering)
    results = (
        Works()
        .search(topic)
        .filter(is_oa=True, has_fulltext=True) 
        .get(per_page=20) 
    )

    staged_papers = [] 
    string_output = [] 

    for work in results:
        # if there is no title or abstract, skip
        title = work.get('title')
        if not title: continue
        
        try:
            abstract = work['abstract']
        except:
            abstract = None
        if not abstract: continue 
        
        # PDF link has priority over Landing Page link
        best_location = work.get('best_oa_location', {}) or {}
        pdf_url = best_location.get('pdf_url')
        landing_page = best_location.get('landing_page_url')
        final_link = pdf_url if pdf_url else landing_page
        
        if not final_link: continue

        # Extract Authors (List of strings)
        authors = [a.get('author', {}).get('display_name', 'Unknown') 
                  for a in work.get('authorships', [])]
        author_str = ", ".join(authors[:3]) # first 3 authors
        if len(authors) > 3: author_str += " et al."

        # the Database Schema
        paper_obj = {
            "id": work['id'],
            "event_id": event_id,      # batch ID for sorting later
            "title": title,
            "authors": author_str,
            "year": work['publication_year'],
            "link": final_link,
            "abstract": abstract,      # saving full abstract for the summarizer
            "summary": None,           # empty until the summary agent fills it
            "status": "pending"        # initial (default) status
        }
        staged_papers.append(paper_obj)
        
        # String for LLM
        string_output.append(
            f"ID: {work['id']}\nTitle: {title}\nAuthors: {author_str}\nLink: {final_link}\n---"
        )
        
        # Stop at 20 valid papers
        if len(staged_papers) >= 20:
            break
    
    # save to Session State (the "Inbox")
    tool_context.session.state["inbox"] = staged_papers
    
    if not staged_papers:
        return "No valid open-access papers found."
    
    return (f"Found and staged {len(staged_papers)} papers (Batch {event_id}). "
            f"Please review them using the review_next_paper tool.\n\n" + "\n".join(string_output))

# --- TOOL 2: the HITL loop
def review_next_paper(tool_context: ToolContext) -> dict:
    """Retrieves the next pending paper and pauses for human approval.

    This tool is designed to be called in a loop. It checks the 'inbox' for the
    next paper with status 'pending'. It then pauses execution.

    Args:
        tool_context: (Injected by system) The context to manage the HITL flow.

    Returns:
        Dictionary indicating process status (paused, processed, or complete).
        
        Example (First Call - Paused):
            {"status": "paused", "message": "Waiting for user approval..."}
            
        Example (Second Call - Resume):
            {"status": "processed", "decision": "Approved", "remaining": 15}
            
        Example (Completion):
            {"status": "complete", "message": "No pending papers in the inbox."}
    """
    
    # index space
    inbox = tool_context.session.state.get("inbox", [])
    
    # next pending paper
    target_index = -1
    target_paper = None
    
    for i, paper in enumerate(inbox):
        if paper['status'] == 'pending':
            target_index = i
            target_paper = paper
            break
            
    # base case for function, after all pending papers have been verified by the user
    if target_paper is None:
        return {
            "status": "complete",
            "message": "No pending papers in the inbox."
        }

    # HITL process, for the user to approve or reject the given papers
    if not tool_context.tool_confirmation:
        tool_context.request_confirmation(
            hint=(f"REVIEW ({target_paper['event_id']})\n"
                  f"Title: {target_paper['title']}\n"
                  f"Authors: {target_paper['authors']}\n"
                  f"Abstract: {target_paper['abstract']}...\n"
                  f"Link: {target_paper['link']}"),
            payload={"paper_id": target_paper['id']}
        )
        return {
            "status": "action_required",
            "message": "Pausing for user input..."
        }

    # resume, and update the archive
    if tool_context.tool_confirmation.confirmed:
        inbox[target_index]['status'] = 'approved'
        result_msg = "Approved"
        
        archive = tool_context.session.state.get("archive", [])
        archive.append(inbox[target_index])
        tool_context.session.state["archive"] = archive
        
    else:
        inbox[target_index]['status'] = 'rejected'
        result_msg = "Rejected"
        
        rejected = tool_context.session.state.get("rejected", [])
        rejected.append(inbox[target_index])
        tool_context.session.state["rejected"] = rejected


        
    # save the inbox state        
    tool_context.session.state["inbox"] = inbox

    # counting down 
    pending_count = sum(1 for p in inbox if p['status'] == 'pending')

    # result message for Agent
    return {
        "status": "processed",
        "title": target_paper['title'],
        "decision": result_msg,
        "remaining": pending_count,
        "message": f"Paper {result_msg}. {pending_count} papers remaining in inbox."
    }

print("--- Research Tools in place ---")

--- Research Tools in place ---


summary_agent tools

In [71]:
def fetch_approved_papers(tool_context: ToolContext) -> str:
    """Retrieves a list of papers from the archive that need summarizing.
    
    Looks for papers in the 'archive' list where 'summary' is None.

    Args:
        tool_context: (Injected by system) Access to the 'archive' in session state.

    Returns:
        String list of papers to process.

        Example Success:
            "ID: W123\nTitle: Agents in AI\nLink: http://...\n---"
        
        Example Empty:
            "No papers waiting for summary."
    """
    archive = tool_context.session.state.get("archive", [])
    to_process = []
    
    for paper in archive:
        if paper.get("summary") is None:
            to_process.append(
                f"ID: {paper['id']}\nTitle: {paper['title']}\nLink: {paper['link']}\n---"
            )
            
    if not to_process:
        return "No papers waiting for summary."
        
    return "\n".join(to_process)

def read_paper_content(url: str) -> str:
    """Downloads and extracts text content from a paper URL.

    Automatically detects if the URL is a PDF or HTML page and applies
    the appropriate extraction strategy. Text is truncated to 50k characters.

    Args:
        url: The direct link to the paper (pdf or web page).

    Returns:
        The raw text content of the paper.

        Example Success: 
            "Abstract: This paper explores... [content] ... References"
        
        Example Error: 
            "Error reading paper: 404 Client Error: Not Found"
    """
    print(f"=== Downloading content: {url[:60]} ===")
    try:
        headers = {'User-Agent': 'Mozilla/5.0 (Capstone Research Agent)'}
        response = requests.get(url, headers=headers, timeout=15)
        response.raise_for_status()
        
        content_type = response.headers.get('Content-Type', '').lower()
        
        # STRATEGY A: PDF handling
        if 'pdf' in content_type or url.endswith('.pdf'):
            with io.BytesIO(response.content) as f:
                reader = PdfReader(f)
                text = ""
                # Read first 10 pages max to save time/tokens
                for page in reader.pages[:10]: 
                    text += page.extract_text() + "\n"
                return text[:50000] # Truncate safety

        # STRATEGY B: HTML handling
        else:
            soup = BeautifulSoup(response.content, 'html.parser')
            # kill script and style elements
            for script in soup(["script", "style"]):
                script.decompose()
            text = soup.get_text()
            # break into lines and remove leading and trailing space on each
            lines = (line.strip() for line in text.splitlines())
            # put multi-headlines into a line each
            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
            # drop blanks
            text = '\n'.join(chunk for chunk in chunks if chunk)
            return text[:50000]

    except Exception as e:
        return f"Error reading paper: {str(e)}"

def save_summary(paper_id: str, summary_text: str, tool_context: ToolContext) -> str:
    """Saves the generated summary to the permanent archive in the database.

    Updates the specific paper record in the 'archive' list with the new summary.

    Args:
        paper_id: The ID of the paper being summarized.
        summary_text: The final summary generated by the agent.
        tool_context: Context to access session state.

    Returns:
        Status message.
        
        Example Success: "--- Summary saved for paper W123. ---"
        Example Error: "--- Error: Paper W123 not found in archive. ---"
    """
    archive = tool_context.session.state.get("archive", [])
    
    found = False
    for paper in archive:
        if paper['id'] == paper_id:
            paper['summary'] = summary_text
            found = True
            break
            
    if found:
        tool_context.session.state["archive"] = archive
        return f"--- Summary saved for paper {paper_id}. ---"
    else:
        return f"--- Error: Paper {paper_id} not found in archive. ---"


print("--- Summary Tools in place ---")

--- Summary Tools in place ---


In [72]:
# The Researcher: Finds and Reviews
research_agent = LlmAgent(
    model=Gemini(model=gemini_flash, retry_options=retry_config),
    name="ResearchAgent",
    description="""You are an academic researcher.
    1. First, search for papers on the user's topic using `search_openalex`.
    2. Then, IMMEDIATELY loop through the results using `review_next_paper`.
    3. Continue calling `review_next_paper` until it tells you there are no pending papers left.
    """,
    tools=[FunctionTool(search_openalex), FunctionTool(review_next_paper)],
    output_key="research_findings",
)

print("--- Research Agent set! ---")

--- Research Agent set! ---


In [73]:
summary_agent = LlmAgent(
    model=Gemini(model=gemini_flash, retry_options=retry_config),
    name="SummaryAgent",
    description="""You are a scientific writer.
    1. Call `fetch_approved_papers` to see what work is waiting.
    2. For EACH paper in the list:
       a. Call `read_paper_content(url)` to get the text.
       b. Generate a concise academic summary (highlighting methodology and results).
       c. Call `save_summary(id, summary)` to save your work.
    3. Finally, produce a compiled report of all the summaries you just generated.
    """,
    tools=[
        FunctionTool(fetch_approved_papers), 
        FunctionTool(read_paper_content), 
        FunctionTool(save_summary)
    ],
    output_key="final_report" 
)

print("--- Summary Agent ready! ---")

--- Summary Agent ready! ---


In [74]:
root_agent = SequentialAgent(
    name="RootAgent",
    description="Orchestrates the research and summary process.",
    sub_agents=[research_agent, summary_agent]
)

print("--- Agents Ready! ---")

--- Agents Ready! ---


In [75]:
# 1. The App Container
# This makes the agent logic portable and resumable
research_app = App(
    name="ResearchAssistantApp",
    root_agent=root_agent,
    resumability_config=ResumabilityConfig(is_resumable=True),

)

# 2. The Runner
# This connects the App to the Database (Session Service)
runner = Runner(
    app=research_app,
    session_service=session_service,
)

print("--- Runner Configured! ---")

--- Runner Configured! ---


  resumability_config=ResumabilityConfig(is_resumable=True),


In [76]:
def _check_for_approval_request(event) -> Optional[str]:
    """Helper to find the approval ID from the event structure (Course Logic)."""
    if event.content and event.content.parts:
        for part in event.content.parts:
            # Check for the specific ADK internal function call used for confirmation
            if part.function_call and part.function_call.name == "adk_request_confirmation":
                # Return the ID of this confirmation request
                return part.function_call.id
    return None

def _get_agent_text(event) -> str:
    """Helper to extract clean text from agent response."""
    if event.content and event.content.parts:
        parts_to_show = []
        for part in event.content.parts:
            # Filter out thought traces and None
            if hasattr(part, 'text') and part.text and "thought_signature" not in part.text:
                parts_to_show.append(part.text)
        return " ".join(parts_to_show)
    return ""

print("--- HITL Helper functions on ---")

--- HITL Helper functions on ---


In [77]:
async def run_interactive_session(
    runner_instance: Runner, 
    topic: str,
    session_name: str = "default-session",
    user_id: str = "user_researcher"
):
    print(f"\nðŸš€ Starting Session: {session_name} (User: {user_id})")
    
    # Create or Get Session
    try:
        session = await session_service.create_session(
            app_name=runner_instance.app.name,
            user_id=user_id,
            session_id=session_name
        )
    except:
        session = await session_service.get_session(
            app_name=runner_instance.app.name,
            user_id=user_id,
            session_id=session_name
        )

    # prep Input
    user_msg = types.Content(role="user", parts=[types.Part(text=f"Research this topic: {topic}")])
    
    # event Loop
    async for event in runner_instance.run_async(
        session_id=session.id,
        user_id=user_id, 
        new_message=user_msg
    ):
        # A. Agent Output
        text = _get_agent_text(event)
        if text:
            print(f"\nðŸ¤– Agent: {text}")

        # B. HITL request (should pause, to be resumable)
        approval_id = _check_for_approval_request(event)
        
        if approval_id:
            print(f"\nâœ‹ PAUSE REQUIRED (ID: {approval_id})")
            print("-" * 40)
            print("The Agent is requesting approval for the item above.")
            print("-" * 40)
            
            while True:
                ans = input("   ðŸ‘‰ Approve this paper? (y/n): ").strip().lower()
                if ans in ['y', 'n']:
                    break
            
            is_confirmed = (ans == 'y')
            
            await runner_instance.submit_confirmation(
                session_id=session.id,
                confirmation_id=approval_id,
                confirmed=is_confirmed
            )

In [78]:
topic = "I want to write a paper on humanoid robotics, comparing their design to other robots"

In [79]:
await run_interactive_session(runner, topic, session_name='my-notebook-session')

ERROR:asyncio:Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7e5e3c00d890>
ERROR:asyncio:Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7e5e3bd09150>



ðŸš€ Starting Session: my-notebook-session (User: user_researcher)


  agent_state = SequentialAgentState(current_sub_agent=sub_agent.name)
  return orig_init(self, *args, **kwargs)



ðŸ¤– Agent: I am currently paused and waiting for your explicit approval or disapproval for the paper titled "Measurement Instruments for the Anthropomorphism, Animacy, Likeability, Perceived Intelligence, and Perceived Safety of Robots" by Christoph Bartneck, Dana KuliÄ‡, Elizabeth A. Croft et al. (ID: https://openalex.org/W2032568497).

Please state either "Yes, I approve" or "No, I do not approve" for this paper so I can continue with the research. I cannot proceed with any new research until this is addressed.





ðŸ¤– Agent: I'm the SummaryAgent, and my role is to summarize papers *after* they have been approved. The ResearchAgent is currently paused and waiting for your explicit approval or disapproval for the paper titled "Measurement Instruments for the Anthropomorphism, Animacy, Likeability, Perceived Intelligence, and Perceived Safety of Robots."

Please say "Yes, I approve" or "No, I do not approve" for that specific paper so the ResearchAgent can move forward. I cannot begin my task of summarizing until papers are approved.
