In [12]:
pip install google-adk google-genai ddgs

Note: you may need to restart the kernel to use updated packages.


In [13]:
import asyncio
import base64
import json
import os
import re
import time
import traceback
import uuid
from typing import Any, Dict, List, Optional

import pandas as pd
import requests
from google.adk.agents import LlmAgent, SequentialAgent, ParallelAgent
from google.adk.models import Gemini
from google.adk.runners import Runner, RunConfig
from google.adk.sessions import DatabaseSessionService
from google.adk.tools.tool_context import ToolContext
from google.genai import Client
from google.genai import types as genai_types
from kaggle_secrets import UserSecretsClient

# Upper bound on model invocations for one runner.run_async call (passed as RunConfig.max_llm_calls).
MAX_TOTAL_LLM_CALLS: int = 10
# Application identifier shared by the Runner and the session service.
APP_NAME: str = "dataforseo_jobs_pipeline"


In [14]:
try:
    # Kaggle Secrets keep the key out of the notebook; the google-genai Client built in
    # make_gemini_llm is expected to pick it up from the GOOGLE_API_KEY environment variable.
    GOOGLE_API_KEY = UserSecretsClient().get_secret("GOOGLE_API_KEY")
    os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY
    print("✅ Gemini API key loaded from Kaggle Secrets.")
except Exception as e:
    if os.environ.get("GOOGLE_API_KEY"):
        # A key exported before the notebook started (e.g. running outside Kaggle) still works.
        print("✅ Gemini API key taken from the existing GOOGLE_API_KEY environment variable.")
    else:
        print(f"🔑 Authentication Error: Please make sure 'GOOGLE_API_KEY' is added to your Kaggle secrets. Details: {e}")

✅ Gemini API key loaded from Kaggle Secrets.


In [15]:
def load_dataforseo_credentials() -> tuple[str, str]:
    """Load the DataForSEO API login and password.

    The environment variables ``DFSEO_LOGIN`` / ``DFSEO_PASSWORD`` take precedence
    when both are set, so the notebook can also run outside Kaggle. Otherwise the
    values are read from Kaggle Secrets under the same names.

    Returns:
        ``(login, password)``. Both are empty strings when the credentials cannot
        be loaded. The DataForSEO tool functions check for that empty-credentials
        signal before making a request, so this function does not raise.
    """
    login = os.environ.get("DFSEO_LOGIN", "")
    password = os.environ.get("DFSEO_PASSWORD", "")
    if login and password:
        return login, password

    try:
        client = UserSecretsClient()
        # Ensure these names match your Kaggle Secret variable names exactly
        login = client.get_secret("DFSEO_LOGIN")
        password = client.get_secret("DFSEO_PASSWORD")
    except Exception as e:
        # get_secret may raise (e.g. secret not attached to the notebook); report it and
        # fall back to the empty-credentials signal instead of crashing the agent's tool call.
        print(f"DataForSEO credentials could not be loaded from Kaggle Secrets: {e}")
        return "", ""
    return login or "", password or ""

def create_dataforseo_auth_header(login: str, password: str) -> Dict[str, str]:
    """Return JSON request headers that carry DataForSEO HTTP Basic credentials.

    The ``login:password`` pair is UTF-8 encoded and base64-encoded into the
    ``Authorization`` value. ``Content-Type`` marks request bodies as JSON.
    """
    token = base64.b64encode(f"{login}:{password}".encode("utf-8")).decode("utf-8")
    headers: Dict[str, str] = {}
    headers['Authorization'] = 'Basic ' + token
    headers['Content-Type'] = 'application/json'
    return headers

In [16]:
# HTTP retry policy handed to every Gemini model built by make_gemini_llm:
# 4 attempts, delays starting at 1s, growing by a factor of 2, capped at 10s.
retry_config = genai_types.HttpRetryOptions(
    exp_base=2.0,
    initial_delay=1.0,
    max_delay=10.0,
    attempts=4,
)

