Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ jobs:
steps:
- uses: actions/checkout@v4

- name: Install redis-server
run: sudo apt-get update && sudo apt-get install -y redis-server

- name: Install uv
uses: astral-sh/setup-uv@v5
with:
Expand Down
1 change: 1 addition & 0 deletions everyrow-mcp/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ dependencies = [
"pandas>=2.0.0",
"pydantic>=2.0.0,<3.0.0",
"pydantic-settings>=2.0.0",
"redis>=5.0.0",
]

[project.scripts]
Expand Down
71 changes: 71 additions & 0 deletions everyrow-mcp/src/everyrow_mcp/redis_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""Redis client factory and key helpers for the MCP server."""

from __future__ import annotations

import logging

from redis.asyncio import Redis, Sentinel
from redis.asyncio.retry import Retry
from redis.backoff import ExponentialBackoff

logger = logging.getLogger(__name__)

REDIS_DB = 13
HEALTH_CHECK_INTERVAL = 30


def build_key(*parts: str) -> str:
    """Join *parts* into a namespaced Redis key (``mcp:a:b:c``).

    Embedded colons in each part are rewritten to underscores so a
    caller-supplied value cannot inject extra key segments.
    """
    cleaned = (segment.replace(":", "_") for segment in parts)
    return "mcp:" + ":".join(cleaned)


def create_redis_client(
    *,
    host: str = "localhost",
    port: int = 6379,
    db: int = REDIS_DB,
    password: str | None = None,
    sentinel_endpoints: str | None = None,
    sentinel_master_name: str | None = None,
) -> Redis:
    """Create an async Redis client with retry and health-check support.

    If *sentinel_endpoints* is provided (comma-separated "host:port" pairs)
    together with *sentinel_master_name*, connects via Sentinel; otherwise
    connects directly.

    Args:
        host: Redis host for a direct connection.
        port: Redis port for a direct connection.
        db: Logical database index (defaults to the MCP-reserved DB).
        password: Optional password, used both for the Sentinel nodes and
            the data node.
        sentinel_endpoints: Comma-separated ``host:port`` Sentinel addresses.
        sentinel_master_name: Name of the master set monitored by Sentinel.

    Returns:
        A configured ``redis.asyncio.Redis`` client (connection is lazy).
    """
    retry = Retry(ExponentialBackoff(), retries=3)

    # Sentinel mode needs BOTH settings; warn on a partial config instead of
    # silently falling back to a direct connection, which would otherwise
    # mask a deployment misconfiguration.
    if bool(sentinel_endpoints) != bool(sentinel_master_name):
        logger.warning(
            "Partial Sentinel config (need both endpoints and master name); "
            "falling back to direct connection"
        )

    if sentinel_endpoints and sentinel_master_name:
        sentinels: list[tuple[str, int]] = []
        for ep in sentinel_endpoints.split(","):
            ep = ep.strip()
            if not ep:
                # Tolerate trailing/doubled commas in the endpoint list.
                continue
            # rsplit on the last colon so IPv6-ish hosts keep their colons.
            h, p = ep.rsplit(":", 1)
            sentinels.append((h, int(p)))

        sentinel = Sentinel(
            sentinels,
            sentinel_kwargs={"password": password} if password else {},
            retry=retry,
        )
        client: Redis = sentinel.master_for(
            sentinel_master_name,
            db=db,
            password=password,
            decode_responses=True,
            health_check_interval=HEALTH_CHECK_INTERVAL,
            retry=retry,
        )
        logger.info("Redis: Sentinel mode, master=%s, db=%d", sentinel_master_name, db)
        return client

    client = Redis(
        host=host,
        port=port,
        db=db,
        password=password,
        decode_responses=True,
        health_check_interval=HEALTH_CHECK_INTERVAL,
        retry=retry,
    )
    logger.info("Redis: direct mode, host=%s:%d, db=%d", host, port, db)
    return client
194 changes: 194 additions & 0 deletions everyrow-mcp/src/everyrow_mcp/state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
"""Centralized server state for the everyrow MCP server.

