Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions examples/setup/fastagent.config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ default_model: gpt-5-mini.low
# mcp_ui_output_dir: ".fast-agent/ui" # Where to write MCP-UI HTML files (relative to CWD if not absolute)
# mcp_ui_mode: enabled

# MCP timeline display (adjust activity window/intervals in MCP UI + fast-agent check)
#mcp_timeline:
# steps: 20 # number of timeline buckets to render
# step_seconds: 30 # seconds per bucket (accepts values like "45s", "2m")

# Logging and Console Configuration:
logger:
# level: "debug" | "info" | "warning" | "error"
Expand Down
46 changes: 45 additions & 1 deletion src/fast_agent/cli/commands/check_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def get_fastagent_version() -> str:

def get_config_summary(config_path: Optional[Path]) -> dict:
"""Extract key information from the configuration file."""
from fast_agent.config import Settings
from fast_agent.config import MCPTimelineSettings, Settings

# Get actual defaults from Settings class
default_settings = Settings()
Expand All @@ -163,6 +163,10 @@ def get_config_summary(config_path: Optional[Path]) -> dict:
"enable_markup": default_settings.logger.enable_markup,
},
"mcp_ui_mode": default_settings.mcp_ui_mode,
"timeline": {
"steps": default_settings.mcp_timeline.steps,
"step_seconds": default_settings.mcp_timeline.step_seconds,
},
"mcp_servers": [],
}

Expand Down Expand Up @@ -211,6 +215,22 @@ def get_config_summary(config_path: Optional[Path]) -> dict:
if "mcp_ui_mode" in config:
result["mcp_ui_mode"] = config["mcp_ui_mode"]

# Get timeline settings
if "mcp_timeline" in config:
try:
timeline_override = MCPTimelineSettings(**(config.get("mcp_timeline") or {}))
except Exception as exc: # pragma: no cover - defensive
console.print(
"[yellow]Warning:[/yellow] Invalid mcp_timeline configuration; "
"using defaults."
)
console.print(f"[yellow]Details:[/yellow] {exc}")
else:
result["timeline"] = {
"steps": timeline_override.steps,
"step_seconds": timeline_override.step_seconds,
}

# Get MCP server info
if "mcp" in config and "servers" in config["mcp"]:
for server_name, server_config in config["mcp"]["servers"].items():
Expand Down Expand Up @@ -385,6 +405,28 @@ def bool_to_symbol(value):
else:
mcp_ui_display = f"[green]{mcp_ui_mode}[/green]"

timeline_settings = config_summary.get("timeline", {})
timeline_steps = timeline_settings.get("steps", 20)
timeline_step_seconds = timeline_settings.get("step_seconds", 30)

def format_step_interval(seconds: int) -> str:
try:
total = int(seconds)
except (TypeError, ValueError):
return str(seconds)
if total <= 0:
return "0s"
if total % 86400 == 0:
return f"{total // 86400}d"
if total % 3600 == 0:
return f"{total // 3600}h"
if total % 60 == 0:
return f"{total // 60}m"
minutes, secs = divmod(total, 60)
if minutes:
return f"{minutes}m{secs:02d}s"
return f"{secs}s"

# Prepare all settings as pairs
settings_data = [
("Log Level", logger.get("level", "warning (default)")),
Expand All @@ -395,6 +437,8 @@ def bool_to_symbol(value):
("Show Tools", bool_to_symbol(logger.get("show_tools", True))),
("Truncate Tools", bool_to_symbol(logger.get("truncate_tools", True))),
("Enable Markup", bool_to_symbol(logger.get("enable_markup", True))),
("Timeline Steps", f"[green]{timeline_steps}[/green]"),
("Timeline Interval", f"[green]{format_step_interval(timeline_step_seconds)}[/green]"),
]

# Add rows in two-column layout, styling some values in green
Expand Down
63 changes: 63 additions & 0 deletions src/fast_agent/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,66 @@ class MCPElicitationSettings(BaseModel):
model_config = ConfigDict(extra="allow", arbitrary_types_allowed=True)


