In [8]:
import os
import re
import string
import platform
import uuid
from google.genai import types

from google.adk.agents import LlmAgent
from google.adk.models.google_llm import Gemini
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService

from google.adk.tools.mcp_tool.mcp_toolset import McpToolset
from google.adk.tools.tool_context import ToolContext
from google.adk.tools.mcp_tool.mcp_session_manager import StdioConnectionParams
from mcp import StdioServerParameters

from google.adk.apps.app import App, ResumabilityConfig
from google.adk.tools.function_tool import FunctionTool


from dotenv import load_dotenv

load_dotenv()  # loads variables from .env
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")

if GOOGLE_API_KEY:
    print("‚úÖ Gemini API key setup complete.")
else:
    print("‚ùå GOOGLE_API_KEY not found.")
    
session_service = InMemorySessionService()
def get_available_roots():
    system = platform.system().lower()

    # Windows: detect drives (C:, D:, E:, ...)
    if system == "windows":
        drives = []
        for letter in string.ascii_uppercase:
            drive = f"{letter}:/"
            if os.path.exists(drive):
                drives.append(drive)
        print(f"Detected drives: {drives}")
        return drives

    # Linux/macOS: treat / as the root
    else:
        roots = ['/']
        # Optional: scan /mnt, /media, /Volumes
        for extra in ['/mnt', '/media', '/Volumes']:
            if os.path.exists(extra):
                roots.append(extra)
        return roots



def find_file_by_guess(keyword, roots):
    keyword = keyword.lower()

    for root in roots:
        for dirpath, _, filenames in os.walk(root):
            for f in filenames:
                if keyword in f.lower():
                    return os.path.join(dirpath, f)
    return None



def extract_path_or_keyword(user_message):
    # Full path Windows (C:\folder\file.py)
    windows = re.findall(r"[A-Za-z]:\\[^\s]+", user_message)

    # Linux paths (/root/folder/file)
    linux = re.findall(r"(\/[\w\/\.\-]+|\.[\w\/\.\-]+)", user_message)

    if windows:
        return windows[0], True
    if linux:
        return linux[0], True
    
    # Otherwise treat as keyword for guessing
    return user_message.strip().lower(), False


def get_root_for_path(path):
    # Windows root like "D:/"
    if ":" in path:
        return path.split("\\")[0] + "/"
    # Linux root "/"
    if path.startswith("/"):
        return "/"
    # fallback
    return "./"


def create_filesystem_agent(root_path):
    return LlmAgent(
        model="gemini-2.0-flash-lite",
        name="fs_agent",
        instruction="Find and open files naturally.",
        tools=[
            McpToolset(
                connection_params=StdioConnectionParams(
                    server_params=StdioServerParameters(
                        command="npx",
                        args=[
                         "-y",
                            "@modelcontextprotocol/server-filesystem",
                            root_path
                        ],
                    ),
                    timeout=30,
                )
            )      
        ]
    )

root_agent = create_filesystem_agent("./")
print("Filesystem agent created.", root_agent)
# get_available_roots()
# runner = Runner(agent=create_filesystem_agent("/"), session_service=session_service)
get_available_roots()

‚úÖ Gemini API key setup complete.
Filesystem agent created. name='fs_agent' description='' parent_agent=None sub_agents=[] before_agent_callback=None after_agent_callback=None model='gemini-2.0-flash-lite' instruction='Find and open files naturally.' global_instruction='' static_instruction=None tools=[<google.adk.tools.mcp_tool.mcp_toolset.McpToolset object at 0x0000019D7AAC6890>] generate_content_config=None disallow_transfer_to_parent=False disallow_transfer_to_peers=False include_contents='default' input_schema=None output_schema=None output_key=None planner=None code_executor=None before_model_callback=None after_model_callback=None before_tool_callback=None after_tool_callback=None on_tool_error_callback=None
Detected drives: ['C:/', 'D:/', 'E:/', 'G:/', 'H:/', 'I:/', 'J:/', 'K:/', 'L:/']


['C:/', 'D:/', 'E:/', 'G:/', 'H:/', 'I:/', 'J:/', 'K:/', 'L:/']

In [5]:
import os
import re
import string
import platform
from dotenv import load_dotenv

# ------------------------
# ENVIRONMENT
# ------------------------
load_dotenv()
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")

print("‚úÖ Gemini API key loaded" if GOOGLE_API_KEY else "‚ùå Missing API key")

# ------------------------
# ADK IMPORTS (Correct versions)
# ------------------------
from google.adk.agents import LlmAgent
from google.adk.tools.mcp_tool.mcp_toolset import McpToolset
from google.adk.tools.mcp_tool.mcp_session_manager import (
    StdioConnectionParams,
    StdioServerParameters,   # <-- correct ADK import
)

# ------------------------
# ROOT DIRECTORY (SAFE)
# ------------------------
# Absolute required! Prevents MCP crash.
global BASE_DIR
try:
    base_path = os.path.abspath(os.path.dirname(__file__))
    BASE_DIR = base_path
