In [None]:
# --- Logging base (stderr, no stdout) ---
import sys, logging, time
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s :: %(message)s",
    stream=sys.stderr,  # MUY IMPORTANTE en STDIO
)
log = logging.getLogger("taskpilot")


In [None]:
from mcp.server.fastmcp import FastMCP, Context
mcp = FastMCP("TaskPilot")

# Decorador simple para cronometrar tools
def timed(fn):
    async def _wrap(*args, **kwargs):
        ctx: Context = kwargs.get("ctx") or args[-1]
        t0 = time.perf_counter()
        try:
            await ctx.debug(f"Entering {fn.__name__}")
            return await fn(*args, **kwargs)
        except Exception as ex:
            await ctx.error(f"{fn.__name__} failed: {type(ex).__name__}: {ex}")
            raise
        finally:
            dt = (time.perf_counter() - t0) * 1000
            await ctx.info(f"{fn.__name__} completed in {dt:.1f} ms")
    return _wrap

@mcp.tool()
@timed
def add_task(title: str, tags: Optional[list[str]] = None, ctx: Context = None) -> Task:
    if ctx:  # logs hacia el cliente
        # Nivel “notice” usando ctx.log (válido por especificación)
        # y un log estructurado con logger "tasks"
        _ = ctx.log("notice", f'Creating task "{title}"', logger_name="tasks")
    log.info("add_task called | title=%r tags=%r", title, tags)  # a STDERR

    title = (title or "").strip()
    if not title:
        raise ValueError("Title cannot be empty.")

    task = Task(title=title, tags=[t for t in (tags or []) if t.strip()])
    STORE[task.id] = task.model_dump()
    save(STORE)

    if ctx:
        # Ejemplo de log estructurado JSON al cliente
        await ctx.session.send_log_message(
            level="info",
            data={"event": "task_created", "task_id": task.id, "title": task.title},
            logger="tasks"
        )
    return task

@mcp.tool()
@timed
async def bulk_import(lines: list[str], ctx: Context) -> int:
    total = len(lines) or 1
    created = 0
    await ctx.info(f"Starting import of {len(lines)} tasks")
    for idx, raw in enumerate(lines, start=1):
        title = (raw or "").strip()
        if not title:
            await ctx.warning(f"Skipping empty line {idx}")
            continue
        t = Task(title=title)
        STORE[t.id] = t.model_dump()
        created += 1
        await ctx.report_progress(progress=idx, total=total, message=f"Imported {idx}/{total}")
        await ctx.debug(f"Created task {t.id}: {t.title}")
    save(STORE)
    await ctx.info(f"Import complete: {created} created")
    return created


In [None]:
# en tu MCPClient
async def set_log_level(self, level: str = "info"):
    # Camino A (SDK reciente):
    if hasattr(self.session, "set_logging_level"):
        await self.session.set_logging_level(level)
        return
    # Camino B (fallback JSON-RPC):
    await self.session.request("logging/setLevel", {"level": level})


* No loguear en stdout

* Logs al host: stderr

* Logs al cliente MCP: notifications/message

* Cliente puede fijar nivel: logging/setLevel