In [7]:
import os
import nest_asyncio
from dotenv import load_dotenv
import asyncio
from google.adk.agents import Agent, SequentialAgent, ParallelAgent, LoopAgent
from google.adk.models.google_llm import Gemini
from google.adk.runners import InMemoryRunner
from google.adk.tools import AgentTool, FunctionTool, google_search
from google.genai import types

from google.adk.tools.mcp_tool.mcp_toolset import McpToolset
from google.adk.tools.tool_context import ToolContext
from google.adk.tools.mcp_tool.mcp_session_manager import StdioConnectionParams
from mcp import StdioServerParameters

from google.adk.apps.app import App, ResumabilityConfig
from google.adk.tools.function_tool import FunctionTool

print("✅ ADK components imported successfully.")

✅ ADK components imported successfully.


In [8]:
# Load variables from a local .env file (if one exists) into the process environment.
load_dotenv()

# Fail fast with an actionable message when the key is absent. Re-assigning
# os.environ["GOOGLE_API_KEY"] from os.getenv(...) did nothing when the key was
# present and raised an opaque TypeError (None is not a str) when it was not.
if not os.getenv("GOOGLE_API_KEY"):
    raise RuntimeError(
        "GOOGLE_API_KEY is not set. Export it or add it to a .env file before running this notebook."
    )
print("✅ Gemini API key setup complete.")

✅ Gemini API key setup complete.


In [9]:
# Retry policy passed to every Gemini model built in the cells below: requests that
# fail with one of the listed HTTP status codes are retried with a growing delay.
retry_config = types.HttpRetryOptions(
    http_status_codes=[429, 500, 503, 504],  # status codes that trigger a retry
    initial_delay=1,  # delay before the first retry
    exp_base=7,  # multiplier applied to the delay between attempts
    attempts=5,  # maximum number of attempts
)
print("✅ HttpRetryOptions Intialized successfully.")

✅ HttpRetryOptions Intialized successfully.


In [10]:
# System prompt for an LLM-driven coordinator that calls the ingestion agent first and
# the planner second, then returns only the planner's JSON plan.
# NOTE(review): in the visible cells this constant is referenced only by the
# commented-out "OrchestratorCoordinator" Agent; the active orchestrator is a
# SequentialAgent constructed without an instruction. The prompt also names the
# ingestion agent both "ingest_agent" and "DataIngestion_agent" — confirm which name
# the model is expected to see before re-enabling it.
Orchestrator_instruction = """
You are the Orchestrator Agent. Your sole purpose is to manage a two-step data science planning workflow. You must run the ingest_agent first, then use its output to inform the Planner_agent.

***STRICT EXECUTION PROTOCOL (MUST BE FOLLOWED):***

1.  **PHASE 1: DATA INGESTION (Call ingest_agent)**
    * Take the user's initial high-level request (the GOAL) and pass it directly to the **DataIngestion_agent**.
    * Wait for the DataIngestion_agent to return its structured output (the metadata JSON) under the key **'Dataset_files'**.
    * If the ingestion fails (returns errors), stop and return the errors immediately.

2.  **PHASE 2: PLAN GENERATION (Call Planner_agent)**
    * Construct a new, specialized query for the Planner_agent. This query MUST combine the original user GOAL with the JSON context received from the DataIngestion_agent.
    * The format for the Planner_agent input MUST be:
        ```
        GOAL: <Original user request>
        CONTEXT (Dataset_files): <Full JSON output from ingest_agent>
        ```
    * Pass this entire combined query to the **Planner_agent**.

3.  **FINAL OUTPUT:**
    * Return ONLY the final JSON plan structure received from the Planner_agent.

RULES:
- **Output ONLY the final result of the Planner_agent.** Do not add any conversational text, explanations, or commentary.
- Your entire response must be the final JSON plan structure.
- You do not use any tools yourself; you only coordinate the calls between DataIngestion_agent and Planner_agent.
"""