class MCPTimelineSettings(BaseModel):
"""Configuration for MCP activity timeline display."""

steps: int = 20
"""Number of timeline buckets to render."""

step_seconds: int = 30
"""Duration of each timeline bucket in seconds."""

model_config = ConfigDict(extra="allow", arbitrary_types_allowed=True)

@staticmethod
def _parse_duration(value: str) -> int:
"""Parse simple duration strings like '30s', '2m', '1h' into seconds."""
pattern = re.compile(r"^\s*(\d+)\s*([smhd]?)\s*$", re.IGNORECASE)
match = pattern.match(value)
if not match:
raise ValueError("Expected duration in seconds (e.g. 30, '45s', '2m').")
amount = int(match.group(1))
unit = match.group(2).lower()
multiplier = {
"": 1,
"s": 1,
"m": 60,
"h": 3600,
"d": 86400,
}.get(unit)
if multiplier is None:
raise ValueError("Duration unit must be one of s, m, h, or d.")
return amount * multiplier

@field_validator("steps", mode="before")
@classmethod
def _coerce_steps(cls, value: Any) -> int:
if isinstance(value, str):
if not value.strip().isdigit():
raise ValueError("Timeline steps must be a positive integer.")
value = int(value.strip())
elif isinstance(value, float):
value = int(value)
if not isinstance(value, int):
raise TypeError("Timeline steps must be an integer.")
if value <= 0:
raise ValueError("Timeline steps must be greater than zero.")
return value

@field_validator("step_seconds", mode="before")
@classmethod
def _coerce_step_seconds(cls, value: Any) -> int:
if isinstance(value, str):
value = cls._parse_duration(value)
elif isinstance(value, (int, float)):
value = int(value)
else:
raise TypeError("Timeline step duration must be a number of seconds.")
if value <= 0:
raise ValueError("Timeline step duration must be greater than zero.")
return value


class MCPRootSettings(BaseModel):
"""Represents a root directory configuration for an MCP server."""

Expand Down Expand Up @@ -528,6 +588,9 @@ class Settings(BaseSettings):
mcp_ui_output_dir: str = ".fast-agent/ui"
"""Directory where MCP-UI HTML files are written. Relative paths are resolved from CWD."""

mcp_timeline: MCPTimelineSettings = MCPTimelineSettings()
"""Display settings for MCP activity timelines."""

