4 changes: 4 additions & 0 deletions everyrow-mcp/manifest.json
@@ -72,6 +72,10 @@
     {
       "name": "everyrow_cancel",
       "description": "Cancel a running everyrow task. Use when the user wants to stop a task that is currently processing."
     },
+    {
+      "name": "everyrow_upload_data",
+      "description": "Upload data from a URL or local file. Returns an artifact_id for use in processing tools."
+    }
   ],
   "user_config": {
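The manifest only names the new tool; the intended flow is upload first, then pass the returned artifact_id to a processing tool. A client-side sketch using the official `mcp` Python SDK follows — the server command, argument keys, and result shape are assumptions based on the descriptions above, not the actual schema:

```python
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

params = StdioServerParameters(command="everyrow-mcp")  # server command assumed


async def main() -> None:
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            upload = await session.call_tool(
                "everyrow_upload_data", {"source": "https://example.com/data.csv"}
            )
            print(upload.content)  # expected to carry the artifact_id


asyncio.run(main())
```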
27 changes: 19 additions & 8 deletions everyrow-mcp/src/everyrow_mcp/config.py
@@ -13,14 +13,6 @@ class Settings(BaseSettings):

     everyrow_api_url: str = Field(default="https://everyrow.io/api/v0")
     preview_size: int = Field(default=1000)
-    max_inline_rows: int = Field(
-        default=50_000,
-        description="Maximum number of rows allowed in inline JSON data",
-    )
-    max_inline_data_bytes: int = Field(
-        default=10 * 1024 * 1024,
-        description="Maximum size in bytes for inline CSV string data (10 MB)",
-    )
     max_schema_properties: int = Field(
         default=50,
         description="Maximum number of properties allowed in a response schema",
@@ -79,6 +71,25 @@ class Settings(BaseSettings):
         default=604_800,
         description="Refresh token TTL in seconds (7 days)",
     )
+    max_inline_rows: int = Field(
+        default=5000,
+        description="Maximum rows allowed in inline data (list[dict]).",
+    )
+
+    # Upload settings (HTTP mode only)
+    upload_secret: str = Field(
+        default="",
+        description="HMAC-SHA256 secret for signing upload URLs. Required in HTTP mode.",
+    )
+    upload_url_ttl: int = Field(
+        default=300,
+        description="Presigned upload URL validity in seconds (5 min).",
+    )
+    max_upload_size_bytes: int = Field(
+        default=50 * 1024 * 1024,
+        description="Maximum upload file size in bytes (50 MB).",
+    )
+
     everyrow_api_key: str | None = None
 
     @property
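The `upload_secret` and `upload_url_ttl` settings imply HMAC-signed, expiring upload URLs. The sketch below shows one plausible scheme, assuming the signature covers the upload id plus an expiry timestamp; the PR's actual signing code lives in uploads.py, which is not part of this diff, and the helper names here are hypothetical:

```python
import hashlib
import hmac
import time


def sign_upload_url(upload_id: str, secret: str, ttl: int = 300) -> str:
    """Presign an upload URL: HMAC-SHA256 over the id and an expiry."""
    expires = int(time.time()) + ttl
    msg = f"{upload_id}:{expires}".encode()
    sig = hmac.new(secret.encode(), msg, hashlib.sha256).hexdigest()
    return f"/api/uploads/{upload_id}?expires={expires}&sig={sig}"


def verify_upload_signature(upload_id: str, expires: int, sig: str, secret: str) -> bool:
    """Reject expired or tampered URLs; compare_digest avoids timing leaks."""
    if time.time() > expires:
        return False
    msg = f"{upload_id}:{expires}".encode()
    expected = hmac.new(secret.encode(), msg, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, sig)
```

Pairing a short validity window (5 minutes by default) with consume-on-use metadata (see redis_store.py below) keeps a leaked URL from being replayed.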
2 changes: 2 additions & 0 deletions everyrow-mcp/src/everyrow_mcp/http_config.py
@@ -22,6 +22,7 @@
 from everyrow_mcp.redis_store import get_redis_client
 from everyrow_mcp.routes import api_download, api_progress
 from everyrow_mcp.templates import RESULTS_HTML, SESSION_HTML
+from everyrow_mcp.uploads import handle_upload
 
 logger = logging.getLogger(__name__)
 
@@ -101,6 +102,7 @@ def _register_routes(
     mcp.custom_route("/api/results/{task_id}/download", ["GET", "OPTIONS"])(
         api_download
     )
+    mcp.custom_route("/api/uploads/{upload_id}", ["PUT"])(handle_upload)
 
     async def _health(_request: Request) -> Response:
         try:
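handle_upload itself is defined in uploads.py, outside this diff. A PUT handler registered via `mcp.custom_route` would have roughly this shape; the size check and status codes below are illustrative assumptions, not the PR's code:

```python
from starlette.requests import Request
from starlette.responses import JSONResponse, Response

MAX_UPLOAD_SIZE = 50 * 1024 * 1024  # mirrors the max_upload_size_bytes setting


async def handle_upload(request: Request) -> Response:
    upload_id = request.path_params["upload_id"]
    body = await request.body()
    if len(body) > MAX_UPLOAD_SIZE:
        return JSONResponse({"error": "payload too large"}, status_code=413)
    # The real handler would also verify the presigned-URL signature and
    # persist the bytes as an artifact before acknowledging.
    return JSONResponse({"artifact_id": upload_id}, status_code=201)
```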
167 changes: 130 additions & 37 deletions everyrow-mcp/src/everyrow_mcp/models.py
@@ -1,9 +1,11 @@
"""Input models and schema helpers for everyrow MCP tools."""

from enum import StrEnum
from pathlib import Path
from typing import Any, Literal
from uuid import UUID

import pandas as pd
from jsonschema import SchemaError
from jsonschema.validators import validator_for
from pydantic import (
@@ -16,7 +18,7 @@
 )
 
 from everyrow_mcp.config import settings
-from everyrow_mcp.utils import validate_csv_path
+from everyrow_mcp.utils import is_url, validate_csv_path, validate_url
 
 JSON_TYPE_MAP = {
     "string": str,
@@ -28,6 +30,11 @@
 }
 
 
+class InputDataMode(StrEnum):
+    dataframe = "DATAFRAME"
+    artifact_id = "ARTIFACT_ID"
+
+
 def _validate_response_schema(schema: dict[str, Any] | None) -> dict[str, Any] | None:
     """Validate response_schema is a JSON Schema object schema."""
     if schema is None:
@@ -134,48 +141,62 @@ def _check_exactly_one(
 class _SingleSourceInput(BaseModel):
     model_config = ConfigDict(str_strip_whitespace=True, extra="forbid")
 
-    input_csv: str | None = Field(
+    artifact_id: str | None = Field(
         default=None,
-        description="Absolute path to CSV file (local/stdio mode only).",
+        description="Artifact ID (UUID) from upload_data or request_upload_url.",
     )
-    data: str | list[dict[str, Any]] | None = Field(
+    data: list[dict[str, Any]] | None = Field(
         default=None,
-        description="Inline data — CSV string or JSON array of objects.",
+        description="Inline data as a list of row objects.",
     )
 
-    @field_validator("input_csv")
+    @field_validator("artifact_id")
     @classmethod
-    def validate_input_csv(cls, v: str | None) -> str | None:
+    def validate_artifact_id(cls, v: str | None) -> str | None:
         if v is not None:
-            validate_csv_path(v)
+            try:
+                UUID(v)
+            except ValueError as exc:
+                raise ValueError(f"artifact_id must be a valid UUID: {v}") from exc
         return v
 
     @field_validator("data")
     @classmethod
     def validate_data_size(
-        cls, v: str | list[dict[str, Any]] | None
-    ) -> str | list[dict[str, Any]] | None:
-        if v is None:
-            return v
-        if isinstance(v, str) and len(v) > settings.max_inline_data_bytes:
-            raise ValueError(
-                f"Inline data exceeds {settings.max_inline_data_bytes // (1024 * 1024)} MB limit"
-            )
-        if isinstance(v, list) and len(v) > settings.max_inline_rows:
-            raise ValueError(
-                f"Inline data has {len(v)} rows (max {settings.max_inline_rows})"
-            )
+        cls, v: list[dict[str, Any]] | None
+    ) -> list[dict[str, Any]] | None:
+        if v is not None:
+            if len(v) == 0:
+                raise ValueError("Inline data must not be empty.")
+            if len(v) > settings.max_inline_rows:
+                raise ValueError(
+                    f"Inline data has {len(v)} rows (max {settings.max_inline_rows})"
+                )
         return v
 
     @model_validator(mode="after")
     def check_input_source(self):
         _check_exactly_one(
-            values=(self.input_csv, self.data),
-            field_names=("input_csv", "data"),
+            values=(self.artifact_id, self.data),
+            field_names=("artifact_id", "data"),
             label="Input",
         )
         return self
 
+    @property
+    def _input_data_mode(self) -> InputDataMode:
+        return (
+            InputDataMode.artifact_id
+            if self.artifact_id is not None
+            else InputDataMode.dataframe
+        )
+
+    @property
+    def _aid_or_dataframe(self) -> UUID | pd.DataFrame:
+        if self.artifact_id is not None:
+            return UUID(self.artifact_id)
+        return pd.DataFrame(self.data)
+
 
 class AgentInput(_SingleSourceInput):
     """Input for the agent operation."""
@@ -276,23 +297,23 @@ class MergeInput(BaseModel):
     )
 
     # LEFT table
-    left_csv: str | None = Field(
+    left_artifact_id: str | None = Field(
         default=None,
-        description="Absolute path to the left CSV (local/stdio mode only).",
+        description="Artifact ID (UUID) for the left table, from upload_data or request_upload_url.",
     )
-    left_data: str | list[dict[str, Any]] | None = Field(
+    left_data: list[dict[str, Any]] | None = Field(
         default=None,
-        description="Inline data for the left table — CSV string or JSON array of objects.",
+        description="Inline data for the left table as a list of row objects.",
     )
 
     # RIGHT table
-    right_csv: str | None = Field(
+    right_artifact_id: str | None = Field(
         default=None,
-        description="Absolute path to the right CSV (local/stdio mode only).",
+        description="Artifact ID (UUID) for the right table, from upload_data or request_upload_url.",
     )
-    right_data: str | list[dict[str, Any]] | None = Field(
+    right_data: list[dict[str, Any]] | None = Field(
         default=None,
-        description="Inline data for the right table — CSV string or JSON array of objects.",
+        description="Inline data for the right table as a list of row objects.",
     )
 
     merge_on_left: str | None = Field(
@@ -313,27 +334,72 @@
description="Relationship type: many_to_one (default) or one_to_one.",
)

@field_validator("left_csv", "right_csv")
@field_validator("left_artifact_id", "right_artifact_id")
@classmethod
def validate_artifact_ids(cls, v: str | None) -> str | None:
if v is not None:
try:
UUID(v)
except ValueError as exc:
raise ValueError(f"artifact_id must be a valid UUID: {v}") from exc
return v

@field_validator("left_data", "right_data")
@classmethod
def validate_csv_paths(cls, v: str | None) -> str | None:
def validate_data_size(
cls, v: list[dict[str, Any]] | None
) -> list[dict[str, Any]] | None:
if v is not None:
validate_csv_path(v)
if len(v) == 0:
raise ValueError("Inline data must not be empty.")
if len(v) > settings.max_inline_rows:
raise ValueError(
f"Inline data has {len(v)} rows (max {settings.max_inline_rows})"
)
return v

@model_validator(mode="after")
def check_sources(self) -> "MergeInput":
_check_exactly_one(
values=(self.left_csv, self.left_data),
field_names=("left_csv", "left_data"),
values=(self.left_artifact_id, self.left_data),
field_names=("left_artifact_id", "left_data"),
label="Left table",
)
_check_exactly_one(
values=(self.right_csv, self.right_data),
field_names=("right_csv", "right_data"),
values=(self.right_artifact_id, self.right_data),
field_names=("right_artifact_id", "right_data"),
label="Right table",
)
return self

@property
def _left_input_data_mode(self) -> InputDataMode:
return (
InputDataMode.artifact_id
if self.left_artifact_id is not None
else InputDataMode.dataframe
)

@property
def _left_aid_or_dataframe(self) -> UUID | pd.DataFrame:
if self.left_artifact_id is not None:
return UUID(self.left_artifact_id)
return pd.DataFrame(self.left_data)

@property
def _right_input_data_mode(self) -> InputDataMode:
return (
InputDataMode.artifact_id
if self.right_artifact_id is not None
else InputDataMode.dataframe
)

@property
def _right_aid_or_dataframe(self) -> UUID | pd.DataFrame:
if self.right_artifact_id is not None:
return UUID(self.right_artifact_id)
return pd.DataFrame(self.right_data)


class ForecastInput(_SingleSourceInput):
"""Input for the forecast operation."""
Expand All @@ -348,6 +414,33 @@ class ForecastInput(_SingleSourceInput):
     )
 
 
+class UploadDataInput(BaseModel):
+    """Input for the upload_data tool."""
+
+    model_config = ConfigDict(str_strip_whitespace=True, extra="forbid")
+
+    source: str = Field(
+        ...,
+        description="Data source: http(s) URL (Google Sheets/Drive supported) "
+        "or absolute local file path (stdio mode only).",
+        min_length=1,
+    )
+
+    @field_validator("source")
+    @classmethod
+    def validate_source(cls, v: str) -> str:
+        if is_url(v):
+            return validate_url(v)
+        # Local path
+        if settings.is_http:
+            raise ValueError(
+                "Local file paths are not supported in HTTP mode. "
+                "Use a URL or request_upload_url instead."
+            )
+        validate_csv_path(v)
+        return v
+
+
 class SingleAgentInput(BaseModel):
     """Input for a single agent operation (no CSV)."""
 
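Taken together, the validators above mean every table input is exactly one of an artifact_id (a UUID string) or inline rows, never both and never neither. A quick demonstration with MergeInput, assuming only the source checks shown above (field values are made up; other merge fields are omitted for brevity):

```python
from everyrow_mcp.models import MergeInput

# Valid: each table supplies exactly one source.
MergeInput(
    left_artifact_id="123e4567-e89b-12d3-a456-426614174000",
    right_data=[{"id": 1, "name": "Ada"}],
)

# Invalid: the left table supplies both sources, so validation fails
# (pydantic's ValidationError is a ValueError subclass).
try:
    MergeInput(
        left_artifact_id="123e4567-e89b-12d3-a456-426614174000",
        left_data=[{"id": 1}],
        right_data=[{"id": 2}],
    )
except ValueError as exc:
    print(exc)  # message produced by _check_exactly_one
```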
14 changes: 14 additions & 0 deletions everyrow-mcp/src/everyrow_mcp/redis_store.py
@@ -194,3 +194,17 @@ async def store_poll_token(task_id: str, poll_token: str) -> None:

 async def get_poll_token(task_id: str) -> str | None:
     return await get_redis_client().get(build_key("poll_token", task_id))
+
+
+# ── Upload metadata ───────────────────────────────────────────
+
+
+async def store_upload_meta(upload_id: str, meta_json: str, ttl: int) -> None:
+    """Store upload metadata with TTL (consume-on-use)."""
+    await get_redis_client().setex(build_key("upload", upload_id), ttl, meta_json)
+
+
+async def pop_upload_meta(upload_id: str) -> str | None:
+    """Atomically get and delete upload metadata (prevents replay)."""
+    key = build_key("upload", upload_id)
+    return await get_redis_client().getdel(key)
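pop_upload_meta leans on Redis GETDEL (available since Redis 6.2), which reads and deletes in one atomic step: two racing PUTs against the same presigned URL cannot both retrieve the metadata, so each URL is single-use by construction. A minimal consumer sketch, assuming the metadata is stored as JSON (the real caller is in uploads.py, which this diff does not show):

```python
import json
from typing import Any

from everyrow_mcp.redis_store import pop_upload_meta


async def consume_upload(upload_id: str) -> dict[str, Any] | None:
    """Return upload metadata at most once; None means unknown, expired,
    or already consumed."""
    raw = await pop_upload_meta(upload_id)
    return json.loads(raw) if raw is not None else None
```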
3 changes: 3 additions & 0 deletions everyrow-mcp/src/everyrow_mcp/server.py
@@ -18,6 +18,7 @@
     _RESULTS_META,
     everyrow_results_http,
 )
+from everyrow_mcp.uploads import register_upload_tool
 
 
 class InputArgs(BaseModel):
@@ -87,6 +88,8 @@ def main():
     )(everyrow_results_http)
 
     if input_args.http:
+        register_upload_tool(mcp)
+
         if input_args.no_auth:
             mcp_server_url = f"http://localhost:{input_args.port}"
         else:
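register_upload_tool is imported from uploads.py, so only its call site appears in this diff. Since it is gated on `input_args.http`, it presumably registers the HTTP-mode upload flow (the request_upload_url path mentioned in models.py). A speculative outline, with the import path, tool name, and signature all assumed:

```python
from mcp.server.fastmcp import FastMCP  # import path assumed


def register_upload_tool(mcp: FastMCP) -> None:
    @mcp.tool()  # real name and schema live in uploads.py
    async def request_upload_url() -> dict:
        """Return a presigned PUT URL and the artifact_id it will create."""
        raise NotImplementedError  # placeholder; see uploads.py
```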
8 changes: 8 additions & 0 deletions everyrow-mcp/src/everyrow_mcp/tool_helpers.py
@@ -285,8 +285,16 @@ def write_initial_task_state(
     task_type: PublicTaskType,
     session_url: str,
     total: int,
+    input_source: str = "unknown",
 ) -> None:
     """Write initial task state file when a task is first submitted."""
+    logger.info(
+        "Task %s (%s): input_source=%s, total=%d",
+        task_id,
+        task_type.value,
+        input_source,
+        total,
+    )
     if settings.is_http:
         return
     _write_task_state_file(