# System prompt for Planner_agent: given the user's GOAL and the dataset metadata,
# emit a sequential JSON plan (steps 1-6 always, steps 7-8 only for deployment goals).
# The JSON PLAN SCHEMA below must itself be valid JSON, because the model is told to
# reproduce its structure; a missing comma between the step 6 and step 7 objects
# previously made the example unparseable.
# NOTE(review): `{Dataset_files}` looks like an ADK session-state placeholder filled
# from the ingestion agent's output_key — confirm; all other braces are literal JSON.
Planner_instruction = """
You are the Planner Agent. Your task is to receive a specific user goal and the associated dataset metadata, and then generate a **comprehensive, sequential plan** covering the project lifecycle, from data loading to the final step necessary to achieve the user's specific GOAL.

***STRICT EXECUTION PROTOCOL:***
1.  **INPUT:** You will receive a GOAL (user's intent) and CONTEXT (JSON metadata). The metadata JSON is located under the key {Dataset_files} in the previous agent's output.
2.  **ANALYSIS & CONTEXTUALIZATION:**
    * Review the CONTEXT data (specifically the 'datasets' array, file names, and description) to understand the **specific features, target variable, and problem type (e.g., classification, time-series, regression)**.
    * The plan must be specific to the data and the GOAL.
3.  **SEQUENCING & DETAIL:** The plan MUST be a logical, sequential list of tasks.
    * The **'description' field for each task must be highly detailed** and explicitly name relevant columns (e.g., 'diagnosis', 'mean_radius') and specific actions required for the problem type. **DO NOT use generic placeholders** like 'target variable' or 'categorical features' if the specific names are known or inferable from the context/goal.
    * The plan **must** cover all steps up to and including **Model Evaluation** (Steps 1-6).
    * Steps related to **Deployment and Monitoring** (Steps 7 & 8) should **ONLY** be included if the user's GOAL explicitly mentions a need for production, deployment, or real-time serving.
4.  **OUTPUT:** Return ONLY the final JSON plan structure.

JSON PLAN SCHEMA:
{
  "project_title": "<Concise title based on GOAL and dataset>",
  "goal": "<The user's original GOAL>",
  "dataset_reference": "<The owner/dataset_slug from the CONTEXT>",
  "required_file": "<The single most relevant file name from the CONTEXT>",
  "plan_tasks": [
    {
      "step": 1,
      "task_name": "Data Loading and Initial Inspection",
      "description": "Load the required file (e.g., 'breast-cancer.csv') into a pandas DataFrame. Check column information (data types, non-null counts), and generate descriptive statistics to understand the initial data structure and content. Specifically check the status of the target variable, such as 'diagnosis'."
    },
    {
      "step": 2,
      "task_name": "Exploratory Data Analysis (EDA)",
      "description": "Analyze the class imbalance and distribution of the target variable 'diagnosis' (Malignant/Benign). Visualize distributions and density plots of key predictive features (e.g., 'mean_radius', 'texture') and examine their correlation with the diagnosis."
    },
    {
      "step": 3,
      "task_name": "Data Cleaning and Preprocessing",
      "description": "Handle any missing or invalid values. Specifically, check and process the ID column if present. Encode the target variable 'diagnosis' (e.g., 'M' to 1, 'B' to 0). Standardize or normalize all numerical features (like 'perimeter_mean', 'area_se') to prepare them for distance-based models like SVM."
    },
    {
      "step": 4,
      "task_name": "Feature Engineering and Splitting",
      "description": "Check for opportunities to engineer features (e.g., ratios of perimeter/area). Split the processed data, ensuring stratification based on the 'diagnosis' column, into training, validation, and test sets (e.g., 70-15-15) for robust model development."
    },
    {
      "step": 5,
      "task_name": "Model Selection and Training",
      "description": "Select and train multiple classification models suited for the medical domain, specifically focusing on Support Vector Machines (SVM) and Logistic Regression, using the training data."
    },
    {
      "step": 6,
      "task_name": "Hyperparameter Tuning and Evaluation",
      "description": "Perform hyperparameter tuning (e.g., GridSearchCV) for the selected models. Evaluate the final optimized models on the unseen test set using crucial classification metrics: **Precision, Recall, F1-score, and ROC-AUC** for the 'diagnosis' prediction."
    },
    {
      "step": 7,
      "task_name": "Model Serialization and API Endpoint Creation",
      "description": "Serialize the final optimized model. Create a lightweight REST API endpoint to serve real-time predictions."
     },
     {
      "step": 8,
      "task_name": "Deployment and Monitoring",
      "description": "Deploy the API and the model to a production environment. Implement basic monitoring."
     }
  ]
}

RULES:
- **Output JSON only.** ABSOLUTELY NO conversation, explanation, or markdown outside of the JSON block.
- Tasks must be high-level but specific to the dataset files mentioned in the CONTEXT.
- **Goal Scoping Rule:** If the user's GOAL is only about **finding the data, making the plan, or training/evaluating the model**, the 'plan_tasks' array **must end at Step 6 (Evaluation)**. Only include Steps 7 and 8 if the goal specifically requests deployment or MLOps tasks.
- **Specificity Rule:** The **'description' field MUST reference specific column names, expected values, and domain-specific steps** (e.g., 'Standardize/Normalize numerical features') based on the dataset CONTEXT.
"""

