From c3aa650e08edd23240eced6d7cde3817f62a5bd1 Mon Sep 17 00:00:00 2001 From: Rafael Poyiadzi Date: Tue, 24 Feb 2026 15:01:02 +0000 Subject: [PATCH 1/8] Unified input API: artifact_id + data replaces input_csv Processing tools now accept artifact_id (UUID from upload_data) or data (list[dict]) instead of input_csv/input_data/input_json. Adds upload_data tool for URL/file ingestion and request_upload_url for presigned large-file uploads in HTTP mode. Phase 1: Simplified _SingleSourceInput and MergeInput models Phase 2: upload_data tool (URL + local path + Google Sheets) Phase 3: Presigned URL upload system (HMAC, Redis metadata, REST endpoint) Co-Authored-By: Claude Opus 4.6 --- everyrow-mcp/manifest.json | 4 + everyrow-mcp/src/everyrow_mcp/config.py | 23 +- everyrow-mcp/src/everyrow_mcp/http_config.py | 2 + everyrow-mcp/src/everyrow_mcp/models.py | 106 ++++--- everyrow-mcp/src/everyrow_mcp/redis_store.py | 14 + everyrow-mcp/src/everyrow_mcp/server.py | 3 + everyrow-mcp/src/everyrow_mcp/tools.py | 159 +++++++--- everyrow-mcp/src/everyrow_mcp/uploads.py | 225 +++++++++++++++ everyrow-mcp/src/everyrow_mcp/utils.py | 91 ++++++ everyrow-mcp/tests/test_mcp_e2e.py | 46 +-- everyrow-mcp/tests/test_server.py | 288 ++++++++++++++----- everyrow-mcp/tests/test_stdio_content.py | 45 +-- everyrow-mcp/tests/test_uploads.py | 129 +++++++++ everyrow-mcp/tests/test_utils.py | 68 ++++- 14 files changed, 1009 insertions(+), 194 deletions(-) create mode 100644 everyrow-mcp/src/everyrow_mcp/uploads.py create mode 100644 everyrow-mcp/tests/test_uploads.py diff --git a/everyrow-mcp/manifest.json b/everyrow-mcp/manifest.json index 5db2513d..a601a8af 100644 --- a/everyrow-mcp/manifest.json +++ b/everyrow-mcp/manifest.json @@ -72,6 +72,10 @@ { "name": "everyrow_cancel", "description": "Cancel a running everyrow task. Use when the user wants to stop a task that is currently processing." + }, + { + "name": "everyrow_upload_data", + "description": "Upload data from a URL or local file. Returns an artifact_id for use in processing tools." } ], "user_config": { diff --git a/everyrow-mcp/src/everyrow_mcp/config.py b/everyrow-mcp/src/everyrow_mcp/config.py index ff1e85b9..21b8be50 100644 --- a/everyrow-mcp/src/everyrow_mcp/config.py +++ b/everyrow-mcp/src/everyrow_mcp/config.py @@ -17,10 +17,6 @@ class Settings(BaseSettings): default=50_000, description="Maximum number of rows allowed in inline JSON data", ) - max_inline_data_bytes: int = Field( - default=10 * 1024 * 1024, - description="Maximum size in bytes for inline CSV string data (10 MB)", - ) max_schema_properties: int = Field( default=50, description="Maximum number of properties allowed in a response schema", @@ -79,6 +75,25 @@ class Settings(BaseSettings): default=604_800, description="Refresh token TTL in seconds (7 days)", ) + max_inline_rows: int = Field( + default=5000, + description="Maximum rows allowed in inline data (list[dict]).", + ) + + # Upload settings (HTTP mode only) + upload_secret: str = Field( + default="", + description="HMAC-SHA256 secret for signing upload URLs. Auto-generated if empty.", + ) + upload_url_ttl: int = Field( + default=300, + description="Presigned upload URL validity in seconds (5 min).", + ) + max_upload_size_bytes: int = Field( + default=50 * 1024 * 1024, + description="Maximum upload file size in bytes (50 MB).", + ) + everyrow_api_key: str | None = None @property diff --git a/everyrow-mcp/src/everyrow_mcp/http_config.py b/everyrow-mcp/src/everyrow_mcp/http_config.py index a72cf339..b9d0acc4 100644 --- a/everyrow-mcp/src/everyrow_mcp/http_config.py +++ b/everyrow-mcp/src/everyrow_mcp/http_config.py @@ -22,6 +22,7 @@ from everyrow_mcp.redis_store import get_redis_client from everyrow_mcp.routes import api_download, api_progress from everyrow_mcp.templates import RESULTS_HTML, SESSION_HTML +from everyrow_mcp.uploads import handle_upload logger = logging.getLogger(__name__) @@ -101,6 +102,7 @@ def _register_routes( mcp.custom_route("/api/results/{task_id}/download", ["GET", "OPTIONS"])( api_download ) + mcp.custom_route("/api/uploads/{upload_id}", ["PUT"])(handle_upload) async def _health(_request: Request) -> Response: try: diff --git a/everyrow-mcp/src/everyrow_mcp/models.py b/everyrow-mcp/src/everyrow_mcp/models.py index 842cddaf..a2cebfef 100644 --- a/everyrow-mcp/src/everyrow_mcp/models.py +++ b/everyrow-mcp/src/everyrow_mcp/models.py @@ -16,7 +16,7 @@ ) from everyrow_mcp.config import settings -from everyrow_mcp.utils import validate_csv_path +from everyrow_mcp.utils import _is_url, validate_csv_path, validate_url JSON_TYPE_MAP = { "string": str, @@ -134,34 +134,31 @@ def _check_exactly_one( class _SingleSourceInput(BaseModel): model_config = ConfigDict(str_strip_whitespace=True, extra="forbid") - input_csv: str | None = Field( + artifact_id: str | None = Field( default=None, - description="Absolute path to CSV file (local/stdio mode only).", + description="Artifact ID (UUID) from upload_data or request_upload_url.", ) - data: str | list[dict[str, Any]] | None = Field( + data: list[dict[str, Any]] | None = Field( default=None, - description="Inline data — CSV string or JSON array of objects.", + description="Inline data as a list of row objects.", ) - @field_validator("input_csv") + @field_validator("artifact_id") @classmethod - def validate_input_csv(cls, v: str | None) -> str | None: + def validate_artifact_id(cls, v: str | None) -> str | None: if v is not None: - validate_csv_path(v) + try: + UUID(v) + except ValueError as exc: + raise ValueError(f"artifact_id must be a valid UUID: {v}") from exc return v @field_validator("data") @classmethod def validate_data_size( - cls, v: str | list[dict[str, Any]] | None - ) -> str | list[dict[str, Any]] | None: - if v is None: - return v - if isinstance(v, str) and len(v) > settings.max_inline_data_bytes: - raise ValueError( - f"Inline data exceeds {settings.max_inline_data_bytes // (1024 * 1024)} MB limit" - ) - if isinstance(v, list) and len(v) > settings.max_inline_rows: + cls, v: list[dict[str, Any]] | None + ) -> list[dict[str, Any]] | None: + if v is not None and len(v) > settings.max_inline_rows: raise ValueError( f"Inline data has {len(v)} rows (max {settings.max_inline_rows})" ) @@ -170,8 +167,8 @@ def validate_data_size( @model_validator(mode="after") def check_input_source(self): _check_exactly_one( - values=(self.input_csv, self.data), - field_names=("input_csv", "data"), + values=(self.artifact_id, self.data), + field_names=("artifact_id", "data"), label="Input", ) return self @@ -276,23 +273,23 @@ class MergeInput(BaseModel): ) # LEFT table - left_csv: str | None = Field( + left_artifact_id: str | None = Field( default=None, - description="Absolute path to the left CSV (local/stdio mode only).", + description="Artifact ID (UUID) for the left table, from upload_data or request_upload_url.", ) - left_data: str | list[dict[str, Any]] | None = Field( + left_data: list[dict[str, Any]] | None = Field( default=None, - description="Inline data for the left table — CSV string or JSON array of objects.", + description="Inline data for the left table as a list of row objects.", ) # RIGHT table - right_csv: str | None = Field( + right_artifact_id: str | None = Field( default=None, - description="Absolute path to the right CSV (local/stdio mode only).", + description="Artifact ID (UUID) for the right table, from upload_data or request_upload_url.", ) - right_data: str | list[dict[str, Any]] | None = Field( + right_data: list[dict[str, Any]] | None = Field( default=None, - description="Inline data for the right table — CSV string or JSON array of objects.", + description="Inline data for the right table as a list of row objects.", ) merge_on_left: str | None = Field( @@ -313,23 +310,37 @@ class MergeInput(BaseModel): description="Relationship type: many_to_one (default) or one_to_one.", ) - @field_validator("left_csv", "right_csv") + @field_validator("left_artifact_id", "right_artifact_id") @classmethod - def validate_csv_paths(cls, v: str | None) -> str | None: + def validate_artifact_ids(cls, v: str | None) -> str | None: if v is not None: - validate_csv_path(v) + try: + UUID(v) + except ValueError as exc: + raise ValueError(f"artifact_id must be a valid UUID: {v}") from exc + return v + + @field_validator("left_data", "right_data") + @classmethod + def validate_data_size( + cls, v: list[dict[str, Any]] | None + ) -> list[dict[str, Any]] | None: + if v is not None and len(v) > settings.max_inline_rows: + raise ValueError( + f"Inline data has {len(v)} rows (max {settings.max_inline_rows})" + ) return v @model_validator(mode="after") def check_sources(self) -> "MergeInput": _check_exactly_one( - values=(self.left_csv, self.left_data), - field_names=("left_csv", "left_data"), + values=(self.left_artifact_id, self.left_data), + field_names=("left_artifact_id", "left_data"), label="Left table", ) _check_exactly_one( - values=(self.right_csv, self.right_data), - field_names=("right_csv", "right_data"), + values=(self.right_artifact_id, self.right_data), + field_names=("right_artifact_id", "right_data"), label="Right table", ) return self @@ -348,6 +359,33 @@ class ForecastInput(_SingleSourceInput): ) +class UploadDataInput(BaseModel): + """Input for the upload_data tool.""" + + model_config = ConfigDict(str_strip_whitespace=True, extra="forbid") + + source: str = Field( + ..., + description="Data source: http(s) URL (Google Sheets/Drive supported) " + "or absolute local file path (stdio mode only).", + min_length=1, + ) + + @field_validator("source") + @classmethod + def validate_source(cls, v: str) -> str: + if _is_url(v): + return validate_url(v) + # Local path + if settings.is_http: + raise ValueError( + "Local file paths are not supported in HTTP mode. " + "Use a URL or request_upload_url instead." + ) + validate_csv_path(v) + return v + + class SingleAgentInput(BaseModel): """Input for a single agent operation (no CSV).""" diff --git a/everyrow-mcp/src/everyrow_mcp/redis_store.py b/everyrow-mcp/src/everyrow_mcp/redis_store.py index 2d365ddb..32384be4 100644 --- a/everyrow-mcp/src/everyrow_mcp/redis_store.py +++ b/everyrow-mcp/src/everyrow_mcp/redis_store.py @@ -194,3 +194,17 @@ async def store_poll_token(task_id: str, poll_token: str) -> None: async def get_poll_token(task_id: str) -> str | None: return await get_redis_client().get(build_key("poll_token", task_id)) + + +# ── Upload metadata ─────────────────────────────────────────── + + +async def store_upload_meta(upload_id: str, meta_json: str, ttl: int) -> None: + """Store upload metadata with TTL (consume-on-use).""" + await get_redis_client().setex(build_key("upload", upload_id), ttl, meta_json) + + +async def pop_upload_meta(upload_id: str) -> str | None: + """Atomically get and delete upload metadata (prevents replay).""" + key = build_key("upload", upload_id) + return await get_redis_client().getdel(key) diff --git a/everyrow-mcp/src/everyrow_mcp/server.py b/everyrow-mcp/src/everyrow_mcp/server.py index bb4b2726..cc28c384 100644 --- a/everyrow-mcp/src/everyrow_mcp/server.py +++ b/everyrow-mcp/src/everyrow_mcp/server.py @@ -18,6 +18,7 @@ _RESULTS_META, everyrow_results_http, ) +from everyrow_mcp.uploads import register_upload_tool class InputArgs(BaseModel): @@ -87,6 +88,8 @@ def main(): )(everyrow_results_http) if input_args.http: + register_upload_tool(mcp) + if input_args.no_auth: mcp_server_url = f"http://localhost:{input_args.port}" else: diff --git a/everyrow-mcp/src/everyrow_mcp/tools.py b/everyrow-mcp/src/everyrow_mcp/tools.py index 01b19407..2416cda2 100644 --- a/everyrow-mcp/src/everyrow_mcp/tools.py +++ b/everyrow-mcp/src/everyrow_mcp/tools.py @@ -8,12 +8,14 @@ from typing import Any from uuid import UUID +import pandas as pd from everyrow.api_utils import handle_response from everyrow.constants import EveryrowError from everyrow.generated.api.tasks import get_task_status_tasks_task_id_status_get from everyrow.generated.models.public_task_type import PublicTaskType from everyrow.ops import ( agent_map_async, + create_table_artifact, dedupe_async, forecast_async, merge_async, @@ -40,6 +42,7 @@ ScreenInput, SingleAgentInput, StdioResultsInput, + UploadDataInput, _schema_to_model, ) from everyrow_mcp.result_store import ( @@ -56,7 +59,15 @@ create_tool_response, write_initial_task_state, ) -from everyrow_mcp.utils import load_data, save_result_to_csv +from everyrow_mcp.utils import _is_url, fetch_csv_from_url, save_result_to_csv + + +def _resolve_input(params) -> UUID | pd.DataFrame: + """Resolve artifact_id or data to a UUID or DataFrame for SDK ops.""" + if params.artifact_id: + return UUID(params.artifact_id) + return pd.DataFrame(params.data) + logger = logging.getLogger(__name__) @@ -92,7 +103,7 @@ async def everyrow_agent(params: AgentInput, ctx: EveryRowContext) -> list[TextC client = _get_client(ctx) _clear_task_state() - df = load_data(data=params.data, input_csv=params.input_csv) + input_data = _resolve_input(params) response_model: type[BaseModel] | None = None if params.response_schema: @@ -100,24 +111,31 @@ async def everyrow_agent(params: AgentInput, ctx: EveryRowContext) -> list[TextC async with create_session(client=client) as session: session_url = session.get_url() - kwargs: dict[str, Any] = {"task": params.task, "session": session, "input": df} + kwargs: dict[str, Any] = { + "task": params.task, + "session": session, + "input": input_data, + } if response_model: kwargs["response_model"] = response_model cohort_task = await agent_map_async(**kwargs) task_id = str(cohort_task.task_id) + total = len(input_data) if isinstance(input_data, pd.DataFrame) else 0 write_initial_task_state( task_id, task_type=PublicTaskType.AGENT, session_url=session_url, - total=len(df), + total=total, ) return await create_tool_response( task_id=task_id, session_url=session_url, - label=f"Submitted: {len(df)} agents starting.", + label=f"Submitted: {total} agents starting." + if total + else "Submitted: agents starting.", token=client.token, - total=len(df), + total=total, mcp_server_url=ctx.request_context.lifespan_context.mcp_server_url, ) @@ -231,7 +249,7 @@ async def everyrow_rank(params: RankInput, ctx: EveryRowContext) -> list[TextCon client = _get_client(ctx) _clear_task_state() - df = load_data(data=params.data, input_csv=params.input_csv) + input_data = _resolve_input(params) response_model: type[BaseModel] | None = None if params.response_schema: @@ -242,26 +260,29 @@ async def everyrow_rank(params: RankInput, ctx: EveryRowContext) -> list[TextCon cohort_task = await rank_async( task=params.task, session=session, - input=df, + input=input_data, field_name=params.field_name, field_type=params.field_type, response_model=response_model, ascending_order=params.ascending_order, ) task_id = str(cohort_task.task_id) + total = len(input_data) if isinstance(input_data, pd.DataFrame) else 0 write_initial_task_state( task_id, task_type=PublicTaskType.RANK, session_url=session_url, - total=len(df), + total=total, ) return await create_tool_response( task_id=task_id, session_url=session_url, - label=f"Submitted: {len(df)} rows for ranking.", + label=f"Submitted: {total} rows for ranking." + if total + else "Submitted: rows for ranking.", token=client.token, - total=len(df), + total=total, mcp_server_url=ctx.request_context.lifespan_context.mcp_server_url, ) @@ -311,7 +332,7 @@ async def everyrow_screen( client = _get_client(ctx) _clear_task_state() - df = load_data(data=params.data, input_csv=params.input_csv) + input_data = _resolve_input(params) response_model: type[BaseModel] | None = None if params.response_schema: @@ -322,23 +343,26 @@ async def everyrow_screen( cohort_task = await screen_async( task=params.task, session=session, - input=df, + input=input_data, response_model=response_model, ) task_id = str(cohort_task.task_id) + total = len(input_data) if isinstance(input_data, pd.DataFrame) else 0 write_initial_task_state( task_id, task_type=PublicTaskType.SCREEN, session_url=session_url, - total=len(df), + total=total, ) return await create_tool_response( task_id=task_id, session_url=session_url, - label=f"Submitted: {len(df)} rows for screening.", + label=f"Submitted: {total} rows for screening." + if total + else "Submitted: rows for screening.", token=client.token, - total=len(df), + total=total, mcp_server_url=ctx.request_context.lifespan_context.mcp_server_url, ) @@ -384,29 +408,32 @@ async def everyrow_dedupe( client = _get_client(ctx) _clear_task_state() - df = load_data(data=params.data, input_csv=params.input_csv) + input_data = _resolve_input(params) async with create_session(client=client) as session: session_url = session.get_url() cohort_task = await dedupe_async( equivalence_relation=params.equivalence_relation, session=session, - input=df, + input=input_data, ) task_id = str(cohort_task.task_id) + total = len(input_data) if isinstance(input_data, pd.DataFrame) else 0 write_initial_task_state( task_id, task_type=PublicTaskType.DEDUPE, session_url=session_url, - total=len(df), + total=total, ) return await create_tool_response( task_id=task_id, session_url=session_url, - label=f"Submitted: {len(df)} rows for deduplication.", + label=f"Submitted: {total} rows for deduplication." + if total + else "Submitted: rows for deduplication.", token=client.token, - total=len(df), + total=total, mcp_server_url=ctx.request_context.lifespan_context.mcp_server_url, ) @@ -461,35 +488,47 @@ async def everyrow_merge(params: MergeInput, ctx: EveryRowContext) -> list[TextC client = _get_client(ctx) _clear_task_state() - left_df = load_data(data=params.left_data, input_csv=params.left_csv) - right_df = load_data(data=params.right_data, input_csv=params.right_csv) + left_input: UUID | pd.DataFrame + if params.left_artifact_id: + left_input = UUID(params.left_artifact_id) + else: + left_input = pd.DataFrame(params.left_data) + + right_input: UUID | pd.DataFrame + if params.right_artifact_id: + right_input = UUID(params.right_artifact_id) + else: + right_input = pd.DataFrame(params.right_data) async with create_session(client=client) as session: session_url = session.get_url() cohort_task = await merge_async( task=params.task, session=session, - left_table=left_df, - right_table=right_df, + left_table=left_input, + right_table=right_input, merge_on_left=params.merge_on_left, merge_on_right=params.merge_on_right, use_web_search=params.use_web_search, relationship_type=params.relationship_type, ) task_id = str(cohort_task.task_id) + total = len(left_input) if isinstance(left_input, pd.DataFrame) else 0 write_initial_task_state( task_id, task_type=PublicTaskType.MERGE, session_url=session_url, - total=len(left_df), + total=total, ) return await create_tool_response( task_id=task_id, session_url=session_url, - label=f"Submitted: {len(left_df)} left rows for merging.", + label=f"Submitted: {total} left rows for merging." + if total + else "Submitted: rows for merging.", token=client.token, - total=len(left_df), + total=total, mcp_server_url=ctx.request_context.lifespan_context.mcp_server_url, ) @@ -533,33 +572,87 @@ async def everyrow_forecast( client = _get_client(ctx) _clear_task_state() - df = load_data(data=params.data, input_csv=params.input_csv) + input_data = _resolve_input(params) async with create_session(client=client) as session: session_url = session.get_url() cohort_task = await forecast_async( task=params.context or "", session=session, - input=df, + input=input_data, ) task_id = str(cohort_task.task_id) + total = len(input_data) if isinstance(input_data, pd.DataFrame) else 0 write_initial_task_state( task_id, task_type=PublicTaskType.FORECAST, session_url=session_url, - total=len(df), + total=total, ) return await create_tool_response( task_id=task_id, session_url=session_url, - label=f"Submitted: {len(df)} rows for forecasting (6 research dimensions + dual forecaster per row).", + label=f"Submitted: {total} rows for forecasting (6 research dimensions + dual forecaster per row)." + if total + else "Submitted: rows for forecasting.", token=client.token, - total=len(df), + total=total, mcp_server_url=ctx.request_context.lifespan_context.mcp_server_url, ) +@mcp.tool( + name="everyrow_upload_data", + structured_output=False, + annotations=ToolAnnotations( + title="Upload Data", + readOnlyHint=False, + destructiveHint=False, + idempotentHint=True, + openWorldHint=True, + ), +) +async def everyrow_upload_data( + params: UploadDataInput, ctx: EveryRowContext +) -> list[TextContent]: + """Upload data from a URL or local file. Returns an artifact_id for use in processing tools. + + Use this tool to ingest data before calling everyrow_agent, everyrow_screen, + everyrow_rank, everyrow_dedupe, everyrow_merge, or everyrow_forecast. + + Supported sources: + - HTTP(S) URLs (including Google Sheets — auto-converted to CSV export) + - Local CSV file paths (stdio mode only) + + Returns an artifact_id (UUID) that can be passed to any processing tool's + artifact_id parameter. The data is stored server-side and can be reused + across multiple tool calls. + """ + client = _get_client(ctx) + + if _is_url(params.source): + df = await fetch_csv_from_url(params.source) + else: + df = pd.read_csv(params.source) + + async with create_session(client=client) as session: + artifact_id = await create_table_artifact(df, session) + + return [ + TextContent( + type="text", + text=json.dumps( + { + "artifact_id": str(artifact_id), + "rows": len(df), + "columns": list(df.columns), + } + ), + ) + ] + + @mcp.tool( name="everyrow_progress", structured_output=False, diff --git a/everyrow-mcp/src/everyrow_mcp/uploads.py b/everyrow-mcp/src/everyrow_mcp/uploads.py new file mode 100644 index 00000000..d5799c8d --- /dev/null +++ b/everyrow-mcp/src/everyrow_mcp/uploads.py @@ -0,0 +1,225 @@ +"""Presigned URL upload system for large files (HTTP mode only). + +Provides: +- ``request_upload_url`` MCP tool — returns a signed upload URL + curl instructions +- ``handle_upload`` REST endpoint — receives the file and creates an artifact +""" + +from __future__ import annotations + +import hashlib +import hmac +import json +import logging +import secrets +import time +from io import BytesIO +from uuid import uuid4 + +import pandas as pd +from everyrow.ops import create_table_artifact +from everyrow.session import create_session +from mcp.server.fastmcp import FastMCP +from mcp.types import TextContent, ToolAnnotations +from pydantic import BaseModel, ConfigDict, Field +from starlette.requests import Request +from starlette.responses import JSONResponse + +from everyrow_mcp import redis_store +from everyrow_mcp.config import settings +from everyrow_mcp.tool_helpers import EveryRowContext + +logger = logging.getLogger(__name__) + + +# ── Input model ─────────────────────────────────────────────── + + +class RequestUploadUrlInput(BaseModel): + """Input for the request_upload_url tool.""" + + model_config = ConfigDict(str_strip_whitespace=True, extra="forbid") + + filename: str = Field( + ..., + description="Name of the file to upload (must end in .csv).", + min_length=1, + ) + + +# ── HMAC signing ────────────────────────────────────────────── + +_secret: list[str] = [] # mutable container to avoid global statement + + +def _get_secret() -> str: + """Return the HMAC secret, generating one if not configured.""" + if not _secret: + _secret.append(settings.upload_secret or secrets.token_urlsafe(32)) + return _secret[0] + + +def sign_upload_url(upload_id: str, expires_at: int) -> str: + """Create an HMAC-SHA256 signature for an upload URL.""" + msg = f"{upload_id}:{expires_at}" + return hmac.new(_get_secret().encode(), msg.encode(), hashlib.sha256).hexdigest() + + +def verify_upload_signature(upload_id: str, expires_at: int, signature: str) -> bool: + """Verify an upload URL signature and check expiry.""" + if time.time() > expires_at: + return False + expected = sign_upload_url(upload_id, expires_at) + return hmac.compare_digest(expected, signature) + + +# ── MCP tool ────────────────────────────────────────────────── + + +def register_upload_tool(mcp: FastMCP) -> None: + """Register the request_upload_url tool (HTTP mode only).""" + + @mcp.tool( + name="everyrow_request_upload_url", + structured_output=False, + annotations=ToolAnnotations( + title="Request Upload URL", + readOnlyHint=True, + destructiveHint=False, + idempotentHint=False, + openWorldHint=False, + ), + ) + async def request_upload_url( + params: RequestUploadUrlInput, ctx: EveryRowContext + ) -> list[TextContent]: + """Request a presigned URL to upload a large file. + + Use this when you have a file in the sandbox that is too large to pass as + inline data. Returns a URL and curl command. After uploading, use the + returned artifact_id in any processing tool. + + Steps: + 1. Call this tool with the filename + 2. Execute the returned curl command + 3. Use the artifact_id from the upload response + """ + if not params.filename.lower().endswith(".csv"): + return [ + TextContent( + type="text", + text="Error: filename must end in .csv", + ) + ] + + upload_id = str(uuid4()) + expires_at = int(time.time()) + settings.upload_url_ttl + sig = sign_upload_url(upload_id, expires_at) + + mcp_server_url = ctx.request_context.lifespan_context.mcp_server_url + upload_url = ( + f"{mcp_server_url}/api/uploads/{upload_id}?expires={expires_at}&sig={sig}" + ) + + # Store metadata in Redis + meta = json.dumps( + { + "upload_id": upload_id, + "filename": params.filename, + "expires_at": expires_at, + } + ) + await redis_store.store_upload_meta(upload_id, meta, settings.upload_url_ttl) + + return [ + TextContent( + type="text", + text=json.dumps( + { + "upload_url": upload_url, + "upload_id": upload_id, + "expires_in": settings.upload_url_ttl, + "max_size_bytes": settings.max_upload_size_bytes, + "curl_command": f'curl -X PUT -T "{params.filename}" "{upload_url}"', + } + ), + ) + ] + + +# ── REST endpoint ───────────────────────────────────────────── + + +async def _validate_upload( + request: Request, +) -> tuple[bytes, None] | tuple[None, JSONResponse]: + """Validate upload signature, metadata, and body. Returns (body, None) or (None, error).""" + upload_id = request.path_params["upload_id"] + expires_str = request.query_params.get("expires", "") + sig = request.query_params.get("sig", "") + try: + expires_at = int(expires_str) + except (ValueError, TypeError): + return None, JSONResponse( + {"error": "Invalid expires parameter"}, status_code=400 + ) + + if not verify_upload_signature(upload_id, expires_at, sig): + return None, JSONResponse( + {"error": "Invalid or expired signature"}, status_code=403 + ) + + meta_json = await redis_store.pop_upload_meta(upload_id) + if meta_json is None: + return None, JSONResponse( + {"error": "Upload URL already used or expired"}, status_code=410 + ) + + body = await request.body() + if not body: + return None, JSONResponse({"error": "Empty body"}, status_code=400) + if len(body) > settings.max_upload_size_bytes: + return None, JSONResponse( + { + "error": f"File too large: {len(body)} bytes (max {settings.max_upload_size_bytes})" + }, + status_code=413, + ) + return body, None + + +async def handle_upload(request: Request) -> JSONResponse: + """PUT /api/uploads/{upload_id} — receive an uploaded file and create an artifact.""" + body, error = await _validate_upload(request) + if error is not None: + return error + + try: + df = pd.read_csv(BytesIO(body)) # type: ignore[arg-type] + except Exception as exc: + return JSONResponse({"error": f"Could not parse CSV: {exc}"}, status_code=400) + + if df.empty: + return JSONResponse({"error": "CSV is empty"}, status_code=400) + + try: + from everyrow.api_utils import create_client # noqa: PLC0415 + + with create_client() as client: + async with create_session(client=client) as session: + artifact_id = await create_table_artifact(df, session) + except Exception as exc: + logger.exception("Failed to create artifact from upload") + return JSONResponse( + {"error": f"Failed to create artifact: {exc}"}, status_code=500 + ) + + return JSONResponse( + { + "artifact_id": str(artifact_id), + "rows": len(df), + "columns": list(df.columns), + "size_bytes": len(body), + }, + status_code=201, + ) diff --git a/everyrow-mcp/src/everyrow_mcp/utils.py b/everyrow-mcp/src/everyrow_mcp/utils.py index ed747dd3..0f70c013 100644 --- a/everyrow-mcp/src/everyrow_mcp/utils.py +++ b/everyrow-mcp/src/everyrow_mcp/utils.py @@ -1,13 +1,104 @@ """Utility functions for the everyrow MCP server.""" import json +import re from io import StringIO from pathlib import Path from typing import Any +from urllib.parse import urlparse +import httpx import pandas as pd +def _is_url(value: str) -> bool: + """Check if a string looks like an HTTP(S) URL.""" + return value.startswith("http://") or value.startswith("https://") + + +def validate_url(url: str) -> str: + """Validate and normalise an HTTP(S) URL. + + Returns the URL unchanged (after basic validation). + + Raises: + ValueError: If the URL scheme is not http/https or the URL has no host. + """ + parsed = urlparse(url) + if parsed.scheme not in ("http", "https"): + raise ValueError(f"URL must use http or https scheme: {url}") + if not parsed.netloc: + raise ValueError(f"URL has no host: {url}") + return url + + +def _normalise_google_sheets_url(url: str) -> str: + """Convert a Google Sheets URL to its CSV export variant. + + Handles: + - ``/edit...`` → ``/export?format=csv`` + - ``/pub?...`` → ``/pub?...&output=csv`` + - Already has ``/export?format=csv`` → unchanged + """ + if "docs.google.com/spreadsheets" not in url: + return url + + # Already an export URL + if "/export" in url and "format=csv" in url: + return url + + # /edit or /edit#gid=... → /export?format=csv + match = re.match(r"(https://docs\.google\.com/spreadsheets/d/[^/]+)", url) + if match: + base = match.group(1) + # Extract gid if present + gid_match = re.search(r"gid=(\d+)", url) + if gid_match: + return f"{base}/export?format=csv&gid={gid_match.group(1)}" + return f"{base}/export?format=csv" + + return url + + +async def fetch_csv_from_url(url: str) -> pd.DataFrame: + """Fetch CSV data from a URL and return a DataFrame. + + Automatically normalises Google Sheets URLs to their CSV export endpoint. + + Raises: + ValueError: If the response cannot be parsed as CSV. + httpx.HTTPStatusError: On non-2xx responses. + """ + url = _normalise_google_sheets_url(url) + + async with httpx.AsyncClient(follow_redirects=True, timeout=60.0) as client: + response = await client.get(url) + response.raise_for_status() + + content_type = response.headers.get("content-type", "") + + # Try CSV first + try: + df = pd.read_csv(StringIO(response.text)) + if not df.empty: + return df + except Exception: + pass + + # Try JSON array + try: + data = json.loads(response.text) + if isinstance(data, list) and data: + return pd.DataFrame(data) + except (json.JSONDecodeError, ValueError): + pass + + raise ValueError( + f"Could not parse response from {url} as CSV or JSON. " + f"Content-Type: {content_type}" + ) + + def validate_csv_path(path: str) -> None: """Validate that a CSV file exists and is readable. diff --git a/everyrow-mcp/tests/test_mcp_e2e.py b/everyrow-mcp/tests/test_mcp_e2e.py index 7a934ef1..06ca01b1 100644 --- a/everyrow-mcp/tests/test_mcp_e2e.py +++ b/everyrow-mcp/tests/test_mcp_e2e.py @@ -18,7 +18,6 @@ from unittest.mock import AsyncMock, MagicMock, patch from uuid import UUID, uuid4 -import pandas as pd import pytest from everyrow.api_utils import create_client from everyrow.generated.models.public_task_type import PublicTaskType @@ -129,7 +128,7 @@ class TestMcpProtocol: @pytest.mark.asyncio async def test_list_tools(self, _http_state): - """list_tools returns all 10 registered tools.""" + """list_tools returns all registered tools (including upload_data).""" async with mcp_client() as session: result = await session.list_tools() tool_names = sorted(t.name for t in result.tools) @@ -146,17 +145,14 @@ async def test_list_tools(self, _http_state): "everyrow_results", "everyrow_screen", "everyrow_single_agent", + "everyrow_upload_data", ] ) assert tool_names == expected @pytest.mark.asyncio - async def test_call_screen_tool(self, _http_state, tmp_path): + async def test_call_screen_tool(self, _http_state): """Submit a screen task via MCP protocol and verify the response.""" - df = pd.DataFrame([{"company": "Acme", "role": "Engineer"}]) - csv_path = tmp_path / "test.csv" - df.to_csv(csv_path, index=False) - task_id = str(uuid4()) mock_task = _mock_task(task_id) _, fake_create_session = _mock_session() @@ -182,7 +178,9 @@ async def test_call_screen_tool(self, _http_state, tmp_path): { "params": { "task": "Filter for engineering roles", - "input_csv": str(csv_path), + "data": [ + {"company": "Acme", "role": "Engineer"}, + ], } }, ) @@ -254,12 +252,8 @@ async def test_missing_required_params(self, _http_state): assert result.isError @pytest.mark.asyncio - async def test_call_agent_tool(self, _http_state, tmp_path): + async def test_call_agent_tool(self, _http_state): """Submit an agent task via MCP protocol.""" - df = pd.DataFrame([{"name": "Anthropic"}]) - csv_path = tmp_path / "companies.csv" - df.to_csv(csv_path, index=False) - task_id = str(uuid4()) mock_task = _mock_task(task_id) _, fake_create_session = _mock_session() @@ -285,7 +279,7 @@ async def test_call_agent_tool(self, _http_state, tmp_path): { "params": { "task": "Find the CEO", - "input_csv": str(csv_path), + "data": [{"name": "Anthropic"}], } }, ) @@ -334,7 +328,6 @@ async def test_completed_progress_suggests_results(self, _http_state): # ── TestMcpE2ERealApi — real API tests ──────────────────────── - _skip_unless_integration = pytest.mark.skipif( not os.environ.get("RUN_INTEGRATION_TESTS"), reason="Set RUN_INTEGRATION_TESTS=1 to run", @@ -357,7 +350,7 @@ def _real_client(self, _http_state): yield sdk_client @pytest.mark.asyncio - async def test_screen_pipeline(self, _real_client, jobs_csv): + async def test_screen_pipeline(self, _real_client, jobs_csv): # noqa: ARG002 """Submit → poll → results via MCP protocol with real API.""" async with mcp_client() as session: with patch( @@ -369,7 +362,18 @@ async def test_screen_pipeline(self, _real_client, jobs_csv): { "params": { "task": "Filter for remote positions", - "input_csv": jobs_csv, + "data": [ + { + "company": "Airtable", + "title": "Senior Engineer", + "location": "Remote", + }, + { + "company": "Vercel", + "title": "Lead Engineer", + "location": "NYC", + }, + ], } }, ) @@ -429,12 +433,8 @@ async def test_screen_pipeline(self, _real_client, jobs_csv): print(f" Results: {results.content[-1].text}") @pytest.mark.asyncio - async def test_agent_pipeline(self, _real_client, tmp_path): + async def test_agent_pipeline(self, _real_client, tmp_path): # noqa: ARG002 """Submit agent → poll → results via MCP protocol with real API.""" - df = pd.DataFrame([{"name": "Anthropic"}, {"name": "OpenAI"}]) - csv_path = tmp_path / "companies.csv" - df.to_csv(csv_path, index=False) - async with mcp_client() as session: with patch( "everyrow_mcp.tools._get_client", @@ -445,7 +445,7 @@ async def test_agent_pipeline(self, _real_client, tmp_path): { "params": { "task": "Find the company's headquarters city.", - "input_csv": str(csv_path), + "data": [{"name": "Anthropic"}, {"name": "OpenAI"}], "response_schema": { "properties": { "headquarters": { diff --git a/everyrow-mcp/tests/test_server.py b/everyrow-mcp/tests/test_server.py index d30545bf..1a060a3f 100644 --- a/everyrow-mcp/tests/test_server.py +++ b/everyrow-mcp/tests/test_server.py @@ -42,6 +42,7 @@ ScreenInput, SingleAgentInput, StdioResultsInput, + UploadDataInput, _schema_to_model, ) from everyrow_mcp.tools import ( @@ -54,6 +55,7 @@ everyrow_results_http, everyrow_results_stdio, everyrow_single_agent, + everyrow_upload_data, ) from tests.conftest import make_test_context, override_settings @@ -118,73 +120,66 @@ def test_rejects_non_object_property_schema(self): class TestInputValidation: """Tests for input validation.""" - def test_screen_input_validates_csv_path(self, tmp_path: Path): - """Test ScreenInput validates CSV path.""" - with pytest.raises(ValueError, match="does not exist"): + def test_screen_input_rejects_invalid_artifact_id(self): + """Test ScreenInput rejects an invalid artifact_id.""" + with pytest.raises(ValidationError, match="artifact_id must be a valid UUID"): ScreenInput( task="test", - input_csv=str(tmp_path / "nonexistent.csv"), + artifact_id="not-a-uuid", ) - def test_rank_input_validates_field_type(self, tmp_path: Path): - """Test RankInput validates field_type.""" - csv_file = tmp_path / "test.csv" - csv_file.write_text("a,b\n1,2\n") + def test_screen_input_accepts_valid_artifact_id(self): + """Test ScreenInput accepts a valid UUID artifact_id.""" + uid = str(uuid4()) + params = ScreenInput(task="test", artifact_id=uid) + assert params.artifact_id == uid + def test_rank_input_validates_field_type(self): + """Test RankInput validates field_type.""" with pytest.raises(ValidationError, match="Input should be"): RankInput( task="test", - input_csv=str(csv_file), + artifact_id=str(uuid4()), field_name="score", field_type="invalid", # pyright: ignore[reportArgumentType] ) - def test_merge_input_validates_both_csvs(self, tmp_path: Path): - """Test MergeInput validates both CSV paths.""" - csv_file = tmp_path / "test.csv" - csv_file.write_text("a,b\n1,2\n") - - with pytest.raises(ValueError, match="does not exist"): + def test_merge_input_validates_artifact_ids(self): + """Test MergeInput validates artifact IDs.""" + with pytest.raises(ValidationError, match="artifact_id must be a valid UUID"): MergeInput( task="test", - left_csv=str(csv_file), - right_csv=str(tmp_path / "nonexistent.csv"), + left_artifact_id="not-a-uuid", + right_artifact_id=str(uuid4()), ) - def test_agent_input_rejects_empty_response_schema(self, tmp_path: Path): - csv_file = tmp_path / "test.csv" - csv_file.write_text("a,b\n1,2\n") - + def test_agent_input_rejects_empty_response_schema(self): with pytest.raises( ValidationError, match="must include a non-empty top-level 'properties'" ): AgentInput( task="test", - input_csv=str(csv_file), + artifact_id=str(uuid4()), response_schema={}, ) - def test_agent_input_rejects_shorthand_response_schema(self, tmp_path: Path): + def test_agent_input_rejects_shorthand_response_schema(self): """response_schema must be JSON Schema, not a field map.""" - csv_file = tmp_path / "test.csv" - csv_file.write_text("a,b\n1,2\n") - with pytest.raises( ValidationError, match="must include a non-empty top-level 'properties'" ): AgentInput( task="test", - input_csv=str(csv_file), + artifact_id=str(uuid4()), response_schema={"population": "string", "year": "string"}, ) - def test_tool_inputs_accept_example_schemas(self, tmp_path: Path): - csv_file = tmp_path / "test.csv" - csv_file.write_text("a,b\n1,2\n") + def test_tool_inputs_accept_example_schemas(self): + uid = str(uuid4()) ScreenInput( task="test", - input_csv=str(csv_file), + artifact_id=uid, response_schema={ "type": "object", "properties": { @@ -196,7 +191,7 @@ def test_tool_inputs_accept_example_schemas(self, tmp_path: Path): ) AgentInput( task="test", - input_csv=str(csv_file), + artifact_id=uid, response_schema={ "type": "object", "properties": { @@ -208,7 +203,7 @@ def test_tool_inputs_accept_example_schemas(self, tmp_path: Path): ) AgentInput( task="test", - input_csv=str(csv_file), + data=[{"col": "val"}], response_schema={ "type": "object", "properties": { @@ -225,15 +220,14 @@ def test_tool_inputs_accept_example_schemas(self, tmp_path: Path): }, ) - def test_screen_input_requires_boolean_property(self, tmp_path: Path): + def test_screen_input_requires_boolean_property(self): """Screen schemas must include at least one boolean property.""" - csv_file = tmp_path / "test.csv" - csv_file.write_text("a,b\n1,2\n") + uid = str(uuid4()) with pytest.raises(ValidationError, match="must include at least one boolean"): ScreenInput( task="test", - input_csv=str(csv_file), + artifact_id=uid, response_schema={ "type": "object", "properties": {"reason": {"type": "string"}}, @@ -242,7 +236,7 @@ def test_screen_input_requires_boolean_property(self, tmp_path: Path): ScreenInput( task="test", - input_csv=str(csv_file), + artifact_id=uid, response_schema={ "type": "object", "properties": {"pass": {"type": "boolean"}}, @@ -331,7 +325,7 @@ class TestAgent: """Tests for everyrow_agent.""" @pytest.mark.asyncio - async def test_submit_returns_task_id(self, companies_csv: str): + async def test_submit_returns_task_id(self): """Test that submit returns immediately with task_id and session_url.""" mock_task = _make_mock_task() mock_session = _make_mock_session() @@ -351,7 +345,10 @@ async def test_submit_returns_task_id(self, companies_csv: str): params = AgentInput( task="Find HQ for each company", - input_csv=companies_csv, + data=[ + {"name": "TechStart", "industry": "Software"}, + {"name": "AILabs", "industry": "AI/ML"}, + ], ) result = await everyrow_agent(params, ctx) @@ -1040,11 +1037,11 @@ def test_cancel_input_validation(self): class TestAgentInlineInput: - """Tests for everyrow_agent with inline CSV data.""" + """Tests for everyrow_agent with inline data.""" @pytest.mark.asyncio async def test_submit_with_inline_data(self): - """Test agent submission with data instead of input_csv.""" + """Test agent submission with data instead of artifact_id.""" mock_task = _make_mock_task() mock_session = _make_mock_session() mock_client = _make_mock_client() @@ -1063,7 +1060,10 @@ async def test_submit_with_inline_data(self): params = AgentInput( task="Find HQ for each company", - data="name,industry\nTechStart,Software\nAILabs,AI\n", + data=[ + {"name": "TechStart", "industry": "Software"}, + {"name": "AILabs", "industry": "AI"}, + ], ) result = await everyrow_agent(params, ctx) @@ -1077,6 +1077,37 @@ async def test_submit_with_inline_data(self): call_kwargs = mock_op.call_args[1] assert len(call_kwargs["input"]) == 2 + @pytest.mark.asyncio + async def test_submit_with_artifact_id(self): + """Test agent submission with artifact_id.""" + mock_task = _make_mock_task() + mock_session = _make_mock_session() + mock_client = _make_mock_client() + ctx = make_test_context(mock_client) + uid = str(uuid4()) + + with ( + patch( + "everyrow_mcp.tools.agent_map_async", new_callable=AsyncMock + ) as mock_op, + patch( + "everyrow_mcp.tools.create_session", + return_value=_make_async_context_manager(mock_session), + ), + ): + mock_op.return_value = mock_task + + params = AgentInput(task="Find HQ", artifact_id=uid) + result = await everyrow_agent(params, ctx) + + assert len(result) == 1 + text = result[0].text + assert str(mock_task.task_id) in text + + # Verify the UUID was passed to the SDK + call_kwargs = mock_op.call_args[1] + assert call_kwargs["input"] == UUID(uid) + class TestAgentInputValidation: """Tests for AgentInput model validation with inline data.""" @@ -1086,27 +1117,22 @@ def test_requires_one_input_source(self): with pytest.raises(ValidationError, match="Provide exactly one of"): AgentInput(task="test") - def test_rejects_both_input_sources(self, companies_csv: str): + def test_rejects_both_input_sources(self): """Test that providing both raises.""" with pytest.raises(ValidationError, match="Provide exactly one of"): AgentInput( task="test", - input_csv=companies_csv, - data="name,industry\nA,B\n", + artifact_id=str(uuid4()), + data=[{"a": "b"}], ) - def test_accepts_input_csv(self, companies_csv: str): - """Test that input_csv alone is valid.""" - params = AgentInput(task="test", input_csv=companies_csv) - assert params.input_csv == companies_csv + def test_accepts_artifact_id(self): + """Test that artifact_id alone is valid.""" + uid = str(uuid4()) + params = AgentInput(task="test", artifact_id=uid) + assert params.artifact_id == uid assert params.data is None - def test_accepts_data_csv_string(self): - """Test that data as CSV string is valid.""" - params = AgentInput(task="test", data="a,b\n1,2\n") - assert params.data is not None - assert params.input_csv is None - def test_accepts_data_json_list(self): """Test that data as JSON list of dicts is valid.""" records = [ @@ -1115,7 +1141,99 @@ def test_accepts_data_json_list(self): ] params = AgentInput(task="test", data=records) assert params.data == records - assert params.input_csv is None + assert params.artifact_id is None + + +class TestUploadData: + """Tests for everyrow_upload_data.""" + + @pytest.mark.asyncio + async def test_upload_from_url(self): + """Test uploading data from a URL.""" + mock_client = _make_mock_client() + mock_session = _make_mock_session() + ctx = make_test_context(mock_client) + artifact_uuid = uuid4() + + mock_df = pd.DataFrame([{"a": 1, "b": 2}, {"a": 3, "b": 4}]) + + with ( + patch( + "everyrow_mcp.tools.fetch_csv_from_url", + new_callable=AsyncMock, + return_value=mock_df, + ), + patch( + "everyrow_mcp.tools.create_session", + return_value=_make_async_context_manager(mock_session), + ), + patch( + "everyrow_mcp.tools.create_table_artifact", + new_callable=AsyncMock, + return_value=artifact_uuid, + ) as mock_create, + ): + params = UploadDataInput(source="https://example.com/data.csv") + result = await everyrow_upload_data(params, ctx) + + assert len(result) == 1 + data = json.loads(result[0].text) + assert data["artifact_id"] == str(artifact_uuid) + assert data["rows"] == 2 + assert data["columns"] == ["a", "b"] + mock_create.assert_called_once() + + @pytest.mark.asyncio + async def test_upload_from_local_path(self, tmp_path: Path): + """Test uploading data from a local CSV file (stdio mode).""" + csv_file = tmp_path / "test.csv" + csv_file.write_text("x,y\n1,2\n3,4\n") + + mock_client = _make_mock_client() + mock_session = _make_mock_session() + ctx = make_test_context(mock_client) + artifact_uuid = uuid4() + + with ( + patch( + "everyrow_mcp.tools.create_session", + return_value=_make_async_context_manager(mock_session), + ), + patch( + "everyrow_mcp.tools.create_table_artifact", + new_callable=AsyncMock, + return_value=artifact_uuid, + ), + ): + params = UploadDataInput(source=str(csv_file)) + result = await everyrow_upload_data(params, ctx) + + data = json.loads(result[0].text) + assert data["artifact_id"] == str(artifact_uuid) + assert data["rows"] == 2 + + def test_upload_rejects_local_path_in_http_mode(self, tmp_path: Path): + """Test that local paths are rejected in HTTP mode.""" + csv_file = tmp_path / "test.csv" + csv_file.write_text("x,y\n1,2\n") + + with override_settings(transport="streamable-http"): + with pytest.raises( + ValidationError, match="Local file paths are not supported" + ): + UploadDataInput(source=str(csv_file)) + + def test_upload_accepts_url_in_http_mode(self): + """Test that URLs are accepted in HTTP mode.""" + with override_settings(transport="streamable-http"): + params = UploadDataInput(source="https://example.com/data.csv") + assert params.source == "https://example.com/data.csv" + + def test_upload_google_sheets_url(self): + """Test that Google Sheets URLs are accepted.""" + url = "https://docs.google.com/spreadsheets/d/1abc/edit#gid=0" + params = UploadDataInput(source=url) + assert params.source == url class TestResultsInputValidation: @@ -1205,44 +1323,52 @@ class TestInputModelsUnchanged: """Verify that input models require an input source.""" def test_rank_requires_input_source(self): - """RankInput requires either input_csv or data.""" + """RankInput requires either artifact_id or data.""" with pytest.raises(ValidationError): RankInput(task="test", field_name="score") def test_rank_accepts_data(self): - """RankInput accepts data as alternative to input_csv.""" - params = RankInput(task="test", field_name="score", data="col\nval") - assert params.data == "col\nval" - assert params.input_csv is None + """RankInput accepts data as alternative to artifact_id.""" + params = RankInput( + task="test", + field_name="score", + data=[{"col": "val"}], + ) + assert params.data == [{"col": "val"}] + assert params.artifact_id is None def test_rank_rejects_both_inputs(self): - """RankInput rejects both input_csv and data.""" + """RankInput rejects both artifact_id and data.""" with pytest.raises(ValidationError): RankInput( task="test", field_name="score", - input_csv="/tmp/test.csv", - data="col\nval", + artifact_id=str(uuid4()), + data=[{"col": "val"}], ) def test_screen_requires_input_source(self): - """ScreenInput requires either input_csv or data.""" + """ScreenInput requires either artifact_id or data.""" with pytest.raises(ValidationError): ScreenInput(task="test") def test_screen_accepts_data(self): - """ScreenInput accepts data as alternative to input_csv.""" - params = ScreenInput(task="test", data="col\nval") - assert params.data == "col\nval" - assert params.input_csv is None + """ScreenInput accepts data as alternative to artifact_id.""" + params = ScreenInput(task="test", data=[{"col": "val"}]) + assert params.data == [{"col": "val"}] + assert params.artifact_id is None def test_screen_rejects_both_inputs(self): - """ScreenInput rejects both input_csv and data.""" + """ScreenInput rejects both artifact_id and data.""" with pytest.raises(ValidationError): - ScreenInput(task="test", input_csv="/tmp/test.csv", data="col\nval") + ScreenInput( + task="test", + artifact_id=str(uuid4()), + data=[{"col": "val"}], + ) def test_dedupe_requires_input_source(self): - """DedupeInput requires either input_csv or data.""" + """DedupeInput requires either artifact_id or data.""" with pytest.raises(ValidationError): DedupeInput(equivalence_relation="same entity") @@ -1256,7 +1382,7 @@ class TestStdioVsHttpGating: """Verify that widget JSON is only included in HTTP mode responses.""" @pytest.mark.asyncio - async def test_submit_stdio_returns_single_content(self, companies_csv: str): + async def test_submit_stdio_returns_single_content(self): """In stdio mode, submission tools return only human-readable text.""" mock_task = _make_mock_task() mock_session = _make_mock_session() @@ -1273,16 +1399,17 @@ async def test_submit_stdio_returns_single_content(self, companies_csv: str): ), ): mock_op.return_value = mock_task - params = AgentInput(task="test", input_csv=companies_csv) + params = AgentInput( + task="test", + data=[{"name": "TechStart", "industry": "Software"}], + ) result = await everyrow_agent(params, ctx) assert len(result) == 1 assert "Task ID:" in result[0].text @pytest.mark.asyncio - async def test_submit_http_returns_widget_and_text( - self, companies_csv: str, fake_redis - ): + async def test_submit_http_returns_widget_and_text(self, fake_redis): """In HTTP mode, submission tools return widget JSON + human text.""" mock_task = _make_mock_task() mock_session = _make_mock_session() @@ -1301,7 +1428,10 @@ async def test_submit_http_returns_widget_and_text( patch.object(redis_store, "get_redis_client", return_value=fake_redis), ): mock_op.return_value = mock_task - params = AgentInput(task="test", input_csv=companies_csv) + params = AgentInput( + task="test", + data=[{"name": "TechStart", "industry": "Software"}], + ) result = await everyrow_agent(params, ctx) assert len(result) == 2 diff --git a/everyrow-mcp/tests/test_stdio_content.py b/everyrow-mcp/tests/test_stdio_content.py index 8f092ea2..b37bb37c 100644 --- a/everyrow-mcp/tests/test_stdio_content.py +++ b/everyrow-mcp/tests/test_stdio_content.py @@ -221,13 +221,13 @@ class TestStdioSubmissionContent: """All submission tools must return clean, concise text in stdio mode.""" @pytest.mark.asyncio - async def test_agent_content(self, companies_csv: str): + async def test_agent_content(self): task, _session, _client, ctx, *patches = _submit_patches( "everyrow_mcp.tools.agent_map_async" ) with patches[0], patches[1]: result = await everyrow_agent( - AgentInput(task="Find HQ", input_csv=companies_csv), ctx + AgentInput(task="Find HQ", data=[{"name": "TechStart"}]), ctx ) assert len(result) == 1 @@ -254,7 +254,7 @@ async def test_single_agent_content(self): assert "Session:" in text @pytest.mark.asyncio - async def test_rank_content(self, companies_csv: str): + async def test_rank_content(self): _task, _session, _client, ctx, *patches = _submit_patches( "everyrow_mcp.tools.rank_async" ) @@ -262,7 +262,7 @@ async def test_rank_content(self, companies_csv: str): result = await everyrow_rank( RankInput( task="Score by AI adoption", - input_csv=companies_csv, + data=[{"name": "TechStart", "industry": "Software"}], field_name="ai_score", ), ctx, @@ -272,13 +272,15 @@ async def test_rank_content(self, companies_csv: str): assert_stdio_clean(result, tool_name="everyrow_rank") @pytest.mark.asyncio - async def test_screen_content(self, companies_csv: str): + async def test_screen_content(self): _task, _session, _client, ctx, *patches = _submit_patches( "everyrow_mcp.tools.screen_async" ) with patches[0], patches[1]: result = await everyrow_screen( - ScreenInput(task="Is this a tech company?", input_csv=companies_csv), + ScreenInput( + task="Is this a tech company?", data=[{"name": "TechStart"}] + ), ctx, ) @@ -286,13 +288,16 @@ async def test_screen_content(self, companies_csv: str): assert_stdio_clean(result, tool_name="everyrow_screen") @pytest.mark.asyncio - async def test_dedupe_content(self, contacts_csv: str): + async def test_dedupe_content(self): _task, _session, _client, ctx, *patches = _submit_patches( "everyrow_mcp.tools.dedupe_async" ) with patches[0], patches[1]: result = await everyrow_dedupe( - DedupeInput(equivalence_relation="Same person", input_csv=contacts_csv), + DedupeInput( + equivalence_relation="Same person", + data=[{"name": "John Smith"}, {"name": "J. Smith"}], + ), ctx, ) @@ -300,7 +305,7 @@ async def test_dedupe_content(self, contacts_csv: str): assert_stdio_clean(result, tool_name="everyrow_dedupe") @pytest.mark.asyncio - async def test_merge_content(self, products_csv: str, suppliers_csv: str): + async def test_merge_content(self): _task, _session, _client, ctx, *patches = _submit_patches( "everyrow_mcp.tools.merge_async" ) @@ -308,8 +313,8 @@ async def test_merge_content(self, products_csv: str, suppliers_csv: str): result = await everyrow_merge( MergeInput( task="Match products to suppliers", - left_csv=products_csv, - right_csv=suppliers_csv, + left_data=[{"product_name": "Photoshop", "vendor": "Adobe"}], + right_data=[{"company_name": "Adobe Inc", "approved": True}], ), ctx, ) @@ -582,21 +587,21 @@ class TestToolSchemas: ("everyrow_dedupe", "DedupeInput"), ], ) - def test_schema_has_input_csv_and_data(self, tool_name: str, def_name: str): - """CSV-based tools expose both input_csv and data.""" + def test_schema_has_artifact_id_and_data(self, tool_name: str, def_name: str): + """Processing tools expose both artifact_id and data.""" tool = mcp_app._tool_manager.get_tool(tool_name) assert tool is not None input_def = tool.parameters["$defs"][def_name] - assert "input_csv" in input_def["properties"] + assert "artifact_id" in input_def["properties"] assert "data" in input_def["properties"] - def test_merge_schema_has_csv_and_data_fields(self): - """everyrow_merge exposes left_csv/right_csv and left_data/right_data.""" + def test_merge_schema_has_artifact_id_and_data_fields(self): + """everyrow_merge exposes left/right artifact_id and data fields.""" tool = mcp_app._tool_manager.get_tool("everyrow_merge") assert tool is not None merge_def = tool.parameters["$defs"]["MergeInput"] - assert "left_csv" in merge_def["properties"] - assert "right_csv" in merge_def["properties"] + assert "left_artifact_id" in merge_def["properties"] + assert "right_artifact_id" in merge_def["properties"] assert "left_data" in merge_def["properties"] assert "right_data" in merge_def["properties"] @@ -608,7 +613,7 @@ class TestHttpModeIncludesWidgets: """Verify HTTP mode DOES include widget data (confirming the gate works both ways).""" @pytest.mark.asyncio - async def test_submit_http_has_widget_json(self, companies_csv: str, fake_redis): + async def test_submit_http_has_widget_json(self, fake_redis): """HTTP mode must include widget JSON as the first TextContent.""" _task, _session, _client, ctx, *patches = _submit_patches( "everyrow_mcp.tools.agent_map_async" @@ -620,7 +625,7 @@ async def test_submit_http_has_widget_json(self, companies_csv: str, fake_redis) patch.object(redis_store, "get_redis_client", return_value=fake_redis), ): result = await everyrow_agent( - AgentInput(task="Find HQ", input_csv=companies_csv), ctx + AgentInput(task="Find HQ", data=[{"name": "TechStart"}]), ctx ) assert len(result) == 2 diff --git a/everyrow-mcp/tests/test_uploads.py b/everyrow-mcp/tests/test_uploads.py new file mode 100644 index 00000000..29d67356 --- /dev/null +++ b/everyrow-mcp/tests/test_uploads.py @@ -0,0 +1,129 @@ +"""Tests for the presigned URL upload system.""" + +from __future__ import annotations + +import json +import time +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from pydantic import ValidationError + +from everyrow_mcp.uploads import ( + RequestUploadUrlInput, + register_upload_tool, + sign_upload_url, + verify_upload_signature, +) +from tests.conftest import make_test_context, override_settings + + +class TestHmacSigning: + """Tests for HMAC signing and verification.""" + + def test_roundtrip(self): + """A signature can be verified immediately.""" + upload_id = "test-upload-id" + expires_at = int(time.time()) + 300 + sig = sign_upload_url(upload_id, expires_at) + assert verify_upload_signature(upload_id, expires_at, sig) is True + + def test_expired_signature_rejected(self): + """An expired signature is rejected.""" + upload_id = "test-upload-id" + expires_at = int(time.time()) - 1 # already expired + sig = sign_upload_url(upload_id, expires_at) + assert verify_upload_signature(upload_id, expires_at, sig) is False + + def test_tampered_signature_rejected(self): + """A tampered signature is rejected.""" + upload_id = "test-upload-id" + expires_at = int(time.time()) + 300 + sig = sign_upload_url(upload_id, expires_at) + assert verify_upload_signature(upload_id, expires_at, sig + "x") is False + + def test_different_upload_id_rejected(self): + """A signature for a different upload_id is rejected.""" + expires_at = int(time.time()) + 300 + sig = sign_upload_url("upload-1", expires_at) + assert verify_upload_signature("upload-2", expires_at, sig) is False + + +class TestRequestUploadUrlInput: + """Tests for the input model.""" + + def test_valid_csv_filename(self): + params = RequestUploadUrlInput(filename="data.csv") + assert params.filename == "data.csv" + + def test_empty_filename_rejected(self): + with pytest.raises(ValidationError): + RequestUploadUrlInput(filename="") + + def test_extra_fields_rejected(self): + with pytest.raises(ValidationError): + RequestUploadUrlInput(filename="data.csv", extra="x") # type: ignore[call-arg] + + +def _capture_tool_fn(mock_mcp: MagicMock): + """Register upload tool on a mock FastMCP and return the captured function.""" + captured: list = [] + + def capture_tool(**_kwargs): + def decorator(fn): + captured.append(fn) + return fn + + return decorator + + mock_mcp.tool = capture_tool + register_upload_tool(mock_mcp) + assert captured, "register_upload_tool did not register a tool" + return captured[0] + + +class TestRequestUploadUrlTool: + """Tests for the request_upload_url tool function.""" + + @pytest.mark.asyncio + async def test_returns_upload_url(self, fake_redis): # noqa: ARG002 + """Tool returns a signed upload URL and curl instructions.""" + mock_mcp = MagicMock() + tool_fn = _capture_tool_fn(mock_mcp) + + mock_client = MagicMock(token="fake-token") + ctx = make_test_context(mock_client) + + with ( + override_settings(transport="streamable-http"), + patch( + "everyrow_mcp.uploads.redis_store.store_upload_meta", + new_callable=AsyncMock, + ), + ): + params = RequestUploadUrlInput(filename="data.csv") + result = await tool_fn(params, ctx) + + assert len(result) == 1 + data = json.loads(result[0].text) + assert "upload_url" in data + assert "upload_id" in data + assert "expires_in" in data + assert "curl_command" in data + assert data["expires_in"] == 300 + + @pytest.mark.asyncio + async def test_rejects_non_csv(self, fake_redis): # noqa: ARG002 + """Tool rejects non-CSV filenames.""" + mock_mcp = MagicMock() + tool_fn = _capture_tool_fn(mock_mcp) + + mock_client = MagicMock(token="fake-token") + ctx = make_test_context(mock_client) + + with override_settings(transport="streamable-http"): + params = RequestUploadUrlInput(filename="data.json") + result = await tool_fn(params, ctx) + + assert "Error" in result[0].text + assert ".csv" in result[0].text diff --git a/everyrow-mcp/tests/test_utils.py b/everyrow-mcp/tests/test_utils.py index ada4f8d4..97d54fc6 100644 --- a/everyrow-mcp/tests/test_utils.py +++ b/everyrow-mcp/tests/test_utils.py @@ -6,11 +6,14 @@ import pytest from everyrow_mcp.utils import ( + _is_url, + _normalise_google_sheets_url, load_data, resolve_output_path, save_result_to_csv, validate_csv_path, validate_output_path, + validate_url, ) @@ -99,7 +102,7 @@ def test_different_prefixes(self, tmp_path: Path): class TestLoadData: - """Tests for load_data.""" + """Tests for load_data (internal helper, still used by upload_data).""" def test_load_from_csv_file(self, tmp_path: Path): """Test loading from a CSV file path.""" @@ -168,3 +171,66 @@ def test_save_dataframe(self, tmp_path: Path): loaded = pd.read_csv(output_path) assert list(loaded.columns) == ["a", "b"] assert len(loaded) == 3 + + +class TestIsUrl: + """Tests for _is_url.""" + + def test_http_url(self): + assert _is_url("http://example.com") is True + + def test_https_url(self): + assert _is_url("https://example.com/data.csv") is True + + def test_local_path(self): + assert _is_url("/Users/test/data.csv") is False + + def test_relative_path(self): + assert _is_url("data.csv") is False + + +class TestValidateUrl: + """Tests for validate_url.""" + + def test_valid_https(self): + url = "https://example.com/data.csv" + assert validate_url(url) == url + + def test_valid_http(self): + url = "http://example.com/data.csv" + assert validate_url(url) == url + + def test_rejects_ftp(self): + with pytest.raises(ValueError, match="http or https"): + validate_url("ftp://example.com/data.csv") + + def test_rejects_no_host(self): + with pytest.raises(ValueError, match="no host"): + validate_url("https://") + + +class TestNormaliseGoogleSheetsUrl: + """Tests for _normalise_google_sheets_url.""" + + def test_edit_url_to_export(self): + url = "https://docs.google.com/spreadsheets/d/1abc/edit" + result = _normalise_google_sheets_url(url) + assert result == "https://docs.google.com/spreadsheets/d/1abc/export?format=csv" + + def test_edit_url_with_gid(self): + url = "https://docs.google.com/spreadsheets/d/1abc/edit#gid=123" + result = _normalise_google_sheets_url(url) + assert ( + result + == "https://docs.google.com/spreadsheets/d/1abc/export?format=csv&gid=123" + ) + + def test_already_export_url(self): + url = "https://docs.google.com/spreadsheets/d/1abc/export?format=csv" + result = _normalise_google_sheets_url(url) + assert result == url + + def test_non_google_url_unchanged(self): + url = "https://example.com/data.csv" + result = _normalise_google_sheets_url(url) + assert result == url From 85d53872c205ac1a6d4f4f6e95ca6b72f9f8fbe0 Mon Sep 17 00:00:00 2001 From: Rafael Poyiadzi Date: Tue, 24 Feb 2026 15:24:54 +0000 Subject: [PATCH 2/8] Log input source (artifact_id vs data) on task submission Co-Authored-By: Claude Opus 4.6 --- everyrow-mcp/src/everyrow_mcp/tool_helpers.py | 8 ++++++++ everyrow-mcp/src/everyrow_mcp/tools.py | 14 ++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/everyrow-mcp/src/everyrow_mcp/tool_helpers.py b/everyrow-mcp/src/everyrow_mcp/tool_helpers.py index e5912d66..70d502c8 100644 --- a/everyrow-mcp/src/everyrow_mcp/tool_helpers.py +++ b/everyrow-mcp/src/everyrow_mcp/tool_helpers.py @@ -285,8 +285,16 @@ def write_initial_task_state( task_type: PublicTaskType, session_url: str, total: int, + input_source: str = "unknown", ) -> None: """Write initial task state file when a task is first submitted.""" + logger.info( + "Task %s (%s): input_source=%s, total=%d", + task_id, + task_type.value, + input_source, + total, + ) if settings.is_http: return _write_task_state_file( diff --git a/everyrow-mcp/src/everyrow_mcp/tools.py b/everyrow-mcp/src/everyrow_mcp/tools.py index 2416cda2..9150cfd6 100644 --- a/everyrow-mcp/src/everyrow_mcp/tools.py +++ b/everyrow-mcp/src/everyrow_mcp/tools.py @@ -69,6 +69,11 @@ def _resolve_input(params) -> UUID | pd.DataFrame: return pd.DataFrame(params.data) +def _input_source(params) -> str: + """Return a label describing which input field was used.""" + return "artifact_id" if params.artifact_id else "data" + + logger = logging.getLogger(__name__) @@ -126,6 +131,7 @@ async def everyrow_agent(params: AgentInput, ctx: EveryRowContext) -> list[TextC task_type=PublicTaskType.AGENT, session_url=session_url, total=total, + input_source=_input_source(params), ) return await create_tool_response( @@ -199,6 +205,7 @@ async def everyrow_single_agent( task_type=PublicTaskType.AGENT, session_url=session_url, total=1, + input_source="single_agent", ) return await create_tool_response( @@ -273,6 +280,7 @@ async def everyrow_rank(params: RankInput, ctx: EveryRowContext) -> list[TextCon task_type=PublicTaskType.RANK, session_url=session_url, total=total, + input_source=_input_source(params), ) return await create_tool_response( @@ -353,6 +361,7 @@ async def everyrow_screen( task_type=PublicTaskType.SCREEN, session_url=session_url, total=total, + input_source=_input_source(params), ) return await create_tool_response( @@ -424,6 +433,7 @@ async def everyrow_dedupe( task_type=PublicTaskType.DEDUPE, session_url=session_url, total=total, + input_source=_input_source(params), ) return await create_tool_response( @@ -514,11 +524,14 @@ async def everyrow_merge(params: MergeInput, ctx: EveryRowContext) -> list[TextC ) task_id = str(cohort_task.task_id) total = len(left_input) if isinstance(left_input, pd.DataFrame) else 0 + left_src = "artifact_id" if params.left_artifact_id else "data" + right_src = "artifact_id" if params.right_artifact_id else "data" write_initial_task_state( task_id, task_type=PublicTaskType.MERGE, session_url=session_url, total=total, + input_source=f"left={left_src}, right={right_src}", ) return await create_tool_response( @@ -588,6 +601,7 @@ async def everyrow_forecast( task_type=PublicTaskType.FORECAST, session_url=session_url, total=total, + input_source=_input_source(params), ) return await create_tool_response( From 88851d0b163b3ec400a28d5097f5e6d966b63356 Mon Sep 17 00:00:00 2001 From: Rafael Poyiadzi Date: Tue, 24 Feb 2026 15:35:07 +0000 Subject: [PATCH 3/8] Move input resolution to model properties; fix review bugs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Adopt _aid_or_dataframe and _input_data_mode properties on models, removing free functions from tools.py - Add left/right properties to MergeInput - Use `is not None` for artifact_id checks - Fix sync context manager in uploads.py (with → async with) - Remove duplicate max_inline_rows in config.py - Update integration tests to use data instead of removed input_csv - Reject empty CSV in upload_data local file path Co-Authored-By: Claude Opus 4.6 --- everyrow-mcp/src/everyrow_mcp/config.py | 4 -- everyrow-mcp/src/everyrow_mcp/models.py | 49 ++++++++++++++++++++++ everyrow-mcp/src/everyrow_mcp/tools.py | 52 +++++++----------------- everyrow-mcp/src/everyrow_mcp/uploads.py | 2 +- everyrow-mcp/tests/test_stdio_content.py | 15 +++---- 5 files changed, 71 insertions(+), 51 deletions(-) diff --git a/everyrow-mcp/src/everyrow_mcp/config.py b/everyrow-mcp/src/everyrow_mcp/config.py index 21b8be50..d2c25e4a 100644 --- a/everyrow-mcp/src/everyrow_mcp/config.py +++ b/everyrow-mcp/src/everyrow_mcp/config.py @@ -13,10 +13,6 @@ class Settings(BaseSettings): everyrow_api_url: str = Field(default="https://everyrow.io/api/v0") preview_size: int = Field(default=1000) - max_inline_rows: int = Field( - default=50_000, - description="Maximum number of rows allowed in inline JSON data", - ) max_schema_properties: int = Field( default=50, description="Maximum number of properties allowed in a response schema", diff --git a/everyrow-mcp/src/everyrow_mcp/models.py b/everyrow-mcp/src/everyrow_mcp/models.py index a2cebfef..1a22f8f2 100644 --- a/everyrow-mcp/src/everyrow_mcp/models.py +++ b/everyrow-mcp/src/everyrow_mcp/models.py @@ -1,9 +1,11 @@ """Input models and schema helpers for everyrow MCP tools.""" +from enum import StrEnum from pathlib import Path from typing import Any, Literal from uuid import UUID +import pandas as pd from jsonschema import SchemaError from jsonschema.validators import validator_for from pydantic import ( @@ -28,6 +30,11 @@ } +class InputDataMode(StrEnum): + dataframe = "DATAFRAME" + artifact_id = "ARTIFACT_ID" + + def _validate_response_schema(schema: dict[str, Any] | None) -> dict[str, Any] | None: """Validate response_schema is a JSON Schema object schema.""" if schema is None: @@ -173,6 +180,20 @@ def check_input_source(self): ) return self + @property + def _input_data_mode(self) -> InputDataMode: + return ( + InputDataMode.artifact_id + if self.artifact_id is not None + else InputDataMode.dataframe + ) + + @property + def _aid_or_dataframe(self) -> UUID | pd.DataFrame: + if self.artifact_id is not None: + return UUID(self.artifact_id) + return pd.DataFrame(self.data) + class AgentInput(_SingleSourceInput): """Input for the agent operation.""" @@ -345,6 +366,34 @@ def check_sources(self) -> "MergeInput": ) return self + @property + def _left_input_data_mode(self) -> InputDataMode: + return ( + InputDataMode.artifact_id + if self.left_artifact_id is not None + else InputDataMode.dataframe + ) + + @property + def _left_aid_or_dataframe(self) -> UUID | pd.DataFrame: + if self.left_artifact_id is not None: + return UUID(self.left_artifact_id) + return pd.DataFrame(self.left_data) + + @property + def _right_input_data_mode(self) -> InputDataMode: + return ( + InputDataMode.artifact_id + if self.right_artifact_id is not None + else InputDataMode.dataframe + ) + + @property + def _right_aid_or_dataframe(self) -> UUID | pd.DataFrame: + if self.right_artifact_id is not None: + return UUID(self.right_artifact_id) + return pd.DataFrame(self.right_data) + class ForecastInput(_SingleSourceInput): """Input for the forecast operation.""" diff --git a/everyrow-mcp/src/everyrow_mcp/tools.py b/everyrow-mcp/src/everyrow_mcp/tools.py index 9150cfd6..f6caa974 100644 --- a/everyrow-mcp/src/everyrow_mcp/tools.py +++ b/everyrow-mcp/src/everyrow_mcp/tools.py @@ -61,19 +61,6 @@ ) from everyrow_mcp.utils import _is_url, fetch_csv_from_url, save_result_to_csv - -def _resolve_input(params) -> UUID | pd.DataFrame: - """Resolve artifact_id or data to a UUID or DataFrame for SDK ops.""" - if params.artifact_id: - return UUID(params.artifact_id) - return pd.DataFrame(params.data) - - -def _input_source(params) -> str: - """Return a label describing which input field was used.""" - return "artifact_id" if params.artifact_id else "data" - - logger = logging.getLogger(__name__) @@ -108,7 +95,7 @@ async def everyrow_agent(params: AgentInput, ctx: EveryRowContext) -> list[TextC client = _get_client(ctx) _clear_task_state() - input_data = _resolve_input(params) + input_data = params._aid_or_dataframe response_model: type[BaseModel] | None = None if params.response_schema: @@ -131,7 +118,7 @@ async def everyrow_agent(params: AgentInput, ctx: EveryRowContext) -> list[TextC task_type=PublicTaskType.AGENT, session_url=session_url, total=total, - input_source=_input_source(params), + input_source=params._input_data_mode.value, ) return await create_tool_response( @@ -256,7 +243,7 @@ async def everyrow_rank(params: RankInput, ctx: EveryRowContext) -> list[TextCon client = _get_client(ctx) _clear_task_state() - input_data = _resolve_input(params) + input_data = params._aid_or_dataframe response_model: type[BaseModel] | None = None if params.response_schema: @@ -280,7 +267,7 @@ async def everyrow_rank(params: RankInput, ctx: EveryRowContext) -> list[TextCon task_type=PublicTaskType.RANK, session_url=session_url, total=total, - input_source=_input_source(params), + input_source=params._input_data_mode.value, ) return await create_tool_response( @@ -340,7 +327,7 @@ async def everyrow_screen( client = _get_client(ctx) _clear_task_state() - input_data = _resolve_input(params) + input_data = params._aid_or_dataframe response_model: type[BaseModel] | None = None if params.response_schema: @@ -361,7 +348,7 @@ async def everyrow_screen( task_type=PublicTaskType.SCREEN, session_url=session_url, total=total, - input_source=_input_source(params), + input_source=params._input_data_mode.value, ) return await create_tool_response( @@ -417,7 +404,7 @@ async def everyrow_dedupe( client = _get_client(ctx) _clear_task_state() - input_data = _resolve_input(params) + input_data = params._aid_or_dataframe async with create_session(client=client) as session: session_url = session.get_url() @@ -433,7 +420,7 @@ async def everyrow_dedupe( task_type=PublicTaskType.DEDUPE, session_url=session_url, total=total, - input_source=_input_source(params), + input_source=params._input_data_mode.value, ) return await create_tool_response( @@ -498,17 +485,8 @@ async def everyrow_merge(params: MergeInput, ctx: EveryRowContext) -> list[TextC client = _get_client(ctx) _clear_task_state() - left_input: UUID | pd.DataFrame - if params.left_artifact_id: - left_input = UUID(params.left_artifact_id) - else: - left_input = pd.DataFrame(params.left_data) - - right_input: UUID | pd.DataFrame - if params.right_artifact_id: - right_input = UUID(params.right_artifact_id) - else: - right_input = pd.DataFrame(params.right_data) + left_input = params._left_aid_or_dataframe + right_input = params._right_aid_or_dataframe async with create_session(client=client) as session: session_url = session.get_url() @@ -524,14 +502,12 @@ async def everyrow_merge(params: MergeInput, ctx: EveryRowContext) -> list[TextC ) task_id = str(cohort_task.task_id) total = len(left_input) if isinstance(left_input, pd.DataFrame) else 0 - left_src = "artifact_id" if params.left_artifact_id else "data" - right_src = "artifact_id" if params.right_artifact_id else "data" write_initial_task_state( task_id, task_type=PublicTaskType.MERGE, session_url=session_url, total=total, - input_source=f"left={left_src}, right={right_src}", + input_source=f"left={params._left_input_data_mode.value}, right={params._right_input_data_mode.value}", ) return await create_tool_response( @@ -585,7 +561,7 @@ async def everyrow_forecast( client = _get_client(ctx) _clear_task_state() - input_data = _resolve_input(params) + input_data = params._aid_or_dataframe async with create_session(client=client) as session: session_url = session.get_url() @@ -601,7 +577,7 @@ async def everyrow_forecast( task_type=PublicTaskType.FORECAST, session_url=session_url, total=total, - input_source=_input_source(params), + input_source=params._input_data_mode.value, ) return await create_tool_response( @@ -649,6 +625,8 @@ async def everyrow_upload_data( df = await fetch_csv_from_url(params.source) else: df = pd.read_csv(params.source) + if df.empty: + raise ValueError(f"CSV file is empty: {params.source}") async with create_session(client=client) as session: artifact_id = await create_table_artifact(df, session) diff --git a/everyrow-mcp/src/everyrow_mcp/uploads.py b/everyrow-mcp/src/everyrow_mcp/uploads.py index d5799c8d..09d049aa 100644 --- a/everyrow-mcp/src/everyrow_mcp/uploads.py +++ b/everyrow-mcp/src/everyrow_mcp/uploads.py @@ -205,7 +205,7 @@ async def handle_upload(request: Request) -> JSONResponse: try: from everyrow.api_utils import create_client # noqa: PLC0415 - with create_client() as client: + async with create_client() as client: async with create_session(client=client) as session: artifact_id = await create_table_artifact(df, session) except Exception as exc: diff --git a/everyrow-mcp/tests/test_stdio_content.py b/everyrow-mcp/tests/test_stdio_content.py index b37bb37c..a4360c32 100644 --- a/everyrow-mcp/tests/test_stdio_content.py +++ b/everyrow-mcp/tests/test_stdio_content.py @@ -740,9 +740,7 @@ def _real_stdio_client(self): yield sdk_client @pytest.mark.asyncio - async def test_screen_pipeline_stdio_clean( - self, _real_stdio_client, jobs_csv, tmp_path - ): + async def test_screen_pipeline_stdio_clean(self, _real_stdio_client, tmp_path): """Screen: submit → poll → results. Every response must be stdio-clean.""" async with _stdio_mcp_client(_real_stdio_client) as session: # ── Submit ── @@ -751,7 +749,10 @@ async def test_screen_pipeline_stdio_clean( { "params": { "task": "Filter for remote positions", - "input_csv": jobs_csv, + "data": [ + {"company": "Airtable", "role": "Engineer", "remote": True}, + {"company": "Notion", "role": "Designer", "remote": False}, + ], } }, ) @@ -817,10 +818,6 @@ async def test_screen_pipeline_stdio_clean( @pytest.mark.asyncio async def test_agent_pipeline_stdio_clean(self, _real_stdio_client, tmp_path): """Agent: submit → poll → results. Every response must be stdio-clean.""" - # Minimal input to keep cost low - input_csv = tmp_path / "input.csv" - pd.DataFrame([{"name": "Anthropic"}]).to_csv(input_csv, index=False) - async with _stdio_mcp_client(_real_stdio_client) as session: # ── Submit ── submit = await session.call_tool( @@ -828,7 +825,7 @@ async def test_agent_pipeline_stdio_clean(self, _real_stdio_client, tmp_path): { "params": { "task": "Find this company's headquarters city.", - "input_csv": str(input_csv), + "data": [{"name": "Anthropic"}], "response_schema": { "properties": { "headquarters": { From 365117c8d670cde0946c5742b21ff9e425e4b4b8 Mon Sep 17 00:00:00 2001 From: Rafael Poyiadzi Date: Tue, 24 Feb 2026 15:41:55 +0000 Subject: [PATCH 4/8] Fix Google Sheets /pub URL handling and empty CSV error message - /pub URLs now correctly convert to /export?format=csv - Headers-only CSV from URL raises clear "empty CSV" error instead of misleading "could not parse as CSV or JSON" Co-Authored-By: Claude Opus 4.6 --- everyrow-mcp/src/everyrow_mcp/utils.py | 11 +++++++---- everyrow-mcp/tests/test_utils.py | 13 +++++++++++++ 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/everyrow-mcp/src/everyrow_mcp/utils.py b/everyrow-mcp/src/everyrow_mcp/utils.py index 0f70c013..7530ed76 100644 --- a/everyrow-mcp/src/everyrow_mcp/utils.py +++ b/everyrow-mcp/src/everyrow_mcp/utils.py @@ -37,7 +37,7 @@ def _normalise_google_sheets_url(url: str) -> str: Handles: - ``/edit...`` → ``/export?format=csv`` - - ``/pub?...`` → ``/pub?...&output=csv`` + - ``/pub...`` → ``/export?format=csv`` - Already has ``/export?format=csv`` → unchanged """ if "docs.google.com/spreadsheets" not in url: @@ -47,7 +47,7 @@ def _normalise_google_sheets_url(url: str) -> str: if "/export" in url and "format=csv" in url: return url - # /edit or /edit#gid=... → /export?format=csv + # /edit, /pub, or bare doc URL → /export?format=csv match = re.match(r"(https://docs\.google\.com/spreadsheets/d/[^/]+)", url) if match: base = match.group(1) @@ -80,8 +80,11 @@ async def fetch_csv_from_url(url: str) -> pd.DataFrame: # Try CSV first try: df = pd.read_csv(StringIO(response.text)) - if not df.empty: - return df + if df.empty: + raise ValueError(f"URL returned empty CSV data (headers only): {url}") + return df + except ValueError: + raise except Exception: pass diff --git a/everyrow-mcp/tests/test_utils.py b/everyrow-mcp/tests/test_utils.py index 97d54fc6..83439762 100644 --- a/everyrow-mcp/tests/test_utils.py +++ b/everyrow-mcp/tests/test_utils.py @@ -230,6 +230,19 @@ def test_already_export_url(self): result = _normalise_google_sheets_url(url) assert result == url + def test_pub_url_to_export(self): + url = "https://docs.google.com/spreadsheets/d/1abc/pub?output=html" + result = _normalise_google_sheets_url(url) + assert result == "https://docs.google.com/spreadsheets/d/1abc/export?format=csv" + + def test_pub_url_with_gid(self): + url = "https://docs.google.com/spreadsheets/d/1abc/pub?gid=456&single=true" + result = _normalise_google_sheets_url(url) + assert ( + result + == "https://docs.google.com/spreadsheets/d/1abc/export?format=csv&gid=456" + ) + def test_non_google_url_unchanged(self): url = "https://example.com/data.csv" result = _normalise_google_sheets_url(url) From 4574f06235299cf4c83e6f07f30ec00a5f659726 Mon Sep 17 00:00:00 2001 From: Rafael Poyiadzi Date: Tue, 24 Feb 2026 15:39:52 +0000 Subject: [PATCH 5/8] Require UPLOAD_SECRET env var for multi-pod HMAC signing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove auto-generated per-process secret — it breaks when pods don't share state. Now fails fast with a clear error if unset. Co-Authored-By: Claude Opus 4.6 --- everyrow-mcp/src/everyrow_mcp/uploads.py | 18 +++++++++++------- everyrow-mcp/tests/test_uploads.py | 17 +++++++++++++++-- 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/everyrow-mcp/src/everyrow_mcp/uploads.py b/everyrow-mcp/src/everyrow_mcp/uploads.py index 09d049aa..61312790 100644 --- a/everyrow-mcp/src/everyrow_mcp/uploads.py +++ b/everyrow-mcp/src/everyrow_mcp/uploads.py @@ -11,7 +11,6 @@ import hmac import json import logging -import secrets import time from io import BytesIO from uuid import uuid4 @@ -49,14 +48,19 @@ class RequestUploadUrlInput(BaseModel): # ── HMAC signing ────────────────────────────────────────────── -_secret: list[str] = [] # mutable container to avoid global statement - def _get_secret() -> str: - """Return the HMAC secret, generating one if not configured.""" - if not _secret: - _secret.append(settings.upload_secret or secrets.token_urlsafe(32)) - return _secret[0] + """Return the HMAC secret from settings. + + Raises at call time if UPLOAD_SECRET is not configured — required + in multi-pod deployments so all instances share the same signing key. + """ + if not settings.upload_secret: + raise RuntimeError( + "UPLOAD_SECRET must be set in HTTP mode for HMAC signing. " + 'Generate one with: python -c "import secrets; print(secrets.token_urlsafe(32))"' + ) + return settings.upload_secret def sign_upload_url(upload_id: str, expires_at: int) -> str: diff --git a/everyrow-mcp/tests/test_uploads.py b/everyrow-mcp/tests/test_uploads.py index 29d67356..ca72a9b8 100644 --- a/everyrow-mcp/tests/test_uploads.py +++ b/everyrow-mcp/tests/test_uploads.py @@ -21,6 +21,11 @@ class TestHmacSigning: """Tests for HMAC signing and verification.""" + @pytest.fixture(autouse=True) + def _with_upload_secret(self): + with override_settings(upload_secret="test-secret-for-hmac"): + yield + def test_roundtrip(self): """A signature can be verified immediately.""" upload_id = "test-upload-id" @@ -48,6 +53,12 @@ def test_different_upload_id_rejected(self): sig = sign_upload_url("upload-1", expires_at) assert verify_upload_signature("upload-2", expires_at, sig) is False + def test_missing_secret_raises(self): + """RuntimeError when UPLOAD_SECRET is not set.""" + with override_settings(upload_secret=""): + with pytest.raises(RuntimeError, match="UPLOAD_SECRET must be set"): + sign_upload_url("test", int(time.time()) + 300) + class TestRequestUploadUrlInput: """Tests for the input model.""" @@ -95,7 +106,7 @@ async def test_returns_upload_url(self, fake_redis): # noqa: ARG002 ctx = make_test_context(mock_client) with ( - override_settings(transport="streamable-http"), + override_settings(transport="streamable-http", upload_secret="test-secret"), patch( "everyrow_mcp.uploads.redis_store.store_upload_meta", new_callable=AsyncMock, @@ -121,7 +132,9 @@ async def test_rejects_non_csv(self, fake_redis): # noqa: ARG002 mock_client = MagicMock(token="fake-token") ctx = make_test_context(mock_client) - with override_settings(transport="streamable-http"): + with override_settings( + transport="streamable-http", upload_secret="test-secret" + ): params = RequestUploadUrlInput(filename="data.json") result = await tool_fn(params, ctx) From 8c251f5bc59307e63bfc00316232c38f65c2d970 Mon Sep 17 00:00:00 2001 From: Rafael Poyiadzi Date: Tue, 24 Feb 2026 19:46:51 +0000 Subject: [PATCH 6/8] Reject empty inline data in validators Add len(v) == 0 check to _SingleSourceInput and MergeInput validate_data_size validators to prevent wasteful zero-row task submissions. Co-Authored-By: Claude Opus 4.6 --- everyrow-mcp/src/everyrow_mcp/models.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/everyrow-mcp/src/everyrow_mcp/models.py b/everyrow-mcp/src/everyrow_mcp/models.py index 1a22f8f2..4bdbccb6 100644 --- a/everyrow-mcp/src/everyrow_mcp/models.py +++ b/everyrow-mcp/src/everyrow_mcp/models.py @@ -165,10 +165,13 @@ def validate_artifact_id(cls, v: str | None) -> str | None: def validate_data_size( cls, v: list[dict[str, Any]] | None ) -> list[dict[str, Any]] | None: - if v is not None and len(v) > settings.max_inline_rows: - raise ValueError( - f"Inline data has {len(v)} rows (max {settings.max_inline_rows})" - ) + if v is not None: + if len(v) == 0: + raise ValueError("Inline data must not be empty.") + if len(v) > settings.max_inline_rows: + raise ValueError( + f"Inline data has {len(v)} rows (max {settings.max_inline_rows})" + ) return v @model_validator(mode="after") @@ -346,10 +349,13 @@ def validate_artifact_ids(cls, v: str | None) -> str | None: def validate_data_size( cls, v: list[dict[str, Any]] | None ) -> list[dict[str, Any]] | None: - if v is not None and len(v) > settings.max_inline_rows: - raise ValueError( - f"Inline data has {len(v)} rows (max {settings.max_inline_rows})" - ) + if v is not None: + if len(v) == 0: + raise ValueError("Inline data must not be empty.") + if len(v) > settings.max_inline_rows: + raise ValueError( + f"Inline data has {len(v)} rows (max {settings.max_inline_rows})" + ) return v @model_validator(mode="after") From 52d47698a509331c912cae28cb9ef19f4fb084e0 Mon Sep 17 00:00:00 2001 From: Rafael Poyiadzi Date: Tue, 24 Feb 2026 19:56:00 +0000 Subject: [PATCH 7/8] Fix upload_secret description, remove dead load_data, rename is_url MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix misleading upload_secret description: say "Required in HTTP mode" instead of "Auto-generated if empty" (it raises RuntimeError) - Remove unused load_data() and its 8 tests — superseded by model properties (_aid_or_dataframe) and upload_data tool - Rename _is_url → is_url since it's imported across modules Co-Authored-By: Claude Opus 4.6 --- everyrow-mcp/src/everyrow_mcp/config.py | 2 +- everyrow-mcp/src/everyrow_mcp/models.py | 4 +- everyrow-mcp/src/everyrow_mcp/tools.py | 4 +- everyrow-mcp/src/everyrow_mcp/utils.py | 58 +-------------------- everyrow-mcp/tests/test_utils.py | 68 +++---------------------- 5 files changed, 12 insertions(+), 124 deletions(-) diff --git a/everyrow-mcp/src/everyrow_mcp/config.py b/everyrow-mcp/src/everyrow_mcp/config.py index d2c25e4a..a6d7d810 100644 --- a/everyrow-mcp/src/everyrow_mcp/config.py +++ b/everyrow-mcp/src/everyrow_mcp/config.py @@ -79,7 +79,7 @@ class Settings(BaseSettings): # Upload settings (HTTP mode only) upload_secret: str = Field( default="", - description="HMAC-SHA256 secret for signing upload URLs. Auto-generated if empty.", + description="HMAC-SHA256 secret for signing upload URLs. Required in HTTP mode.", ) upload_url_ttl: int = Field( default=300, diff --git a/everyrow-mcp/src/everyrow_mcp/models.py b/everyrow-mcp/src/everyrow_mcp/models.py index 4bdbccb6..592d56a6 100644 --- a/everyrow-mcp/src/everyrow_mcp/models.py +++ b/everyrow-mcp/src/everyrow_mcp/models.py @@ -18,7 +18,7 @@ ) from everyrow_mcp.config import settings -from everyrow_mcp.utils import _is_url, validate_csv_path, validate_url +from everyrow_mcp.utils import is_url, validate_csv_path, validate_url JSON_TYPE_MAP = { "string": str, @@ -429,7 +429,7 @@ class UploadDataInput(BaseModel): @field_validator("source") @classmethod def validate_source(cls, v: str) -> str: - if _is_url(v): + if is_url(v): return validate_url(v) # Local path if settings.is_http: diff --git a/everyrow-mcp/src/everyrow_mcp/tools.py b/everyrow-mcp/src/everyrow_mcp/tools.py index f6caa974..49fffa6f 100644 --- a/everyrow-mcp/src/everyrow_mcp/tools.py +++ b/everyrow-mcp/src/everyrow_mcp/tools.py @@ -59,7 +59,7 @@ create_tool_response, write_initial_task_state, ) -from everyrow_mcp.utils import _is_url, fetch_csv_from_url, save_result_to_csv +from everyrow_mcp.utils import fetch_csv_from_url, is_url, save_result_to_csv logger = logging.getLogger(__name__) @@ -621,7 +621,7 @@ async def everyrow_upload_data( """ client = _get_client(ctx) - if _is_url(params.source): + if is_url(params.source): df = await fetch_csv_from_url(params.source) else: df = pd.read_csv(params.source) diff --git a/everyrow-mcp/src/everyrow_mcp/utils.py b/everyrow-mcp/src/everyrow_mcp/utils.py index 7530ed76..33a571bb 100644 --- a/everyrow-mcp/src/everyrow_mcp/utils.py +++ b/everyrow-mcp/src/everyrow_mcp/utils.py @@ -4,14 +4,13 @@ import re from io import StringIO from pathlib import Path -from typing import Any from urllib.parse import urlparse import httpx import pandas as pd -def _is_url(value: str) -> bool: +def is_url(value: str) -> bool: """Check if a string looks like an HTTP(S) URL.""" return value.startswith("http://") or value.startswith("https://") @@ -203,61 +202,6 @@ def resolve_output_path(output_path: str, input_path: str, prefix: str) -> Path: return out / f"{prefix}_{input_name}.csv" -def load_data( - *, - data: str | list[dict[str, Any]] | None = None, - input_csv: str | None = None, -) -> pd.DataFrame: - """Load tabular data from inline data or a local CSV file path. - - Exactly one of ``data`` or ``input_csv`` must be provided. - - Args: - data: Inline data — either a CSV string or a JSON array of objects - (``list[dict]``). When a string starting with ``[`` is passed it - is parsed as JSON first; otherwise it is treated as CSV. - input_csv: Absolute path to a CSV file on disk (stdio mode only). - - Returns: - DataFrame with the loaded data. - - Raises: - ValueError: If no source or multiple sources are provided, or if data is empty. - """ - sources = sum(1 for s in (data, input_csv) if s is not None) - if sources != 1: - raise ValueError("Provide exactly one of data, input_csv.") - - if input_csv: - return pd.read_csv(input_csv) - - # data is not None at this point - if isinstance(data, list): - df = pd.DataFrame(data) - if df.empty: - raise ValueError("data produced an empty DataFrame.") - return df - - # str — auto-detect JSON array vs CSV - assert isinstance(data, str) - stripped = data.strip() - if stripped.startswith("["): - try: - parsed = json.loads(stripped) - if isinstance(parsed, list): - df = pd.DataFrame(parsed) - if df.empty: - raise ValueError("data produced an empty DataFrame.") - return df - except json.JSONDecodeError: - pass # fall through to CSV - - df = pd.read_csv(StringIO(data)) - if df.empty: - raise ValueError("data produced an empty DataFrame.") - return df - - def save_result_to_csv(df: pd.DataFrame, path: Path) -> None: """Save a DataFrame to CSV. diff --git a/everyrow-mcp/tests/test_utils.py b/everyrow-mcp/tests/test_utils.py index 83439762..43556cf1 100644 --- a/everyrow-mcp/tests/test_utils.py +++ b/everyrow-mcp/tests/test_utils.py @@ -6,9 +6,8 @@ import pytest from everyrow_mcp.utils import ( - _is_url, _normalise_google_sheets_url, - load_data, + is_url, resolve_output_path, save_result_to_csv, validate_csv_path, @@ -101,61 +100,6 @@ def test_different_prefixes(self, tmp_path: Path): assert result == tmp_path / f"{prefix}_test.csv" -class TestLoadData: - """Tests for load_data (internal helper, still used by upload_data).""" - - def test_load_from_csv_file(self, tmp_path: Path): - """Test loading from a CSV file path.""" - csv_file = tmp_path / "test.csv" - csv_file.write_text("a,b\n1,2\n3,4\n") - df = load_data(input_csv=str(csv_file)) - assert list(df.columns) == ["a", "b"] - assert len(df) == 2 - - def test_load_from_csv_string(self): - """Test loading from an inline CSV string.""" - df = load_data(data="name,score\nAlice,10\nBob,20\n") - assert list(df.columns) == ["name", "score"] - assert len(df) == 2 - - def test_load_from_json_list(self): - """Test loading from a list of dicts.""" - records = [{"x": 1, "y": "a"}, {"x": 2, "y": "b"}] - df = load_data(data=records) - assert list(df.columns) == ["x", "y"] - assert len(df) == 2 - - def test_load_from_json_string(self): - """Test loading from a JSON array string (auto-detected).""" - df = load_data(data='[{"col": "val1"}, {"col": "val2"}]') - assert list(df.columns) == ["col"] - assert len(df) == 2 - - def test_rejects_no_source(self): - """Test that no source raises ValueError.""" - with pytest.raises(ValueError, match="Provide exactly one of"): - load_data() - - def test_rejects_both_sources(self, tmp_path: Path): - """Test that both sources raises ValueError.""" - csv_file = tmp_path / "test.csv" - csv_file.write_text("a\n1\n") - with pytest.raises(ValueError, match="Provide exactly one of"): - load_data(data="a\n1\n", input_csv=str(csv_file)) - - def test_empty_json_list_raises(self): - """Test that empty list raises ValueError.""" - with pytest.raises(ValueError, match="empty DataFrame"): - load_data(data=[]) - - def test_json_string_fallback_to_csv(self): - """A string starting with '[' that isn't valid JSON falls back to CSV.""" - # This is a CSV string that happens to start with [ - # It will fail JSON parse and fall through to CSV - df = load_data(data="[col]\nval1\nval2\n") - assert len(df) == 2 - - class TestSaveResultToCsv: """Tests for save_result_to_csv.""" @@ -174,19 +118,19 @@ def test_save_dataframe(self, tmp_path: Path): class TestIsUrl: - """Tests for _is_url.""" + """Tests for is_url.""" def test_http_url(self): - assert _is_url("http://example.com") is True + assert is_url("http://example.com") is True def test_https_url(self): - assert _is_url("https://example.com/data.csv") is True + assert is_url("https://example.com/data.csv") is True def test_local_path(self): - assert _is_url("/Users/test/data.csv") is False + assert is_url("/Users/test/data.csv") is False def test_relative_path(self): - assert _is_url("data.csv") is False + assert is_url("data.csv") is False class TestValidateUrl: From d086faada3555d7324a8c3abbeebb6726a36b743 Mon Sep 17 00:00:00 2001 From: Rafael Poyiadzi Date: Tue, 24 Feb 2026 20:01:40 +0000 Subject: [PATCH 8/8] Clarify submission labels when input is an artifact_id When a task is submitted via artifact_id (not inline data), the row count is unknown client-side. The fallback labels now say "artifact" instead of the vague "rows for X" to make clear that the server-side artifact is being processed. Co-Authored-By: Claude Opus 4.6 --- everyrow-mcp/src/everyrow_mcp/tools.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/everyrow-mcp/src/everyrow_mcp/tools.py b/everyrow-mcp/src/everyrow_mcp/tools.py index 49fffa6f..ecd6fa67 100644 --- a/everyrow-mcp/src/everyrow_mcp/tools.py +++ b/everyrow-mcp/src/everyrow_mcp/tools.py @@ -126,7 +126,7 @@ async def everyrow_agent(params: AgentInput, ctx: EveryRowContext) -> list[TextC session_url=session_url, label=f"Submitted: {total} agents starting." if total - else "Submitted: agents starting.", + else "Submitted: agents starting (artifact).", token=client.token, total=total, mcp_server_url=ctx.request_context.lifespan_context.mcp_server_url, @@ -275,7 +275,7 @@ async def everyrow_rank(params: RankInput, ctx: EveryRowContext) -> list[TextCon session_url=session_url, label=f"Submitted: {total} rows for ranking." if total - else "Submitted: rows for ranking.", + else "Submitted: artifact for ranking.", token=client.token, total=total, mcp_server_url=ctx.request_context.lifespan_context.mcp_server_url, @@ -356,7 +356,7 @@ async def everyrow_screen( session_url=session_url, label=f"Submitted: {total} rows for screening." if total - else "Submitted: rows for screening.", + else "Submitted: artifact for screening.", token=client.token, total=total, mcp_server_url=ctx.request_context.lifespan_context.mcp_server_url, @@ -428,7 +428,7 @@ async def everyrow_dedupe( session_url=session_url, label=f"Submitted: {total} rows for deduplication." if total - else "Submitted: rows for deduplication.", + else "Submitted: artifact for deduplication.", token=client.token, total=total, mcp_server_url=ctx.request_context.lifespan_context.mcp_server_url, @@ -515,7 +515,7 @@ async def everyrow_merge(params: MergeInput, ctx: EveryRowContext) -> list[TextC session_url=session_url, label=f"Submitted: {total} left rows for merging." if total - else "Submitted: rows for merging.", + else "Submitted: artifacts for merging.", token=client.token, total=total, mcp_server_url=ctx.request_context.lifespan_context.mcp_server_url, @@ -585,7 +585,7 @@ async def everyrow_forecast( session_url=session_url, label=f"Submitted: {total} rows for forecasting (6 research dimensions + dual forecaster per row)." if total - else "Submitted: rows for forecasting.", + else "Submitted: artifact for forecasting.", token=client.token, total=total, mcp_server_url=ctx.request_context.lifespan_context.mcp_server_url,