In [17]:
def make_gemini_llm(model: str = "gemini-2.5-flash") -> Gemini:
    """Build an ADK ``Gemini`` model wrapper that uses the notebook's retry policy.

    Args:
        model: Gemini model name; defaults to ``"gemini-2.5-flash"``.

    Returns:
        A ``Gemini`` instance configured with ``retry_config``.

    Raises:
        ValueError: if the google-genai ``Client`` cannot be created (presumably a
            missing or invalid ``GOOGLE_API_KEY``). The original error is chained
            as ``__cause__``.
    """
    try:
        # Building the client here surfaces a bad key when the agents are defined,
        # rather than later in the middle of a pipeline run.
        client = Client()
    except Exception as e:
        raise ValueError(
            f"Could not initialize Google GenAI Client. Is GOOGLE_API_KEY set correctly? Original error: {e}"
        ) from e

    return Gemini(
        client=client,
        model=model,
        retry_options=retry_config,
    )

In [18]:
def submit_google_jobs_task(search_keyword: str, location_name: str) -> str:
    """
    (POST) Submits a Google Jobs search task to DataForSEO, restricted to jobs posted today.

    Args:
        search_keyword: Job search query, e.g. "Software Engineer".
        location_name: Full DataForSEO location name in "City,Region,Country" form,
            e.g. "San Francisco,California,United States". Pass an empty string to
            search the whole United States.

    Returns:
        The raw JSON response text on success; the task ID is at tasks[0].id.
        A string starting with "ERROR:" if credentials are missing, the HTTP request
        fails, or DataForSEO rejects the task.
    """
    login, password = load_dataforseo_credentials()
    if not login or not password:
        return "ERROR: DataForSEO credentials not loaded. Check Kaggle Secrets."

    endpoint_url = "https://api.dataforseo.com/v3/serp/google/jobs/task_post"
    headers = create_dataforseo_auth_header(login, password)

    task: Dict[str, Any] = {
        "language_name": "English",
        "keyword": search_keyword,
        "date_posted": "today",
    }
    # Prefer the caller's location; otherwise fall back to the country-wide default.
    if location_name and location_name.strip():
        task["location_name"] = location_name.strip()
    else:
        task["location_code"] = 2840  # United States in Google/DataForSEO geo-target codes

    try:
        response = requests.post(endpoint_url, headers=headers, json=[task], timeout=30)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        return f"ERROR: Task Submission Failed: {e}"

    # DataForSEO answers HTTP 200 even when the task itself is rejected; the real outcome
    # is the per-task status code (20100 = "Task Created" per the DataForSEO docs).
    try:
        task_status = response.json()["tasks"][0]
        status_code = task_status.get("status_code")
    except (ValueError, KeyError, IndexError, TypeError, AttributeError):
        # Unrecognised payload: hand it back unchanged for the agent to inspect.
        return response.text
    if status_code != 20100:
        return f"ERROR: Task Submission Failed: {status_code} {task_status.get('status_message')}"
    return response.text