# Ingestion prompt that returns everything as one JSON object: dataset metadata in
# "datasets", or (for download requests) an acknowledgement/URL in "errors". It lists
# only the four Kaggle MCP methods and does not mention the local save_to_raw tool.
# NOTE(review): no agent in the visible cells references this constant — ingest_agent
# is built with Ingestion_instruction_with_save instead; confirm whether this prompt is
# still needed.
Ingestion_instruction = """
You are DataIngestionAgent. Your task is to act as a structured interface for Kaggle datasets by providing a single, complete JSON output.

***STRICT EXECUTION PROTOCOL (Follow Sequentially):***
1.  **IF** the user's request contains the word **'download'** (or a known misspelling like 'dowload'):
    a. **SKIP ALL OTHER STEPS in Protocol 2**.
    b. The agent MUST first call **'search_datasets'** to find the dataset's slugs.
    c. From the search results, extract the 'owner_slug', 'dataset_slug', and determine the single largest or most appropriate file name (e.g., 'Netflix_User_Ratings.csv' if not specified by the user).
    d. Proceed directly to call **'download_dataset(owner_slug, dataset_slug, file_name)'**.
    e. Return the final JSON with an acknowledgment/URL in the 'errors' array.

2.  **OTHERWISE (Pure Metadata Mode):**
    a. **SEARCH:** **ALWAYS** start by calling 'search_datasets' using the user query.
    b. **PROCESS:** From the search results, you MUST identify the **SINGLE MOST RELEVANT dataset** and extract its 'owner_slug' and 'dataset_slug'.
    c. **CHAINING:** You MUST sequentially call **'get_dataset_info'** and then **'list_dataset_files'** using the extracted slugs.
    d. **OUTPUT TRANSFORMATION (CRITICAL):** After all tool calls are complete, you MUST transform the raw data into the JSON OUTPUT SCHEMA.
Your job:
- Either find and gather metadata for the single most relevant dataset **OR** execute a direct download request.

ALLOWED MCP METHODS (Use these EXACT names):
1. search_datasets(search: str, sort_by: str = 'DATASET_SORT_BY_RELEVANCE') -> ApiListDatasetsResponse
2. get_dataset_info(owner_slug: str, dataset_slug: str) -> ApiDataset
3. list_dataset_files(owner_slug: str, dataset_slug: str) -> ApiListDatasetFilesResponse
4. download_dataset(owner_slug: str, dataset_slug: str, file_name: str) -> HttpRedirect

JSON OUTPUT SCHEMA (Only one dataset object in the array for this process):
{
  "datasets": [
    {
      "title": "<string>",
      "dataset_ref": "<owner_slug/dataset_slug>", 
      "description": "<string or null>",
      "license_name": "<string or null>",
      "total_bytes": <int or null>,
      "tags": ["tag1","tag2",...],
      "files": [
        {
          "filename": "<string>",
          "size_bytes": <int or null>,
          "file_ref": "<string>"
        }
      ]
    }
  ],
  "errors": []
}

RULES:
- **Output JSON only. ABSOLUTELY NO** markdown, no explanations, no commentary, and NO conversational replies.
- **For Download Requests:** Set the 'datasets' array to empty (`[]`). Add a success/failure message and any resulting URL/error from the `download_dataset` tool to the 'errors' array.
- **For Metadata Requests:** The 'datasets' array must contain the single, fully populated dataset object.
- If a tool call fails, continue the process using the available data and place the specific error message in the "errors" array.
"""