except NameError:
    # Fallback in interactive environments like notebooks
    base_path = os.path.abspath((os.getcwd()))
    BASE_DIR = base_path

# os.makedirs(base_path, exist_ok=True)

# BASE_DIR = os.path.abspath(
#     os.path.join(os.path.dirname(__file__), "workspace")
# )
# os.makedirs(BASE_DIR, exist_ok=True)

print(f"üìÅ MCP Root Directory: {BASE_DIR} (Exists: {os.path.exists(BASE_DIR)})")

# ------------------------
# Dynamic Drive Detection
# ------------------------
def get_available_roots():
    system = platform.system().lower()

    if system == "windows":
        drives = []
        for letter in string.ascii_uppercase:
            root = f"{letter}:/"
            if os.path.exists(root):
                drives.append(root)
        return drives

    # Linux / macOS
    roots = ['/']
    for extra in ['/mnt', '/media', '/Volumes']:
        if os.path.exists(extra):
            roots.append(extra)

    return roots

# ------------------------
# Search File by Keyword
# ------------------------
def find_file_by_guess(keyword, roots):
    keyword = keyword.lower()

    for root in roots:
        for dirpath, _, filenames in os.walk(root):
            for file in filenames:
                if keyword in file.lower():
                    return os.path.join(dirpath, file)

    return None

# ------------------------
# Extract Path or Keyword
# ------------------------
def extract_path_or_keyword(user_message):
    windows = re.findall(r"[A-Za-z]:\\[^\s]+", user_message)
    linux = re.findall(r"(\/[\w\/\.\-]+|\.[\w\/\.\-]+)", user_message)

    if windows:
        return windows[0], True
    if linux:
        return linux[0], True

    return user_message.strip().lower(), False

# ------------------------
# Determine Root for Path
# ------------------------
def get_root_for_path(path):
    if ":" in path:       # Windows drive
        return path.split("\\")[0] + "/"

    if path.startswith("/"):   # Linux root
        return "/"

    return BASE_DIR

# ------------------------
# Create MCP Filesystem Agent
# ------------------------
def create_filesystem_agent(root_path):
    # Convert to absolute path to avoid MCP crash
    root_path = os.path.abspath(root_path)
    print(f"Creating filesystem agent with root: {root_path}")
    return LlmAgent(
        model="gemini-2.0-flash",
        name="filesystem_agent",
        instruction="""
You are a robust filesystem assistant.
The user will speak naturally (e.g., "open the ML file").
You will:

1. Detect whether they gave a full path or just a keyword.
2. If keyword ‚Üí search all drives for closest match.
3. Mount the MCP filesystem server at correct root.
4. Use list_directory and read_file to show file contents.
""",
        tools=[
            McpToolset(
                connection_params=StdioConnectionParams(
                    server_params=StdioServerParameters(
                        command="npx",
                        args=[
                            "-y",
                            "@modelcontextprotocol/server-filesystem",
                            root_path,
                        ],
                    ),
                    timeout=45,
                )
            )
        ],
    )

# ------------------------
# ADK REQUIRED VARIABLE
# ------------------------
# This MUST exist. ADK reads ONLY this variable.
root_agent = create_filesystem_agent(BASE_DIR)
print(BASE_DIR)
print("Filesystem agent created.", root_agent)

‚úÖ Gemini API key loaded
üìÅ MCP Root Directory: g:\ML\File_system_agent (Exists: True)
Creating filesystem agent with root: g:\ML\File_system_agent
g:\ML\File_system_agent
Filesystem agent created. name='filesystem_agent' description='' parent_agent=None sub_agents=[] before_agent_callback=None after_agent_callback=None model='gemini-2.0-flash' instruction='\nYou are a robust filesystem assistant.\nThe user will speak naturally (e.g., "open the ML file").\nYou will:\n\n1. Detect whether they gave a full path or just a keyword.\n2. If keyword ‚Üí search all drives for closest match.\n3. Mount the MCP filesystem server at correct root.\n4. Use list_directory and read_file to show file contents.\n' global_instruction='' static_instruction=None tools=[<google.adk.tools.mcp_tool.mcp_toolset.McpToolset object at 0x0000019D68E4E9D0>] generate_content_config=None disallow_transfer_to_parent=False disallow_transfer_to_peers=False include_contents='default' input_schema=None output_schema=Non

In [7]:
import subprocess

cmd = [
    r"C:\nvm4w\nodejs\npx.cmd",
    "-y",
    "@modelcontextprotocol/server-filesystem",
    r"G:\ML\File_system_agent"
]

proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

try:
    stdout, stderr = proc.communicate(timeout=5)
    print("Error:", stderr)
except subprocess.TimeoutExpired:
    print("MCP server is RUNNING! Success!")
    proc.kill()

MCP server is RUNNING! Success!