def retrieve_google_jobs_results(task_id: str) -> str:
    """
    (GET) Retrieves the completed Google Jobs search results for a DataForSEO Task ID.

    DataForSEO processes tasks asynchronously. This function therefore polls the results
    endpoint until the task stops reporting a "still processing" status, or until a
    time budget runs out.

    Args:
        task_id: The ID returned by the task submission step (tasks[0].id).

    Returns:
        The raw JSON response text, with results under tasks[0].result[0].items.
        A string starting with "ERROR:" on a missing task ID, missing credentials,
        HTTP failure, or timeout.
    """
    # Polling schedule: first check after initial_wait seconds, then every poll_interval
    # seconds, giving up once max_wait seconds have been spent waiting.
    initial_wait = 15
    poll_interval = 15
    max_wait = 180
    # Per-task status codes meaning "not finished yet" (40601 Task Handed,
    # 40602 Task In Queue, per the DataForSEO status-code appendix).
    pending_codes = {40601, 40602}

    task_id = (task_id or "").strip()
    if not task_id:
        return "ERROR: No Task ID provided; cannot retrieve results."

    # Check credentials before waiting so a misconfiguration fails immediately.
    login, password = load_dataforseo_credentials()
    if not login or not password:
        return "ERROR: DataForSEO credentials not loaded. Check Kaggle Secrets."

    endpoint_url = f"https://api.dataforseo.com/v3/serp/google/jobs/task_get/advanced/{task_id}"
    headers = create_dataforseo_auth_header(login, password)

    print(f"Waiting up to {max_wait} seconds for DataForSEO to process Task ID: {task_id}...")
    waited = 0
    delay = initial_wait
    while True:
        time.sleep(delay)
        waited += delay

        try:
            response = requests.get(endpoint_url, headers=headers, timeout=30)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            return f"ERROR: Task Retrieval Failed: {e}"

        try:
            status_code = response.json()["tasks"][0].get("status_code")
        except (ValueError, KeyError, IndexError, TypeError, AttributeError):
            status_code = None  # unrecognised payload: return it as-is below

        if status_code not in pending_codes:
            return response.text
        if waited >= max_wait:
            return f"ERROR: Task Retrieval Failed: task {task_id} still pending after {waited} seconds."
        delay = poll_interval


# Matches a Markdown code fence (optionally tagged, e.g. ```json) wrapped around the whole payload.
_CODE_FENCE_RE = re.compile(r"^\s*```[\w-]*\s*(.*?)\s*```\s*$", re.DOTALL)


def _strip_code_fence(text: str) -> str:
    """Return ``text`` without a surrounding Markdown code fence, if it has one."""
    match = _CODE_FENCE_RE.match(text)
    return match.group(1) if match else text


def export_to_csv(json_data_string: str, filename: str = "job_listings.csv") -> str:
    """
    Converts a JSON array string into a pandas DataFrame and saves it as a CSV file.

    Args:
        json_data_string: A JSON list of job objects, one CSV row each. A single JSON
            object is treated as a one-row list. A Markdown code fence around the
            payload (as LLMs often emit) is ignored.
        filename: Destination CSV path.

    Returns:
        A status string starting with "SUCCESS:" or "ERROR:". This function does not
        raise, so the calling agent always receives a readable message.
    """
    try:
        data = json.loads(_strip_code_fence(json_data_string))
    except (json.JSONDecodeError, TypeError):
        return "ERROR: Input data was not valid JSON. Cannot export to CSV."

    if isinstance(data, dict):
        data = [data]
    if not isinstance(data, list):
        return "ERROR: Expected a JSON list of job objects. Cannot export to CSV."

    try:
        pd.DataFrame(data).to_csv(filename, index=False)
    except Exception as e:
        return f"ERROR: Failed to save CSV: {e}"
    return f"SUCCESS: Job data saved to CSV file: {filename}"

In [19]:
# Stage 1: submit the search task and report only the Task ID the next stage needs.
submit_agent = LlmAgent(
    name="submit_data_task_agent",
    description="Submits the Google Jobs search query and saves the Task ID for retrieval.",
    model=make_gemini_llm(),
    instruction="""
    Call the `submit_google_jobs_task` tool exactly once with:
    - search_keyword: the job title from the user's request (use 'Software Engineer' if none is given).
    - location_name: the location from the user's request as a full DataForSEO location name,
      e.g. 'San Francisco,California,United States'; use an empty string if no location is given.
    The tool returns the JSON task submission response. Extract the task ID from tasks[0]['id']
    and reply with exactly one line: `TASK_ID: <id>`.
    If the tool returns text starting with 'ERROR', reply with that text verbatim and do not invent a Task ID.
    """,
    tools=[submit_google_jobs_task],
)