In [11]:
# System prompt used by ingest_agent. Two modes: when the user asks to "download", the
# agent resolves a download URL and persists the file through the local save_to_raw
# tool; otherwise it returns dataset metadata only.
# The save_to_raw call is described with both of the tool's required arguments
# (download_url, original_filename), and the "saved" example is strict JSON (no trailing
# comma) so the model is not shown an unparseable reply format.
Ingestion_instruction_with_save = """
You are DataIngestionAgent. Your job is to return a Kaggle dataset download URL, and only save the file if the user clearly asks to download it.

--- BEHAVIOR RULES ---

1) If the user's message contains the word "download":
   - Use `search_datasets` to find the dataset.
   - Use `list_dataset_files` to pick the most relevant file (prefer CSV).
   - Use `download_dataset` to get the file's download URL.
   - print the download URL 
   - Then call the local tool `save_to_raw(download_url, original_filename)`, passing the chosen file's name as `original_filename`, to store the data in data/raw.
   - Return ONLY this JSON:
     {
       "status": "saved"
     }

2) If the user does NOT ask to download:
   - Only provide dataset metadata.
   - Use:
       search_datasets
       get_dataset_info
       list_dataset_files

    -  **SEARCH & CHAINING:** Call **'search_datasets'**, then **'get_dataset_info'**, and then **'list_dataset_files'** for the single most relevant dataset.
    -  **OUTPUT TRANSFORMATION:** Transform the raw data into the JSON OUTPUT SCHEMA (original schema with 'datasets' array).
   - Return ONLY JSON:
     {
       "datasets": [...],
       "errors": []
     }

--- RULES ---
- Output JSON ONLY.
- No explanations, no extra text.
- Never save files unless the user explicitly asks to download.
- When saving, always call `save_to_raw(download_url, original_filename)` (no other tool).

--- ALLOWED TOOLS ---
- search_datasets
- get_dataset_info
- list_dataset_files
- download_dataset
- save_to_raw
"""

## 1. Ingestion Agent

In [12]:
import os
import requests

def save_to_raw(download_url: str, original_filename: str) -> str:
    """Download a file and store it, unextracted, in the ``../data/raw`` folder.

    Args:
        download_url: URL of the file to fetch.
        original_filename: Name to give the saved file. Only the final path
            component is used, so values such as ``../x.csv`` or ``/tmp/x.csv``
            cannot place the file outside the raw-data folder.

    Returns:
        A message of the form ``"File saved to <path>"``.

    Raises:
        ValueError: If ``original_filename`` contains no usable file name.
        requests.HTTPError: If the server responds with an error status.
        requests.RequestException: On connection failures or timeouts.
    """
    raw_dir = "../data/raw"

    # The file name is supplied by the LLM, so treat it as untrusted input:
    # normalise Windows separators and keep only the last path component.
    safe_name = os.path.basename(original_filename.replace("\\", "/").strip())
    if safe_name in ("", ".", ".."):
        raise ValueError(f"Invalid file name: {original_filename!r}")

    os.makedirs(raw_dir, exist_ok=True)
    out_path = os.path.join(raw_dir, safe_name)
    # Write to a temporary sibling first so an interrupted download never leaves a
    # truncated file under the final name.
    tmp_path = out_path + ".part"

    # Stream with a timeout: large datasets are not held in memory and a stalled
    # connection cannot hang the notebook indefinitely.
    with requests.get(download_url, stream=True, timeout=60) as resp:
        resp.raise_for_status()
        with open(tmp_path, "wb") as f:
            for chunk in resp.iter_content(chunk_size=1 << 20):
                f.write(chunk)
    os.replace(tmp_path, out_path)

    return f"File saved to {out_path}"

In [13]:
# SECURITY: credentials must never be hardcoded in a notebook. The Kaggle username and
# API key are expected to already be in the environment (for example via the .env file
# read by load_dotenv() earlier in this notebook). Any key that was ever committed in
# this cell must be treated as compromised and regenerated in the Kaggle account settings.
_missing_kaggle_vars = [
    name for name in ("KAGGLE_USERNAME", "KAGGLE_KEY") if not os.environ.get(name)
]
if _missing_kaggle_vars:
    raise RuntimeError(
        "Missing Kaggle credentials: "
        + ", ".join(_missing_kaggle_vars)
        + ". Set them in your environment or .env file."
    )