@classmethod
def find_config(cls) -> Path | None:
"""Find the config file in the current directory or parent directories."""
Expand Down
2 changes: 1 addition & 1 deletion src/fast_agent/mcp/mcp_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ async def __aenter__(self):
server_registry = context.server_registry
if server_registry is None:
raise RuntimeError("Context is missing server registry for MCP connections")
manager = MCPConnectionManager(server_registry)
manager = MCPConnectionManager(server_registry, context=context)
await manager.__aenter__()
context._connection_manager = manager
self._persistent_connection_manager = cast(
Expand Down
18 changes: 17 additions & 1 deletion src/fast_agent/mcp/mcp_connection_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,8 +440,24 @@ async def launch_server(

logger.debug(f"{server_name}: Found server configuration=", data=config.model_dump())

timeline_steps = 20
timeline_seconds = 30
try:
ctx = self.context
except RuntimeError:
ctx = None

config_obj = getattr(ctx, "config", None)
timeline_config = getattr(config_obj, "mcp_timeline", None)
if timeline_config:
timeline_steps = getattr(timeline_config, "steps", timeline_steps)
timeline_seconds = getattr(timeline_config, "step_seconds", timeline_seconds)

transport_metrics = (
TransportChannelMetrics()
TransportChannelMetrics(
bucket_seconds=timeline_seconds,
bucket_count=timeline_steps,
)
if config.transport in ("http", "sse", "stdio")
else None
)
Expand Down
40 changes: 37 additions & 3 deletions src/fast_agent/mcp/transport_tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ class ChannelSnapshot(BaseModel):
response_count: int = 0
notification_count: int = 0
activity_buckets: list[str] | None = None
activity_bucket_seconds: int | None = None
activity_bucket_count: int | None = None


class TransportSnapshot(BaseModel):
Expand All @@ -95,12 +97,18 @@ class TransportSnapshot(BaseModel):
get: ChannelSnapshot | None = None
resumption: ChannelSnapshot | None = None
stdio: ChannelSnapshot | None = None
activity_bucket_seconds: int | None = None
activity_bucket_count: int | None = None


class TransportChannelMetrics:
"""Aggregates low-level channel events into user-visible metrics."""

def __init__(self) -> None:
def __init__(
self,
bucket_seconds: int | None = None,
bucket_count: int | None = None,
) -> None:
self._lock = Lock()

self._post_modes: set[str] = set()
Expand Down Expand Up @@ -155,8 +163,22 @@ def __init__(self) -> None:

self._response_channel_by_id: dict[RequestId, ChannelName] = {}

self._history_bucket_seconds = 30
self._history_bucket_count = 20
try:
seconds = 30 if bucket_seconds is None else int(bucket_seconds)
except (TypeError, ValueError):
seconds = 30
if seconds <= 0:
seconds = 30

try:
count = 20 if bucket_count is None else int(bucket_count)
except (TypeError, ValueError):
count = 20
if count <= 0:
count = 20

self._history_bucket_seconds = seconds
self._history_bucket_count = count
self._history_priority = {
"error": 5,
"disabled": 4,
Expand Down Expand Up @@ -463,6 +485,8 @@ def _build_post_mode_snapshot(self, mode: str, now: datetime) -> ChannelSnapshot
last_message_summary=stats.last_summary,
last_message_at=stats.last_at,
activity_buckets=self._build_activity_buckets(f"post-{mode}", now),
activity_bucket_seconds=self._history_bucket_seconds,
activity_bucket_count=self._history_bucket_count,
)

def snapshot(self) -> TransportSnapshot:
Expand Down Expand Up @@ -503,6 +527,8 @@ def snapshot(self) -> TransportSnapshot:
response_count=self._post_response_count,
notification_count=self._post_notification_count,
activity_buckets=self._merge_activity_buckets(["post-json", "post-sse"], now),
activity_bucket_seconds=self._history_bucket_seconds,
activity_bucket_count=self._history_bucket_count,
)

post_json_snapshot = self._build_post_mode_snapshot("json", now)
Expand Down Expand Up @@ -543,6 +569,8 @@ def snapshot(self) -> TransportSnapshot:
response_count=self._get_response_count,
notification_count=self._get_notification_count,
activity_buckets=self._build_activity_buckets("get", now),
activity_bucket_seconds=self._history_bucket_seconds,
activity_bucket_count=self._history_bucket_count,
)

resumption_snapshot = None
Expand All @@ -555,6 +583,8 @@ def snapshot(self) -> TransportSnapshot:
response_count=self._resumption_response_count,
notification_count=self._resumption_notification_count,
activity_buckets=self._build_activity_buckets("resumption", now),
activity_bucket_seconds=self._history_bucket_seconds,
activity_bucket_count=self._history_bucket_count,
)

stdio_snapshot = None
Expand Down Expand Up @@ -588,6 +618,8 @@ def snapshot(self) -> TransportSnapshot:
response_count=self._stdio_response_count,
notification_count=self._stdio_notification_count,
activity_buckets=self._build_activity_buckets("stdio", now),
activity_bucket_seconds=self._history_bucket_seconds,
activity_bucket_count=self._history_bucket_count,
)

return TransportSnapshot(
Expand All @@ -597,4 +629,6 @@ def snapshot(self) -> TransportSnapshot:
get=get_snapshot,
resumption=resumption_snapshot,
stdio=stdio_snapshot,
activity_bucket_seconds=self._history_bucket_seconds,
activity_bucket_count=self._history_bucket_count,
)
Loading
Loading