In [20]:
# Stage 2: fetch the finished results for the Task ID reported by the submit stage.
retrieve_agent = LlmAgent(
    name="retrieve_data_agent",
    description="Retrieves the final job list JSON using the Task ID provided by the previous step.",
    model=make_gemini_llm(),
    instruction="""
    The previous step outputted the Task ID. Call the `retrieve_google_jobs_results` tool exactly once,
    passing that Task ID unchanged as the argument.
    If the previous step reported an error instead of a Task ID, do not call the tool; output that error verbatim.
    Otherwise output the raw JSON response from the retrieval tool (or its 'ERROR' message) without modification.
    """,
    tools=[retrieve_google_jobs_results],
)

In [21]:
# Stage 3: reduce the raw API payload to a flat JSON list the CSV exporter can load directly.
format_agent = LlmAgent(
    name="format_agent",
    description="Takes the raw API JSON, extracts clean job items, and formats them into the required structure.",
    model=make_gemini_llm(),
    instruction="""
    The previous step provided a large JSON response from the Google Jobs API.
    Find the final job results list inside the 'tasks[0].result[0].items' key.
    Extract the job listings and format them into a single, clean JSON list of dictionaries, where each dictionary contains (title, employer_name, location, source_url, time_ago).
    Use null for any of these fields that is missing from an item; never invent values.
    If the items list is missing or empty, output an empty list: []
    If the previous step reported an error instead of JSON, output that error text verbatim instead.
    Output **ONLY** the final JSON list, with no Markdown code fences and no surrounding text.
    """,
    tools=[],
)

In [22]:
# Stage 4: persist the formatted job list through the export_to_csv tool and relay its status.
export_agent = LlmAgent(
    name="export_agent",
    tools=[export_to_csv],
    description="Saves the final, clean JSON list of jobs to a CSV file.",
    model=make_gemini_llm(),
    instruction="""
    The previous agent's output is the final, clean JSON list of job dictionaries.
    Call the `export_to_csv` tool exactly once, passing the entire JSON list as the first argument 
    and the filename 'final_software_engineer_jobs.csv' as the second argument.
    Output ONLY the status message returned by the tool.
    """,
)

In [23]:
# Runs the four stages strictly in order; later stages rely on the output of earlier
# stages in the same session, as their instructions describe.
root_orchestrator = SequentialAgent(
    name="root_orchestrator",
    description="Main Workflow: Submit search task, retrieve results, format final JSON, and export it to CSV.",
    sub_agents=[
        submit_agent,
        retrieve_agent,
        format_agent,
        export_agent,
    ],
)

# In-memory SQLite database: session history lives only as long as this kernel process.
session_service = DatabaseSessionService(db_url="sqlite:///:memory:")
runner = Runner(agent=root_orchestrator, app_name=APP_NAME, session_service=session_service)

In [24]:
async def run_job_pipeline_async():
    print("ðŸš€ Starting DataForSEO Job Pipeline...")
    
    session_id = "dataforseo_test_session"
    user_id = "dfseo_test_user"

    await session_service.create_session(app_name=APP_NAME, user_id=user_id, session_id=session_id)

    user_msg = genai_types.Content(
        role="user",
        parts=[genai_types.Part(text="Find recent software engineer jobs in San Francisco and return the JSON output.")]
    )
    
    agen = runner.run_async(
        user_id=user_id,
        session_id=session_id,
        new_message=user_msg,
        run_config=RunConfig(max_llm_calls=MAX_TOTAL_LLM_CALLS)
    )

    final_response = None
    async for event in agen:
        if event.is_final_response() and event.content:
             final_response = event.content.parts[0].text
             
    if final_response:
        print("\nðŸ¤– FINAL RAW JSON OUTPUT:")
        print(final_response)
        

await run_job_pipeline_async()

🚀 Starting DataForSEO Job Pipeline...




Waiting 15 seconds to allow DataForSEO to process Task ID: 12100310-1264-0447-0000-36629464bd78...





🤖 FINAL RAW JSON OUTPUT:
SUCCESS: Job data saved to CSV file: final_software_engineer_jobs.csv