# Kaggle MCP toolset: runs `npx -y mcp-remote https://www.kaggle.com/mcp` over a stdio
# connection (60 timeout) and exposes only the four dataset tools named in the filter.
mcp_kaggle_server = McpToolset(
    tool_filter=[
        "search_datasets",
        "get_dataset_info",
        "list_dataset_files",
        "download_dataset",
    ],
    connection_params=StdioConnectionParams(
        timeout=60,
        server_params=StdioServerParameters(
            command="npx",
            args=["-y", "mcp-remote", "https://www.kaggle.com/mcp"],
        ),
    ),
)

# Ingestion agent: looks datasets up through the Kaggle MCP toolset and can persist a
# file with save_to_raw. Its output is registered under the "Dataset_files" key.
ingest_agent = Agent(
    name="DataIngestion_agent",
    instruction=Ingestion_instruction_with_save,
    model=Gemini(model="gemini-2.5-flash", retry_options=retry_config),
    tools=[mcp_kaggle_server, save_to_raw],
    output_key="Dataset_files",
)


# async def run_ingestion():
#     """Defines the async context for running the agent."""
#     runner = InMemoryRunner(agent = ingest_agent)
#     response = await runner.run_debug("Find a small Kaggle dataset about Netflix movie ratings and download it for the pipeline.")
#     print(response)

# await run_ingestion()

# print("✅ Ingest_agent created.")

## 2. Planner Agent

In [14]:
# Planner agent: follows Planner_instruction to turn the user's goal and the ingested
# dataset metadata into a step-by-step JSON plan. google_search is its only tool.
Planner_agent = Agent(
    name="Planner_agent",
    instruction=Planner_instruction,
    tools=[google_search],
    model=Gemini(model="gemini-2.5-flash", retry_options=retry_config),
    # The agent's result is stored in the session state under this key.
    output_key="Planner_findings",
)

print("✅ Planner_agent created.")

# runner = InMemoryRunner(agent = Planner_agent)
# response = await runner.run_debug(Query)

✅ Planner_agent created.


# DataPilot Orchestrator

In [15]:
# # traditional Orchestrator Coordinator: Orchestrates the workflow by calling the sub-agents as tools.
# Orchestrator_agent = Agent(
#     name="OrchestratorCoordinator",
#     model=Gemini(
#         model="gemini-2.5-flash",
#         retry_options=retry_config
#     ),
#     instruction=Orchestrator_instruction,
#     tools=[AgentTool(ingest_agent), AgentTool(Planner_agent)],
# )

# print("✅ root_agent created.")

# Active orchestrator: a SequentialAgent whose sub-agents are listed in pipeline order —
# the ingestion agent first, then the planner.
Orchestrator_agent = SequentialAgent(
    sub_agents=[ingest_agent, Planner_agent],
    name="DataSciencePipeline",
)

print("✅ Sequential Agent created.")

✅ Sequential Agent created.


In [16]:
runner = InMemoryRunner(agent=Orchestrator_agent)
response = await runner.run_debug("""
Find the small Breast Cancer Prediction dataset , download it and make a plan to create a classification model.""")


 ### Created new session: debug_session_id

User > 
Find the small Breast Cancer Prediction dataset , download it and make a plan to create a classification model.


  super().__init__(


DataIngestion_agent > {"status": "saved"}
Planner_agent > ```json
{
  "project_title": "Breast Cancer Classification Model Development",
  "goal": "Create a classification model for Breast Cancer Prediction.",
  "dataset_reference": "vijayaadithyanvg/breast-cancer-prediction",
  "required_file": "data.csv",
  "plan_tasks": [
    {
      "step": 1,
      "task_name": "Data Loading and Initial Inspection",
      "description": "Load the 'data.csv' file into a pandas DataFrame. Perform an initial inspection to understand the dataset's structure, including column names, data types, and the number of non-null values. Generate descriptive statistics to summarize the central tendency, dispersion, and shape of the dataset’s distribution, particularly for numerical features. Identify the target variable, which is expected to be 'diagnosis', and examine its initial state."
    },
    {
      "step": 2,
      "task_name": "Exploratory Data Analysis (EDA)",
      "description": "Analyze the distri