RedisStore encapsulates all Redis data operations with error handling
and TTL management. ServerState is a thin config/context holder.
"""

from __future__ import annotations

import logging
from enum import StrEnum
from pathlib import Path
from typing import Any

from everyrow.generated.client import AuthenticatedClient
from pydantic import BaseModel, ConfigDict, Field
from redis.asyncio import Redis

from everyrow_mcp.config import DevHttpSettings, HttpSettings, StdioSettings
from everyrow_mcp.redis_utils import build_key

logger = logging.getLogger(__name__)

# Seconds to wait between task-progress polls.
PROGRESS_POLL_DELAY = 12
# Per-user task state file under the home directory.
TASK_STATE_FILE = Path.home() / ".everyrow" / "task.json"
RESULT_CACHE_TTL = 600  # 10 minutes — result metadata and page previews
CSV_CACHE_TTL = 3600  # 1 hour — full CSV stored in Redis for download
TOKEN_TTL = 86400  # 24 hours — must outlive the longest possible task


class Transport(StrEnum):
    """Transport the MCP server communicates over (stdio or HTTP)."""

    STDIO = "stdio"
    HTTP = "http"


class RedisStore:
    """Redis-backed storage for task state, results, and tokens.

    Encapsulates all Redis data operations with error handling and TTL
    management. Every operation is best-effort: Redis failures are logged
    and degrade to a cache miss (``None`` / no-op) rather than raising, so
    a flaky Redis never takes a request down.

    The Redis client is required at construction time; null-safety is
    handled at the ServerState level (``store`` is ``RedisStore | None``).
    """

    def __init__(self, redis: Redis) -> None:
        self._redis = redis

    async def ping(self) -> None:
        """Ping Redis to verify connectivity (raises on failure)."""
        await self._redis.ping()

    # ── Internal best-effort primitives ───────────────────────────

    async def _safe_get(self, key: str, desc: str, task_id: str) -> str | None:
        """GET *key*; on any Redis error log a warning and return None."""
        try:
            return await self._redis.get(key)
        except Exception:
            logger.warning("Failed to get %s from Redis for %s", desc, task_id)
            return None

    async def _safe_setex(
        self, key: str, ttl: int, value: str, desc: str, task_id: str
    ) -> None:
        """SETEX *key* with *ttl*; on any Redis error log a warning."""
        try:
            await self._redis.setex(key, ttl, value)
        except Exception:
            logger.warning("Failed to store %s in Redis for %s", desc, task_id)

    # ── Result metadata ───────────────────────────────────────────

    async def get_result_meta(self, task_id: str) -> str | None:
        """Get cached result metadata from Redis."""
        return await self._safe_get(
            build_key("result", task_id), "result metadata", task_id
        )

    async def store_result_meta(self, task_id: str, meta_json: str) -> None:
        """Store result metadata in Redis with TTL."""
        await self._safe_setex(
            build_key("result", task_id),
            RESULT_CACHE_TTL,
            meta_json,
            "result metadata",
            task_id,
        )

    # ── Result pages ──────────────────────────────────────────────

    async def get_result_page(
        self, task_id: str, offset: int, page_size: int
    ) -> str | None:
        """Get a cached page preview from Redis."""
        key = build_key("result", task_id, "page", str(offset), str(page_size))
        return await self._safe_get(key, "result page", task_id)

    async def store_result_page(
        self, task_id: str, offset: int, page_size: int, preview_json: str
    ) -> None:
        """Cache a page preview in Redis with TTL."""
        key = build_key("result", task_id, "page", str(offset), str(page_size))
        await self._safe_setex(key, RESULT_CACHE_TTL, preview_json, "result page", task_id)

    # ── CSV result storage ────────────────────────────────────────

    async def store_result_csv(self, task_id: str, csv_text: str) -> None:
        """Store full CSV text in Redis with 1h TTL."""
        await self._safe_setex(
            build_key("result", task_id, "csv"),
            CSV_CACHE_TTL,
            csv_text,
            "result CSV",
            task_id,
        )

    async def get_result_csv(self, task_id: str) -> str | None:
        """Read full CSV text from Redis."""
        return await self._safe_get(
            build_key("result", task_id, "csv"), "result CSV", task_id
        )

    # ── Task tokens ───────────────────────────────────────────────

    async def store_task_token(self, task_id: str, token: str) -> None:
        """Store an API token for a task in Redis."""
        await self._safe_setex(
            build_key("task_token", task_id), TOKEN_TTL, token, "task token", task_id
        )

    async def get_task_token(self, task_id: str) -> str | None:
        """Get an API token for a task from Redis."""
        return await self._safe_get(
            build_key("task_token", task_id), "task token", task_id
        )

    # ── Poll tokens ───────────────────────────────────────────────

    async def store_poll_token(self, task_id: str, poll_token: str) -> None:
        """Store a poll token for a task in Redis."""
        await self._safe_setex(
            build_key("poll_token", task_id),
            TOKEN_TTL,
            poll_token,
            "poll token",
            task_id,
        )

    async def get_poll_token(self, task_id: str) -> str | None:
        """Get a poll token for a task from Redis."""
        return await self._safe_get(
            build_key("poll_token", task_id), "poll token", task_id
        )

    async def pop_task_token(self, task_id: str) -> None:
        """Remove the API task token from Redis.

        The poll token is intentionally kept — it's needed to authenticate
        CSV download requests after the task completes (it expires naturally
        via its 24h TTL).
        """
        try:
            await self._redis.delete(build_key("task_token", task_id))
        except Exception:
            logger.warning("Failed to delete task token from Redis for %s", task_id)


class ServerState(BaseModel):
    """Mutable state shared across the MCP server.

    Thin config/context holder — all Redis data operations are delegated
    to RedisStore.
    """

    # Fields hold non-pydantic types (AuthenticatedClient, RedisStore).
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # API client for the everyrow backend (None until one is attached).
    client: AuthenticatedClient | None = None
    # Active transport; defaults to stdio.
    transport: Transport = Transport.STDIO
    # Public URL of this MCP server; empty string until configured.
    mcp_server_url: str = ""
    # Transport-specific settings object; None until configured.
    settings: StdioSettings | HttpSettings | DevHttpSettings | None = None
    # Redis-backed store; None when Redis is not configured/available.
    store: RedisStore | None = Field(default=None)
    # Auth provider object — NOTE(review): deliberately untyped (Any);
    # confirm the expected concrete type with the HTTP transport setup.
    auth_provider: Any | None = Field(default=None)

    @property
    def is_stdio(self) -> bool:
        return self.transport == Transport.STDIO

    @property
    def is_http(self) -> bool:
        # Any non-stdio transport is treated as HTTP.
        return self.transport != Transport.STDIO


# Module-level singleton shared by the server's handlers.
state = ServerState()
52 changes: 52 additions & 0 deletions everyrow-mcp/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,65 @@
"""Shared pytest fixtures for everyrow MCP server tests."""

from __future__ import annotations

import socket
import subprocess
import time
from pathlib import Path

import pandas as pd
import pytest
import redis.asyncio as aioredis
from everyrow.api_utils import create_client

from everyrow_mcp import app

_REDIS_PORT = 16379 # non-default port to avoid clashing with local Redis


@pytest.fixture(scope="session")
def _redis_server():
    """Start a local redis-server process for the test session.

    Yields once the server accepts TCP connections on ``_REDIS_PORT``.
    On teardown the process is always reaped: terminate first, and
    force-kill as a fallback so a hung server can't leak past the session.
    """
    proc = subprocess.Popen(
        [
            "redis-server",
            "--port",
            str(_REDIS_PORT),
            "--save",
            "",  # disable RDB snapshots — tests need no persistence
            "--appendonly",
            "no",  # disable AOF for the same reason
        ],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    # Wait (up to ~3s) for Redis to accept connections.
    for _ in range(30):
        try:
            with socket.create_connection(("localhost", _REDIS_PORT), timeout=0.1):
                break
        except OSError:
            time.sleep(0.1)
    else:
        proc.kill()
        proc.wait()  # reap so we don't leave a zombie behind
        raise RuntimeError("Test redis-server did not start in time")

    yield

    proc.terminate()
    try:
        proc.wait(timeout=5)
    except subprocess.TimeoutExpired:
        # Graceful shutdown hung — force-kill so the test session can exit.
        proc.kill()
        proc.wait()


@pytest.fixture
async def fake_redis(_redis_server) -> aioredis.Redis:
    """A real Redis client, flushed after each test."""
    # Depends on the session-scoped `_redis_server` fixture so the server
    # is guaranteed to be running before the client connects.
    r = aioredis.Redis(host="localhost", port=_REDIS_PORT, decode_responses=True)
    await r.flushdb()  # start each test from an empty database
    yield r
    await r.flushdb()  # leave nothing behind for the next test
    await r.aclose()


@pytest.fixture
async def everyrow_client():
Expand Down
Loading