2 changes: 1 addition & 1 deletion README.md
@@ -3,7 +3,7 @@
A custom Elasticsearch MCP server demo built on Python `FastMCP` and `FastAPI`.

## Features
- - Supports keyword search, secondary filtering, and lookup by ID
+ - Supports keyword search, secondary filtering, lookup by ID, and complex filter-term query logic
- MCP protocol tools provided via FastMCP: `search_news`, `search_news_with_secondary_filter`, `read_single_news`
- Prometheus monitoring integration (`starlette_prometheus`)
- Redis-backed server-side session storage (`RedisSessionMiddleware`)
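A quick way to sanity-check the tool list from the feature bullets is a FastMCP client session. A minimal sketch, assuming the server is reachable at a hypothetical local endpoint (`http://localhost:8000/mcp`) and following the generic FastMCP 2.x client pattern; this is illustrative, not code from this PR:

```python
import asyncio
from fastmcp import Client

async def main():
    # Hypothetical endpoint; point this at wherever mcp_app is actually served.
    async with Client("http://localhost:8000/mcp") as client:
        tools = await client.list_tools()
        print([t.name for t in tools])  # should include search_news, search_news_with_secondary_filter, read_single_news
        result = await client.call_tool("search_news", {"query": "AI", "max_results": 5})
        print(result)

asyncio.run(main())
```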
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -6,7 +6,7 @@ readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"elasticsearch==8.15",
"fastmcp==2.8.1",
"fastmcp==2.10",
"uvicorn>=0.20.0",
"python-dotenv>=0.20.0",
"pydantic>=1.10.0",
@@ -18,6 +18,7 @@ dependencies = [
"itsdangerous>=2.2.0",
"gunicorn>=23.0.0",
"redis>=6.2.0",
"tenacity>=9.1.2",
]

[[tool.uv.index]]
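The new `tenacity` dependency backs the retry decorators added to `AsyncElasticClient` below. As a standalone illustration of that policy — at most 3 attempts with exponential backoff of roughly 1 s, then 2 s, capped at 10 s — here is a minimal sketch; the failing coroutine is a stand-in, not project code:

```python
import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

@retry(
    reraise=True,                                        # re-raise the last exception once attempts are exhausted
    stop=stop_after_attempt(3),                          # 3 attempts in total
    wait=wait_exponential(multiplier=1, min=1, max=10),  # ~1s, ~2s, ... doubling, capped at 10s
    retry=retry_if_exception_type(asyncio.TimeoutError),
)
async def flaky_call():
    raise asyncio.TimeoutError("simulated timeout")      # stand-in for a transport failure

# asyncio.run(flaky_call())  # raises TimeoutError after the third attempt
```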
96 changes: 96 additions & 0 deletions src/news_mcp_server/clients/elastic_client.py
@@ -1,3 +1,8 @@
import asyncio
from dataclasses import dataclass
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from elastic_transport import TransportError
from typing import List
from elasticsearch import AsyncElasticsearch
from ..config.settings import es_settings
from ..exceptions import ToolException
@@ -8,12 +13,26 @@


class AsyncElasticClient:
    @dataclass
    class SearchResponse:
        data: List[dict]
        total: int = 0

    def __init__(self):
        # Initialize the async Elasticsearch client
        self._client = AsyncElasticsearch(es_settings.URL,
                                          api_key=es_settings.api_key, verify_certs=False)
        self.index = es_settings.ES_INDEX

    @retry(
        reraise=True,
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=(
            retry_if_exception_type(TransportError) |
            retry_if_exception_type(asyncio.TimeoutError)
        ),
    )
    async def search_news(self, query: str, source: str = None, date_from: str = None, date_to: str = None, max_results: int = 10) -> list:
        """
        Asynchronously search news in Elasticsearch
@@ -112,5 +131,82 @@ async def get_by_id(self, news_id: str) -> dict:
        except Exception:
            raise ToolException(f'Tool call exception with news_id {news_id}')

    def _append_common_filters(self, must: list, search_word: str, date_from: str, date_to: str):
        """Shared filter helper: append the search_word and date-range clauses to the must list"""
        if search_word:
            must.append({
                'multi_match': {
                    'query': search_word,
                    'fields': ['title^5', 'content'],
                    'operator': 'and'
                }
            })
        if date_from or date_to:
            range_filter = {}
            if date_from:
                range_filter['gte'] = date_from
            if date_to:
                range_filter['lte'] = date_to
            must.append({'range': {'release_time': range_filter}})

    def _add_clauses(self, should_clauses: list, base_filters: list, secondary_queries: list[str], search_word: str, date_from: str, date_to: str):
        """Build clauses from base_filters and secondary_queries and append them to should_clauses"""
        if secondary_queries:
            for sec in secondary_queries:
                must = base_filters + [{'match_phrase': {'title': sec}}]
                self._append_common_filters(must, search_word, date_from, date_to)
                should_clauses.append({'bool': {'must': must}})
        else:
            must = base_filters.copy()
            self._append_common_filters(must, search_word, date_from, date_to)
            should_clauses.append({'bool': {'must': must}})

    @retry(
        reraise=True,
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=(
            retry_if_exception_type(TransportError) |
            retry_if_exception_type(asyncio.TimeoutError)
        ),
    )
    async def search_topic_news(
            self,
            primary_queries: List[str],
            secondary_query: List[str] = None,
            max_results: int = 10,
            sources: List[str] = None,
            search_word: str = None,
            date_from: str = None,
            date_to: str = None
    ) -> SearchResponse:
        """
        Batch-query news by multiple label lists, filter-term lists (groups), and source lists joined with OR; supports date-range filtering.
        Base query logic: <label1>&<filtered_words>|<label2>&<filtered_words>|<source1>&<filtered_words>|...
        An additional search can be applied on top of the base query logic.
        """
        limit = min(max_results, es_settings.MAX_RESULTS_LIMIT)
        secondary_queries = secondary_query or []
        should_clauses = []
        for primary in primary_queries or []:
            self._add_clauses(should_clauses, [{'match_phrase': {'title': primary}}], secondary_queries, search_word, date_from, date_to)
        for source in sources or []:
            self._add_clauses(should_clauses, [{'term': {'source.keyword': source}}], secondary_queries, search_word, date_from, date_to)
        body = {'query': {'bool': {'should': should_clauses}}}
        # Sort by release time, descending
        body['sort'] = [{'release_time': {'order': 'desc'}}]

        response = await self._client.search(
            index=self.index,
            body=body,
            size=limit,
            source_includes=OUTPUT_SOURCE_FIELDS
        )
        raw_hits = response.get('hits', {})
        hits = raw_hits.get('hits', [])
        total = raw_hits.get("total", {}).get("value", 0)
        return self.SearchResponse(data=[hit.get('_source', {}) for hit in hits], total=total)


    async def close(self):
        await self._client.close()
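To make the OR-of-ANDs construction concrete: for a hypothetical call `search_topic_news(primary_queries=["AI"], secondary_query=["chip"], sources=["Reuters"], date_from="2024-01-01")`, the helpers above would assemble roughly this request body (hand-expanded here for illustration):

```python
body = {
    "query": {
        "bool": {
            "should": [
                # branch 1: title matches "AI" AND title matches "chip" AND date range
                {"bool": {"must": [
                    {"match_phrase": {"title": "AI"}},
                    {"match_phrase": {"title": "chip"}},
                    {"range": {"release_time": {"gte": "2024-01-01"}}},
                ]}},
                # branch 2: source is "Reuters" AND title matches "chip" AND date range
                {"bool": {"must": [
                    {"term": {"source.keyword": "Reuters"}},
                    {"match_phrase": {"title": "chip"}},
                    {"range": {"release_time": {"gte": "2024-01-01"}}},
                ]}},
            ]
        }
    },
    "sort": [{"release_time": {"order": "desc"}}],
}
```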
55 changes: 55 additions & 0 deletions src/news_mcp_server/mcp_server.py
@@ -111,4 +111,59 @@ async def read_single_news( ctx: Context,
    return news_item.model_dump()


@mcp.tool(
    name="search_topic_news",
    description="Batch-query news by multiple primary-keyword lists, filter-term lists (groups), and source lists joined with OR; supports date-range filtering. "
                "Base query logic: <label1>&<filtered_words>|<label2>&<filtered_words>|<source1>&<filtered_words>|...|"
                "An additional search can be applied on top of the base query logic."
)
async def search_topic_news(
        ctx: Context,
        primary_queries: List[str] = Field(
            description="[Required] List of primary keywords; the system returns news matching the combinations of primary and secondary keywords, with all combinations joined by OR"
        ),
        secondary_querys: List[str] = Field(
            default_factory=list,
            description="Filter terms, AND-ed with each primary keyword"
        ),
        sources: List[str] = Field(
            default_factory=list,
            description="List of data sources, AND-ed with each primary keyword"
        ),
        search_word: str = Field(default="", description="Search term"),
        max_results: int = Field(
            default=15,
            description="[Optional] Number of news items to return, 1-100; defaults to 15"
        ),
        date_from: str = Field(
            default="",
            description="[Optional] Start release date, format YYYY-MM-DD"
        ),
        date_to: str = Field(
            default="",
            description="[Optional] End release date, format YYYY-MM-DD"
        )
) -> List[dict]:
    """MCP tool: batch-search news by combinations of multiple primary and secondary keywords (A&D|B&D|...)"""
    logger.info("Call search_topic_news", primary_queries=primary_queries, secondary_query=secondary_querys, ctx=ctx.request_context.request['state'])
    if isinstance(primary_queries, str):
        primary_queries = [primary_queries] if primary_queries.strip() else []
    if not primary_queries:
        return []
    if isinstance(secondary_querys, str):
        secondary_querys = [secondary_querys] if secondary_querys.strip() else []
    if isinstance(sources, str):
        sources = [sources] if sources.strip() else []
    news_items = await app_services["news_service"].search_topic_news(
        primary_queries=primary_queries,
        secondary_query=secondary_querys,
        max_results=max_results,
        sources=sources,
        search_word=search_word,
        date_from=date_from,
        date_to=date_to
    )
    logger.info("Call search_topic_news", total=news_items.get("total"), primary_queries_count=len(primary_queries), secondary_query_count=len(secondary_querys), ctx=ctx.request_context.request['state'])
    return [item.model_dump() for item in news_items.get("data")]


mcp_app = create_http_app(mcp)
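For reference, a client-side call to the new tool might look like the sketch below (same hypothetical FastMCP client setup as in the README note; all argument values are made up). Note the argument key is spelled `secondary_querys`, matching the tool signature:

```python
import asyncio
from fastmcp import Client

async def main():
    async with Client("http://localhost:8000/mcp") as client:  # hypothetical endpoint
        result = await client.call_tool("search_topic_news", {
            "primary_queries": ["AI", "semiconductor"],  # each primary keyword opens an OR branch
            "secondary_querys": ["export"],              # AND-ed into every branch
            "sources": ["Reuters"],                      # each source opens a further OR branch
            "date_from": "2024-01-01",
            "date_to": "2024-06-30",
            "max_results": 15,
        })
        print(result)

asyncio.run(main())
```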
29 changes: 28 additions & 1 deletion src/news_mcp_server/services/news_service.py
@@ -50,4 +50,31 @@ async def search_news_with_secondary_filter(self,
            date_from=date_from,
            date_to=date_to,
        )
-        return [NewsBaseItem(**item) for item in items]
+        return [NewsBaseItem(**item) for item in items]

    async def search_topic_news(
            self,
            primary_queries: List[str],
            secondary_query: List[str],
            max_results: int = 10,
            sources: Optional[List[str]] = None,
            search_word: Optional[str] = None,
            date_from: Optional[str] = None,
            date_to: Optional[str] = None
    ) -> dict:
        """
        New feature: search news by combinations of multiple primary keywords (groups) and secondary keywords (A&D|B&D|...),
        returning the total hit count together with a list of NewsBaseItem
        """
        items = await self.client.search_topic_news(
            primary_queries=primary_queries,
            secondary_query=secondary_query,
            sources=sources,
            max_results=max_results,
            search_word=search_word,
            date_from=date_from,
            date_to=date_to
        )
        return {
            "total": items.total,
            "data": [NewsBaseItem(**item) for item in items.data]
        }
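The service now returns a plain dict with `total` and `data` rather than a bare list, which is exactly what the MCP tool unpacks. A minimal consumption sketch, assuming a wired-up `news_service` instance (placeholder, mirroring `app_services["news_service"]` in mcp_server.py) inside an async context:

```python
result = await news_service.search_topic_news(
    primary_queries=["AI"],
    secondary_query=["chip"],
    sources=["Reuters"],
    max_results=10,
)
print(result["total"])                                 # total hit count reported by Elasticsearch
rows = [item.model_dump() for item in result["data"]]  # NewsBaseItem -> dict, as the MCP tool does
```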