- Simple test for integrating agents as functions in a hybrid workflow.
- The current setup checks each job for an existing key and, if it is missing, generates one by calling an agent.
- In production, the agent would instead write custom parsers for the result of a specific web scrape.
- Two functions, write_parser and check_parser, would have to cooperate to reach an end result.

In [1]:
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
from parser_agent.agents import key_agent

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
session_service = InMemorySessionService()

session = await session_service.create_session(
    app_name='hybrid_workflow_integration_test',
    user_id='job',
    session_id='first_test'
)

runner = Runner(agent=key_agent,
                app_name='hybrid_workflow_integration_test',
                session_service=session_service)

In [None]:
# User message in ADK format: a single text part carrying the prompt, sent with the 'user' role
content = types.Content(
    role='user',
    parts=[types.Part(text='generate a single key')],
)

async def key_agent(content):
    """Run the agent for one user turn and return the key it generated.

    NOTE(review): this coroutine reuses the name of the ``key_agent`` agent
    imported from ``parser_agent.agents``. The module-level ``runner`` has to
    be built before this definition is executed; rebuilding it afterwards
    would pass this function to ``Runner`` instead of the agent.

    Args:
        content: The user message forwarded to ``runner.run_async`` as
            ``new_message``.

    Returns:
        int: The text of the first part of the agent's final response,
        converted with ``int``.

    Raises:
        ValueError: If the final response is an escalation, carries no
            content or no text, holds text that is not an integer, or if the
            event stream ends without any final response.
    """
    async for event in runner.run_async(user_id='job', session_id='first_test', new_message=content):
        # is_final_response() marks the concluding message for the turn;
        # every earlier event is intermediate and is skipped.
        if not event.is_final_response():
            continue

        if event.content and event.content.parts:
            # Assuming the key is the text of the first part.
            final_response_text = event.content.parts[0].text
            if final_response_text is None:
                raise ValueError("Final agent response has no text in its first part.")
            try:
                return int(final_response_text)
            except ValueError as err:
                # Surface what the agent actually said instead of a bare int() error.
                raise ValueError(
                    f"Agent did not return an integer key: {final_response_text!r}"
                ) from err

        if event.actions and event.actions.escalate:
            # Escalations carry no usable key; report them explicitly.
            raise ValueError(f"Agent escalated: {event.error_message or 'No specific message.'}")

        raise ValueError("Final agent response carried no content.")

    # Falling out of the loop used to return None silently, which a caller
    # would then store as if it were a generated key.
    raise ValueError("Agent finished without producing a final response.")
        

# TODO: suppress the warning output via the logging module

In [24]:
# Smoke test: call the agent wrapper once with the prepared message and display the returned key
result = await key_agent(content)
result



223

In [26]:
# Test fixture: one job that already has a key and one whose key is still missing (None)
jobs = {
    "job1": {"name": "test_job_1", "key": 6348},
    "job2": {"name": "test_job_2", "key": None},
}

In [27]:
for job_name, job in jobs.items():
    print(f"Running job for: {job_name}")
    print(f"Job key is: {job['key']}")

    if job['key'] is not None:
        print(f"Key found for {job_name}")
        print(job['key'])
    
    else:
        print(f"No key found for {job_name}")
        print("Generating key...")
        key = await key_agent(content)
        print(f"Key: {key}")
        job['key'] = key
        print("Key stored")

Running job for: job1
Job key is: 6348
Key found for job1
6348
Running job for: job2
Job key is: None
No key found for job2
Generating key...




Key: 563
Key stored


In [28]:
# Display the jobs dict after the loop to check that the generated key was written back
jobs

{'job1': {'name': 'test_job_1', 'key': 6348},
 'job2': {'name': 'test_job_2', 'key': 563}}