19 changes: 14 additions & 5 deletions src/servers/fmsr/main.py
@@ -118,13 +118,22 @@ def _build_llm():

# ── LLM call helpers with retry ───────────────────────────────────────────────

_asset2fm_cache: dict[str, list[str]] = {}


def _call_asset2fm(asset_name: str) -> list[str]:
"""Query the LLM for failure modes of an asset. Retries up to _MAX_RETRIES times."""
"""Query the LLM for failure modes of an asset. Retries up to _MAX_RETRIES times.
Results are cached to avoid redundant LLM calls for the same asset."""
if asset_name in _asset2fm_cache:
return _asset2fm_cache[asset_name]

prompt = _ASSET2FM_PROMPT.format(asset_name=asset_name)
last_exc: Exception | None = None
for _ in range(_MAX_RETRIES):
try:
return _parse_numbered_list(_llm.generate(prompt))
result = _parse_numbered_list(_llm.generate(prompt))
_asset2fm_cache[asset_name] = result
return result
except Exception as exc:
last_exc = exc
raise last_exc
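The hunk above wraps the retried LLM call in a module-level memo. A minimal, self-contained sketch of that pattern follows; the `generate` and `parse` callables stand in for `_llm.generate` and `_parse_numbered_list`, and `_MAX_RETRIES = 3` is an assumed value, not the one defined in the file:

```python
from typing import Callable

_cache: dict[str, list[str]] = {}
_MAX_RETRIES = 3  # assumed value; the real constant lives elsewhere in main.py


def cached_query(key: str, generate: Callable[[str], str], parse: Callable[[str], list[str]]) -> list[str]:
    """Return the parsed result for `key`, caching only successful calls."""
    if key in _cache:
        return _cache[key]
    last_exc: Exception | None = None
    for _ in range(_MAX_RETRIES):
        try:
            result = parse(generate(key))
            _cache[key] = result  # populate the memo only after a successful parse
            return result
        except Exception as exc:  # generation or parse failure: keep retrying
            last_exc = exc
    raise last_exc  # all retries exhausted
```

Because the cache is written only after a successful parse, transient failures stay retryable and are never memoized.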
@@ -179,10 +188,10 @@ class FailureModeSensorMappingResult(BaseModel):

# ── FastMCP server ────────────────────────────────────────────────────────────

mcp = FastMCP("fmsr")
mcp = FastMCP("fmsr", instructions="Failure mode and sensor reasoning: get failure modes for assets and determine which sensors can detect each failure.")


@mcp.tool()
@mcp.tool(title="Get Failure Modes")
def get_failure_modes(asset_name: str) -> Union[FailureModesResult, ErrorResult]:
"""Returns a list of known failure modes for the given asset.
For chillers and AHUs returns a curated list. For other assets queries the LLM."""
@@ -207,7 +216,7 @@ def get_failure_modes(asset_name: str) -> Union[FailureModesResult, ErrorResult]
return ErrorResult(error=str(exc))


@mcp.tool()
@mcp.tool(title="Get Failure Mode Sensor Mapping")
def get_failure_mode_sensor_mapping(
asset_name: str,
failure_modes: List[str],
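Across all four servers the PR also attaches an `instructions` string to the `FastMCP` constructor and a human-readable `title` to each `@mcp.tool()` registration. A minimal sketch of that registration style, assuming an MCP Python SDK version that accepts these keywords as this PR does (the server name, tool, and result model below are invented for illustration):

```python
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel

mcp = FastMCP(
    "example",
    instructions="Example server: demonstrates titled tool registration.",
)


class EchoResult(BaseModel):
    text: str


@mcp.tool(title="Echo Text")
def echo(text: str) -> EchoResult:
    """Returns the input text unchanged."""
    return EchoResult(text=text)


if __name__ == "__main__":
    mcp.run()  # stdio transport by default
```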
41 changes: 28 additions & 13 deletions src/servers/iot/main.py
@@ -1,6 +1,7 @@
import os
import logging
from datetime import datetime
from functools import lru_cache
from typing import Any, Dict, List, Optional, Union
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel
@@ -36,7 +37,7 @@
logger.error(f"Failed to connect to CouchDB: {e}")
db = None

mcp = FastMCP("iot")
mcp = FastMCP("iot", instructions="IoT sensor data: browse sites, assets, sensors, and query historical readings from CouchDB.")

# Static site as per original requirement
SITES = ["MAIN"]
@@ -75,28 +76,41 @@ class HistoryResult(BaseModel):
message: str


_asset_list_cache: Optional[List[str]] = None


def get_asset_list() -> List[str]:
"""Helper to fetch unique asset IDs from CouchDB."""
"""Helper to fetch unique asset IDs from CouchDB. Result is cached after
the first successful call to avoid repeated full-table scans."""
global _asset_list_cache
if _asset_list_cache is not None:
return _asset_list_cache

if not db:
return []

# Using a mango query to find unique asset_ids might be slow without an index,
# but for this benchmark we'll query documents and unique them.
# In a production environment, we'd use a CouchDB view.
try:
# We limit the fields to just asset_id to minimize data transfer
res = db.find(
{"asset_id": {"$exists": True}}, fields=["asset_id"], limit=100000
)
assets = {doc["asset_id"] for doc in res["docs"] if "asset_id" in doc}
return sorted(list(assets))
_asset_list_cache = sorted(list(assets))
return _asset_list_cache
except Exception as e:
logger.error(f"Error fetching assets: {e}")
return []


_sensor_list_cache: Dict[str, List[str]] = {}


def get_sensor_list(asset_id: str) -> List[str]:
"""Helper to fetch sensor names for a given asset from CouchDB."""
"""Helper to fetch sensor names for a given asset from CouchDB.
Result is cached per asset_id after the first successful call."""
if asset_id in _sensor_list_cache:
return _sensor_list_cache[asset_id]

if not db:
return []

@@ -109,20 +123,21 @@ def get_sensor_list(asset_id: str) -> List[str]:
doc = res["docs"][0]
# Exclude metadata and standard fields
exclude = {"_id", "_rev", "asset_id", "timestamp"}
sensors = [key for key in doc.keys() if key not in exclude]
return sorted(sensors)
sensors = sorted(key for key in doc.keys() if key not in exclude)
_sensor_list_cache[asset_id] = sensors
return sensors
except Exception as e:
logger.error(f"Error fetching sensors for {asset_id}: {e}")
return []


@mcp.tool()
@mcp.tool(title="List Sites")
def sites() -> SitesResult:
"""Retrieves a list of sites. Each site is represented by a name."""
return SitesResult(sites=SITES)


@mcp.tool()
@mcp.tool(title="List Assets")
def assets(site_name: str) -> Union[AssetsResult, ErrorResult]:
"""Returns a list of assets for a given site. Each asset includes an id and a name."""
if site_name not in SITES:
@@ -137,7 +152,7 @@ def assets(site_name: str) -> Union[AssetsResult, ErrorResult]:
)


@mcp.tool()
@mcp.tool(title="List Sensors")
def sensors(site_name: str, asset_id: str) -> Union[SensorsResult, ErrorResult]:
"""Lists the sensors available for a specified asset at a given site."""
if site_name not in SITES:
@@ -156,7 +171,7 @@ def sensors(site_name: str, asset_id: str) -> Union[SensorsResult, ErrorResult]:
)


@mcp.tool()
@mcp.tool(title="Get Sensor History")
def history(
site_name: str, asset_id: str, start: str, final: Optional[str] = None
) -> Union[HistoryResult, ErrorResult]:
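The iot server gains two in-process caches: a process-wide asset list populated once, and a per-asset sensor list keyed by `asset_id`. A condensed sketch of both helpers, assuming a database client exposing the same `find(...)` / `["docs"]` shape used in the diff; error handling and logging are omitted:

```python
from typing import Dict, List, Optional

_asset_cache: Optional[List[str]] = None  # filled once per process
_sensor_cache: Dict[str, List[str]] = {}  # keyed by asset_id


def list_assets(db) -> List[str]:
    """Sorted unique asset ids; the database is scanned only on the first successful call."""
    global _asset_cache
    if _asset_cache is not None:
        return _asset_cache
    if not db:
        return []  # an unavailable backend is not cached, so later calls can retry
    docs = db.find({"asset_id": {"$exists": True}}, fields=["asset_id"], limit=100000)["docs"]
    _asset_cache = sorted({d["asset_id"] for d in docs if "asset_id" in d})
    return _asset_cache


def list_sensors(db, asset_id: str) -> List[str]:
    """Sensor field names for one asset, cached per asset_id after the first hit."""
    if asset_id in _sensor_cache:
        return _sensor_cache[asset_id]
    if not db:
        return []
    docs = db.find({"asset_id": asset_id}, limit=1)["docs"]
    if not docs:
        return []
    exclude = {"_id", "_rev", "asset_id", "timestamp"}
    _sensor_cache[asset_id] = sorted(k for k in docs[0] if k not in exclude)
    return _sensor_cache[asset_id]
```

As in the PR, only successful lookups are memoized, so a temporarily unreachable database does not poison the cache.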
48 changes: 32 additions & 16 deletions src/servers/tsfm/main.py
@@ -28,7 +28,8 @@
import os
import tempfile
import uuid
from typing import List, Optional, Union
from functools import lru_cache
from typing import Dict, List, Optional, Union

import numpy as np
import pandas as pd
@@ -75,6 +76,13 @@
# ── Internal helpers ──────────────────────────────────────────────────────────


@lru_cache(maxsize=16)
def _load_model_config(model_checkpoint: str) -> dict:
"""Load and cache model config.json to avoid repeated disk reads."""
with open(model_checkpoint + "/config.json") as f:
return json.load(f)


def _build_dataset_config(
timestamp_column: str,
target_columns: List[str],
@@ -107,13 +115,13 @@ def _tsad_output_to_df(output: dict) -> pd.DataFrame:

# ── FastMCP server ────────────────────────────────────────────────────────────

mcp = FastMCP("tsfm")
mcp = FastMCP("tsfm", instructions="Time-series foundation models: forecasting, finetuning, and anomaly detection using IBM Granite TinyTimeMixer.")


# ── Static tools ──────────────────────────────────────────────────────────────


@mcp.tool()
@mcp.tool(title="Get AI Tasks")
def get_ai_tasks() -> AITasksResult:
"""Returns the list of supported AI task types for time-series analysis.

@@ -123,7 +131,7 @@ def get_ai_tasks() -> AITasksResult:
return AITasksResult(tasks=[AITaskEntry(**t) for t in _AI_TASKS])


@mcp.tool()
@mcp.tool(title="Get TSFM Models")
def get_tsfm_models() -> TSFMModelsResult:
"""Returns the list of available pre-trained TinyTimeMixer (TTM) model checkpoints.

@@ -136,7 +144,7 @@
# ── TSFM Forecasting (zero-shot inference) ────────────────────────────────────


@mcp.tool()
@mcp.tool(title="Run TSFM Forecasting")
def run_tsfm_forecasting(
dataset_path: str,
timestamp_column: str,
@@ -191,8 +199,7 @@ def run_tsfm_forecasting(

try:
data_df = _read_ts_data(dataset_path, dataset_config_dictionary=dataset_config)
with open(model_checkpoint + "/config.json") as _f:
model_config = json.load(_f)
model_config = _load_model_config(model_checkpoint)

output_data_quality = _tsfm_data_quality_filter(
data_df, dataset_config, model_config, task="inference"
@@ -264,7 +271,7 @@ def run_tsfm_forecasting(
# ── TSFM Finetuning ───────────────────────────────────────────────────────────


@mcp.tool()
@mcp.tool(title="Run TSFM Finetuning")
def run_tsfm_finetuning(
dataset_path: str,
timestamp_column: str,
@@ -326,8 +333,7 @@ def run_tsfm_finetuning(

try:
data_df = _read_ts_data(dataset_path, dataset_config_dictionary=dataset_config)
with open(model_checkpoint + "/config.json") as _f:
model_config = json.load(_f)
model_config = _load_model_config(model_checkpoint)

os.makedirs(abs_save_dir, exist_ok=True)

@@ -400,7 +406,7 @@ def run_tsfm_finetuning(
# ── TSAD (conformal anomaly detection on top of TSFM forecasts) ──────────────


@mcp.tool()
@mcp.tool(title="Run Anomaly Detection")
def run_tsad(
dataset_path: str,
tsfm_output_json: str,
@@ -508,7 +514,7 @@ def run_tsad(
# ── Integrated TSAD (forecasting + anomaly detection in one call) ─────────────


@mcp.tool()
@mcp.tool(title="Run Integrated Forecasting + Anomaly Detection")
def run_integrated_tsad(
dataset_path: str,
timestamp_column: str,
@@ -558,10 +564,20 @@ def run_integrated_tsad(
ad_model_save = _get_outputs_path("tsad_model_save/")
os.makedirs(ad_model_save, exist_ok=True)

with open(model_checkpoint + "/config.json") as _f:
model_config = json.load(_f)
model_config = _load_model_config(model_checkpoint)
df_combined = pd.DataFrame()

# Read the full dataset once with all target columns, then subset per column
full_config = _build_dataset_config(
timestamp_column,
target_columns,
conditional_columns,
id_columns,
frequency_sampling,
autoregressive_modeling,
)
full_data_df = _read_ts_data(dataset_path, dataset_config_dictionary=full_config)

for col in target_columns:
col_config = _build_dataset_config(
timestamp_column,
@@ -572,8 +588,8 @@
autoregressive_modeling,
)

# 1. Load and quality-filter data for this column
data_df = _read_ts_data(dataset_path, dataset_config_dictionary=col_config)
# 1. Quality-filter data for this column (reuse already-loaded data)
data_df = full_data_df
output_dq = _tsfm_data_quality_filter(
data_df, col_config, model_config, task="inference"
)
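The tsfm changes remove two sources of repeated I/O: the model's `config.json` is now loaded through an `lru_cache`-wrapped helper keyed by checkpoint path, and `run_integrated_tsad` reads the dataset once up front instead of once per target column. A compact sketch of the combined idea, with `read_ts` and `quality_filter` as hypothetical stand-ins for `_read_ts_data` and `_tsfm_data_quality_filter`:

```python
import json
from functools import lru_cache
from typing import Callable, Dict, List

import pandas as pd


@lru_cache(maxsize=16)
def load_model_config(checkpoint: str) -> str:
    """Cache the raw config text per checkpoint path; callers parse their own copy."""
    with open(f"{checkpoint}/config.json") as f:
        return f.read()


def run_per_column(
    dataset_path: str,
    target_columns: List[str],
    checkpoint: str,
    read_ts: Callable[..., pd.DataFrame],
    quality_filter: Callable[..., pd.DataFrame],
) -> Dict[str, pd.DataFrame]:
    model_config = json.loads(load_model_config(checkpoint))
    full_df = read_ts(dataset_path, columns=target_columns)  # one read for all columns
    results = {}
    for col in target_columns:
        # Reuse the already-loaded frame rather than re-reading the file per column.
        results[col] = quality_filter(full_df, col, model_config)
    return results
```

Returning the raw JSON text here (rather than a parsed dict) keeps the cached value immutable; the PR itself caches the parsed dict, which works as long as callers treat it as read-only.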
8 changes: 4 additions & 4 deletions src/servers/utilities/main.py
@@ -17,7 +17,7 @@
logging.basicConfig(level=_log_level)
logger = logging.getLogger("utilities-mcp-server")

mcp = FastMCP("utilities")
mcp = FastMCP("utilities", instructions="General utilities: read JSON files and get current date/time.")


class DateTimeResult(BaseModel):
@@ -49,7 +49,7 @@ def get_temp_filename() -> str:
# --- JSON Tools ---


@mcp.tool()
@mcp.tool(title="Read JSON File")
def json_reader(file_name: str) -> str:
"""Reads a JSON file, parses its content, and returns the parsed data."""
try:
@@ -64,7 +64,7 @@ def json_reader(file_name: str) -> str:
# --- Time Tools ---


@mcp.tool()
@mcp.tool(title="Get Current Date and Time")
def current_date_time() -> DateTimeResult:
"""Provides the current date time as a JSON object."""
now = datetime.now(timezone.utc)
@@ -78,7 +78,7 @@ def current_date_time() -> DateTimeResult:
return DateTimeResult(currentDateTime=now_iso, currentDateTimeDescription=description)


@mcp.tool()
@mcp.tool(title="Get Current Time in English")
def current_time_english() -> TimeEnglishResult:
"""Returns the current time in English text."""
now = datetime.now(timezone.utc)
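The utilities server only gains the server-level `instructions` string and per-tool titles; its time tools are otherwise unchanged. For reference, a minimal sketch of the UTC timestamp shapes they are built on (the human-readable format string is an invented example, not the one used in the file):

```python
from datetime import datetime, timezone

now = datetime.now(timezone.utc)
now_iso = now.isoformat()  # e.g. "2025-01-01T12:00:00+00:00"
# Hypothetical human-readable rendering; the server builds its own description string.
description = now.strftime("%A, %B %d, %Y at %H:%M UTC")
print(now_iso, "|", description)
```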