In [None]:
import os

# OpenRouter API key. It is read from the OPENROUTER_API_KEY environment
# variable so that no secret is stored in the source file; the value is an
# empty string when the variable is not set.
# NOTE(review): a literal key used to be committed here — it should be
# treated as leaked and revoked.
OPEN_ROUTER = os.environ.get("OPENROUTER_API_KEY", "")

# System prompt for the tool-using agent. It ends with a "tools available"
# heading; MCPClient.process_query appends the stringified tool list to it.
# The prompt instructs the model to emit tool calls as a ```json block with
# "name" and "params" keys.
SYSTEM_PROMPT = """你是一个报告生成助手。
你可以使用 MCP 服务器提供的工具来完成任务。
MCP 服务器会动态提供工具，你需要先检查当前可用的工具。

在使用 MCP 工具时，请遵循以下步骤：
1、根据任务需求选择合适的工具
2、按照工具的参数要求提供正确的参数
3、观察工具的返回结果，并根据结果决定下一步操作
4、工具可能会发生变化，比如新增工具或现有工具消失

请遵循以下指南：
- 使用工具时，确保参数符合工具的文档要求
- 如果出现错误，请理解错误原因并尝试用修正后的参数重新调用
- 按照任务需求逐步完成，优先选择最合适的工具
- 如果需要连续调用多个工具，请一次只调用一个工具并等待结果
- 以```json```格式输出。例如：```json{"name": "tool_name", "params": {"param1": "value1", "param2": "value2"}}```

请清楚地向用户解释你的推理过程和操作步骤。

可选择的工具如下：
"""

# Follow-up prompt sent after each tool result. It asks the model to either
# output <finish> or call another tool. The single "{}" placeholder is filled
# with the user's query via str.format.
NEXT_STEP_PROMPT = """
## 任务目标
根据已经获取的信息，判断是否可以解决用户的需求。

## 任务要求
- 请认真审视用户的需求，特别注意用户需求中的条件和范围
- 如果可以解决(满足用户给出的条件和范围)，请输出<finish>
- 如果缺少数据或内容，请继续调用合适的工具获取更多信息

## 用户需求
用户需求如下:
{}
"""

# Final report-generation prompt. It has two "{}" placeholders, filled in
# order via str.format: the collected information, then the user's query.
# NOTE: the name is misspelled ("GENETATE") but MCPClient.process_query
# references it by this exact name, so it must not be renamed in isolation.
FINISH_GENETATE = '''
## 任务目标
根据已收集信息和用户需求生成完整报告。

## 已收集信息
{}

## 任务要求
1、请根据图片的描述将图片链接插入到合适的位置，如果没有符合要求的图片，请不要插入图片
2、以markdown格式生成报告

## 用户需求
{}'''

In [None]:
from mcp.server.fastmcp import FastMCP
import requests
from openai import OpenAI
import logging

# FastMCP server instance, created with the name "search"; tools are
# registered on it with the @mcp.tool() decorator.
mcp = FastMCP("search")

# Connection settings for the OpenAI-compatible chat endpoint.
# NOTE(review): 'aaa' looks like a placeholder key — confirm how the real
# key is meant to be supplied.
base_url = "https://openrouter.ai/api/v1"
api_key = 'aaa'
model_name = 'deepseek/deepseek-chat:free'

# Module logger at INFO level.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# One handler for the console stream and one for the file 'test.log'; each
# gets its own formatter with the same layout.
console_handler = logging.StreamHandler()
console_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler = logging.FileHandler('test.log')
file_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# Attach in the same order as before: console first, then file.
for _handler, _formatter in (
    (console_handler, console_formatter),
    (file_handler, file_formatter),
):
    _handler.setLevel(logging.INFO)
    _handler.setFormatter(_formatter)
    logger.addHandler(_handler)

# Synchronous chat-completions client shared by the helper functions below.
client = OpenAI(base_url=base_url, api_key=api_key)


def generate_query(query, stream=False):
    """Ask the chat model to propose search queries for a user query.

    Args:
        query: The user's original question or topic.
        stream: Accepted for compatibility; this function does not use it.

    Returns:
        The raw text content of the model's first choice. The prompt asks
        for a Python list literal of strings, but the reply is returned
        unparsed.
    """
    prompt = """You are an expert research assistant. Given the user's query, generate up to four distinct, precise search queries that would help gather comprehensive information on the topic.
    Return only a Python list of strings, for example: ['query1', 'query2', 'query3']."""

    system_message = {"role": "system", "content": "You are a helpful and precise research assistant."}
    user_message = {"role": "user", "content": f"User Query: {query}\n\n{prompt}"}

    completion = client.chat.completions.create(
        model=model_name,
        messages=[system_message, user_message],
    )
    first_choice = completion.choices[0]
    return first_choice.message.content


def if_useful(query: str, page_text: str):
    """Ask the chat model whether a page is useful for answering the query.

    Only the first 20000 characters of ``page_text`` are sent to the model.

    Args:
        query: The user's query.
        page_text: Text content of the webpage.

    Returns:
        "Yes" or "No". A reply that is exactly one of those words is
        returned as-is; otherwise "Yes" is returned if the reply contains
        "Yes", and "No" in every other case (including an empty reply).
    """
    prompt = """You are a critical research evaluator. Given the user's query and the content of a webpage, determine if the webpage contains information relevant and useful for addressing the query.
    Respond with exactly one word: 'Yes' if the page is useful, or 'No' if it is not. Do not include any extra text."""

    excerpt = page_text[:20000]
    conversation = [
        {"role": "system", "content": "You are a strict and concise evaluator of research relevance."},
        {"role": "user",
         "content": f"User Query: {query}\n\nWebpage Content (first 20000 characters):\n{excerpt}\n\n{prompt}"},
    ]
    completion = client.chat.completions.create(model=model_name, messages=conversation)
    reply = completion.choices[0].message.content

    # No reply at all counts as "not useful".
    if not reply:
        return "No"

    verdict = reply.strip()
    if verdict in ("Yes", "No"):
        return verdict

    # Fallback for chatty replies: a "Yes" anywhere wins; everything else
    # (an embedded "No" or neither word) is treated as "No".
    return "Yes" if "Yes" in verdict else "No"


def extract_relevant_context(query, search_query, page_text):
    """Ask the chat model to extract the parts of a page relevant to the query.

    Only the first 20000 characters of ``page_text`` are sent to the model.

    Args:
        query: The user's query.
        search_query: The search query that led to this page.
        page_text: Text content of the webpage.

    Returns:
        The model's reply with surrounding whitespace stripped, or an empty
        string when the model returned no content.
    """
    prompt = """You are an expert information extractor. Given the user's query, the search query that led to this page, and the webpage content, extract all pieces of information that are relevant to answering the user's query.
    Return only the relevant context as plain text without commentary."""

    excerpt = page_text[:20000]
    conversation = [
        {"role": "system", "content": "You are an expert in extracting and summarizing relevant information."},
        {"role": "user",
         "content": f"User Query: {query}\nSearch Query: {search_query}\n\nWebpage Content (first 20000 characters):\n{excerpt}\n\n{prompt}"},
    ]
    completion = client.chat.completions.create(model=model_name, messages=conversation)

    extracted = completion.choices[0].message.content
    return extracted.strip() if extracted else ""


def get_new_search_queries(user_query, previous_search_queries, all_contexts):
    """Ask the chat model whether more research is needed and, if so, what to search.

    Args:
        user_query: The user's original query.
        previous_search_queries: Search queries already performed.
        all_contexts: Iterable of context strings extracted so far; they are
            joined with newlines and sent to the model.

    Returns:
        - ``""`` (empty string) when the model signals that no further
          research is needed (it replied with the ``<done>`` token, or with
          only whitespace);
        - a list of new search queries when the reply parses as a list;
        - ``[]`` when the reply is missing, is not a list, or cannot be
          parsed.
    """
    # Local import keeps this function self-contained; ast is standard library.
    import ast

    context_combined = "\n".join(all_contexts)
    # The stop token was missing from this prompt ("respond with exactly ."),
    # which left the model with nothing to emit for the "done" case.
    # NOTE(review): "<done>" is the assumed intended token — confirm.
    prompt = """You are an analytical research assistant. Based on the original query, the search queries performed so far, and the extracted contexts from webpages, determine if further research is needed.
    If further research is needed, provide up to four new search queries as a Python list (for example, ['new query1', 'new query2']). If you believe no further research is needed, respond with exactly <done>.
    Output only a Python list or the token <done> without any additional text."""

    response = client.chat.completions.create(
        model=model_name,
        messages=[
            {"role": "system", "content": "You are an expert in extracting and summarizing relevant information."},
            {"role": "user",
             "content": f"User Query: {user_query}\nPrevious Search Queries: {previous_search_queries}\n\nExtracted Relevant Contexts:\n{context_combined}\n\n{prompt}"}
        ]
    )

    response = response.choices[0].message.content
    if not response:
        return []

    cleaned = response.strip()
    # The empty-string return value is the "stop" signal the caller checks for.
    if cleaned in ("", "<done>"):
        return ""

    try:
        # literal_eval only accepts Python literals, so model output can
        # never execute code here (the previous eval() could).
        new_queries = ast.literal_eval(cleaned)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError) as e:
        logger.error(f"Error parsing new search queries:{e}, Response:{response}")
        return []

    if isinstance(new_queries, list):
        return new_queries

    logger.info(f"LLM did not return a list for new search queries. Response: {response}")
    return []


def web_search(query: str, ) -> list[str]:
    """Run a general web search and return the URLs of the first two results.

    The query is passed through ``params`` so that it is URL-encoded;
    interpolating it into the URL broke on characters such as ``&``, ``#``
    or spaces.

    Args:
        query: Search terms.

    Returns:
        A list with the ``url`` field of up to two results.

    Raises:
        requests.RequestException: On connection problems, timeout (10 s),
            or a non-success HTTP status.
        KeyError: If the JSON reply has no ``results`` key or a result has
            no ``url`` key.
    """
    response = requests.get(
        'http://10.250.2.24:8088/search',
        params={
            'format': 'json',
            'q': query,
            'language': 'zh-CN',
            'time_range': '',
            'safesearch': 0,
            'categories': 'general',
        },
        timeout=10,
    )
    # Fail with a clear HTTP error instead of a confusing JSON/KeyError later.
    response.raise_for_status()
    results = response.json()['results']
    return [result['url'] for result in results[:2]]


def fetch_webpage_text(url):
    """Fetch a page's text by requesting it through the r.jina.ai prefix URL.

    Args:
        url: Address of the page to fetch; it is appended to
            ``https://r.jina.ai/``.

    Returns:
        The response body when the status code is 200; an empty string on
        any other status or on any exception (both cases are logged).
    """
    JINA_BASE_URL = 'https://r.jina.ai/'
    target = f"{JINA_BASE_URL}{url}"

    try:
        reply = requests.get(target, timeout=50)
        if reply.status_code != 200:
            body = reply.text
            logger.info(f"Jina fetch error for {url}: {reply.status_code} - {body}")
            return ""
        return reply.text
    except Exception as e:
        # Best-effort fetch: any failure is logged and reported as "no text".
        logger.error(f"Error fetching webpage text with Jina:{e}")
        return ""


def process_link(link, query, search_query):
    """Fetch one link, judge its usefulness, and extract relevant context.

    Args:
        link: URL of the page to process.
        query: The user's query, used for the usefulness check and extraction.
        search_query: The search query that produced this link.

    Returns:
        The extracted context string, or None when the page could not be
        fetched, was not judged "Yes", or yielded no context.
    """
    logger.info(f"Fetching content from: {link}")
    page_text = fetch_webpage_text(link)
    if not page_text:
        return None

    verdict = if_useful(query, page_text)
    logger.info(f"Page usefulness for {link}: {verdict}")
    if verdict != "Yes":
        return None

    extracted = extract_relevant_context(query, search_query, page_text)
    if not extracted:
        return None

    logger.info(f"Extracted context from {link} (first 200 chars): {extracted[:200]}")
    return extracted


def get_images_description(iamge_url):
    """Ask a vision model for a one-sentence description of an image.

    Args:
        iamge_url: URL of the image (the parameter name keeps its original
            spelling so keyword callers keep working).

    Returns:
        The text content of the model's first choice.
    """
    # The instruction text asks (in Chinese) for a one-sentence description.
    instruction_part = {"type": "text", "text": "使用一句话描述图片的内容"}
    image_part = {"type": "image_url", "image_url": {"url": iamge_url}}
    user_message = {"role": "user", "content": [instruction_part, image_part]}

    completion = client.chat.completions.create(
        model="qwen/qwen2.5-vl-32b-instruct:free",
        messages=[user_message],
    )
    first_choice = completion.choices[0]
    return first_choice.message.content


def _parse_query_list(raw):
    """Parse an LLM reply into a list of non-empty query strings ([] if unparsable)."""
    import ast

    if not raw:
        return []
    # Tolerate code fences or chatter around the list by parsing only the
    # outermost [...] span.
    start, end = raw.find('['), raw.rfind(']')
    if start == -1 or end <= start:
        return []
    try:
        # literal_eval accepts only Python literals, so model output cannot
        # execute code (the previous eval() could).
        parsed = ast.literal_eval(raw[start:end + 1])
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        return []
    if not isinstance(parsed, list):
        return []
    return [item for item in parsed if isinstance(item, str) and item.strip()]


@mcp.tool()
def search(query: str) -> str:
    """互联网搜索"""
    # The docstring above ("internet search") is kept verbatim: the tool's
    # description is forwarded to the LLM by the client, and the docstring is
    # presumably where FastMCP takes that description from — confirm.
    #
    # Iterative research loop (at most `iteration_limit` rounds):
    #   1. ask the LLM for search queries,
    #   2. collect result links for each query (de-duplicated),
    #   3. fetch / judge / extract context from each link,
    #   4. ask the LLM whether more queries are needed.
    # Returns all extracted contexts joined by blank lines.
    iteration_limit = 3
    aggregated_contexts = []
    all_search_queries = []

    new_search_queries = _parse_query_list(generate_query(query))
    if not new_search_queries:
        # An unparsable reply used to raise out of the tool; searching for
        # the user's own query is a more useful fallback.
        logger.info("Could not parse generated search queries; falling back to the original query.")
        new_search_queries = [query]
    all_search_queries.extend(new_search_queries)

    for iteration in range(iteration_limit):
        logger.info(f"\n=== Iteration {iteration + 1} ===")

        # Map each link to the first search query that produced it.
        # `query` (the user's request) is deliberately never rebound here:
        # doing so made later steps evaluate pages against the wrong query.
        unique_links = {}
        for search_query in new_search_queries:
            for link in web_search(search_query):
                unique_links.setdefault(link, search_query)

        logger.info(f"Aggregated {len(unique_links)} unique links from this iteration.")

        # Process each link in turn (sequentially): fetch, judge, extract.
        iteration_contexts = []
        for link, search_query in unique_links.items():
            context = process_link(link, query, search_query)
            if context:
                iteration_contexts.append(context)

        if iteration_contexts:
            aggregated_contexts.extend(iteration_contexts)
        else:
            logger.info("No useful contexts were found in this iteration.")

        new_search_queries = get_new_search_queries(query, all_search_queries, aggregated_contexts)
        if new_search_queries == "":
            # Empty string is the "research complete" signal.
            logger.info("LLM indicated that no further research is needed.")
            break
        elif new_search_queries:
            logger.info(f"LLM provided new search queries:{new_search_queries}")
            all_search_queries.extend(new_search_queries)
        else:
            logger.info("LLM did not provide any new search queries. Ending the loop.")
            break

    return '\n\n'.join(aggregated_contexts)


@mcp.tool()
def get_images(query: str) -> str:
    '''获取图片链接和描述'''
    # The docstring above ("get image links and descriptions") is kept
    # verbatim: the tool's description is forwarded to the LLM by the client,
    # and the docstring is presumably where FastMCP takes it from — confirm.
    #
    # Searches the image category (restricted to the last month), takes up to
    # two results, and returns a dict mapping each image URL to a
    # model-generated one-sentence description.
    # NOTE(review): the return annotation says `str` but a dict is returned;
    # left unchanged because the annotation may affect the tool schema —
    # confirm before correcting.
    logger.info(f"Searching for images for query: {query}")
    response = requests.get(
        'http://10.250.2.24:8088/search',
        # Passing the query via params URL-encodes it; interpolating it into
        # the URL broke on characters such as '&', '#' or spaces.
        params={
            'format': 'json',
            'q': query,
            'language': 'zh-CN',
            'time_range': 'month',
            'safesearch': 0,
            'categories': 'images',
        },
        # Without a timeout a stalled search backend would hang the tool.
        timeout=10,
    )
    response.raise_for_status()
    results = response.json()['results']
    # Skip results that carry no image source instead of failing on them.
    img_srcs = [item['img_src'] for item in results[:2] if item.get('img_src')]

    result = {}

    for img_src in img_srcs:
        logger.info(f"Fetching image description for: {img_src}")
        description = get_images_description(img_src)
        logger.info(f"Image description for {img_src}: {description}")
        result[img_src] = description

    return result


# Start the MCP server only when this code is executed as a script, not when
# it is imported.
if __name__ == "__main__":
    mcp.run()

In [None]:
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from typing import Optional
from openai import AsyncOpenAI
from contextlib import AsyncExitStack
import json
import asyncio
import os

# Connection settings used by MCPClient for its AsyncOpenAI client. The same
# three names are also assigned, with the same values, earlier in this file.
# NOTE(review): 'aaa' looks like a placeholder key — confirm how the real key
# is meant to be supplied.
base_url = "https://openrouter.ai/api/v1"
api_key = 'aaa'
model_name = 'deepseek/deepseek-chat:free'


def get_clear_json(text):
    """Extract the body of the first ```json fenced block from ``text``.

    Args:
        text: Model output, possibly containing a ```json ... ``` block.
            May be None or empty (a model message can have no content).

    Returns:
        A ``(flag, payload)`` tuple:
        - ``(0, text)`` when ``text`` is empty/None or has no ```json
          marker; ``text`` is returned unchanged;
        - ``(1, body)`` otherwise, where ``body`` is everything between the
          first ```json marker and the next ``` (or the end of the text if
          the fence is never closed). The body is not stripped.
    """
    # A None/empty message previously raised TypeError on the `in` test.
    if not text or '```json' not in text:
        return 0, text

    after_marker = text.split('```json', 1)[1]
    return 1, after_marker.split('```', 1)[0]


class MCPClient:
    """Chat client that lets an LLM call tools exposed by an MCP server over stdio.

    Tool calls are not made through the chat API's native function calling;
    the model is prompted to emit a ```json block with "name" and "params",
    which this class parses and forwards to the MCP session.
    """

    def __init__(self):
        # MCP session; None until connect_to_server() has run.
        self.session: Optional[ClientSession] = None
        # Owns the stdio transport and session contexts entered in
        # connect_to_server(); closing it releases them.
        self.exit_stack = AsyncExitStack()
        self.client = AsyncOpenAI(
            base_url=base_url,
            api_key=api_key,
        )

    async def connect_to_server(self, server_script_path: str):
        """Launch the server script with ``python`` and open an MCP session to it.

        Args:
            server_script_path: Path of the server script passed to ``python``.
        """
        server_params = StdioServerParameters(
            command="python",
            args=[server_script_path],
            env=None
        )

        stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))

        await self.session.initialize()

        # List the available tools.
        response = await self.session.list_tools()
        tools = response.tools
        logger.info(f"\nConnected to server with tools: {[tool.name for tool in tools]}")

    async def process_query(self, query: str) -> str:
        """Process a query using the LLM and the tools provided by the MCP server.

        The model is called repeatedly: each reply containing a ```json tool
        call is executed and its result fed back, until the reply contains
        'finish'; then a final report is generated from the collected tool
        outputs.

        Args:
            query: The user's request.

        Returns:
            The final report text, or a plain LLM answer to ``query`` when a
            model reply contains no ```json tool call.

        Raises:
            RuntimeError: If connect_to_server() has not been called.
            json.JSONDecodeError, KeyError: If a tool-call block is not valid
                JSON or lacks "name"/"params".
        """
        if self.session is None:
            raise RuntimeError("Not connected: call connect_to_server() first.")

        response = await self.session.list_tools()

        available_tools = [{
            "type": "function",
            "function": {
                "name": tool.name,
                "description": tool.description,
                "parameters": tool.inputSchema
            }
        } for tool in response.tools]
        logger.info(f'available_tools:\n\n{available_tools}')

        messages = [
            {
                "role": "system",
                "content": SYSTEM_PROMPT + str(available_tools)
            },
            {
                "role": "user",
                "content": query
            }
        ]
        response = await self.client.chat.completions.create(
            model=model_name,
            messages=messages
        )

        message = response.choices[0].message
        logger.info(f'llm_output(tool call)：{message.content}')

        results = []
        # NOTE(review): this loop has no step limit; it ends only when the
        # model says 'finish' or stops emitting a tool call.
        while True:
            # A message may carry no content; treat that as "no tool call"
            # rather than failing on None.
            content = message.content or ""
            flag, json_text = get_clear_json(content)

            if flag == 0:
                # No tool call in the reply: answer the bare query directly.
                # NOTE(review): tool results collected so far are not used
                # on this path — confirm that is intended.
                response = await self.client.chat.completions.create(
                    model=model_name,
                    messages=[{"role": "user", "content": query}]
                )
                return response.choices[0].message.content

            tool_call = json.loads(json_text)
            tool_name = tool_call['name']
            tool_args = tool_call['params']
            result = await self.session.call_tool(tool_name, tool_args)
            logger.info(f'tool name: \n{tool_name}\ntool call result: \n{result}')

            # Keep the text of the first content item, if there is one; an
            # empty or non-text result used to raise here.
            first_item = result.content[0] if result.content else None
            first_text = getattr(first_item, 'text', None)
            if first_text is not None:
                results.append(first_text)

            # Record the assistant's tool-call message exactly once. It was
            # previously also appended at the end of the prior iteration,
            # which duplicated it in the history from the second round on.
            messages.append({
                "role": "assistant",
                "content": content
            })
            messages.append({
                "role": "user",
                "content": f'工具调用结果如下：{result}'
            })

            messages.append({
                "role": "user",
                "content": NEXT_STEP_PROMPT.format(query)
            })

            response = await self.client.chat.completions.create(
                model=model_name,
                messages=messages
            )

            message = response.choices[0].message
            logger.info(f'llm_output：\n{message.content}')

            if 'finish' in (message.content or ""):
                break

        messages.append({
            "role": "user",
            "content": FINISH_GENETATE.format('\n\n'.join(results), query)
        })

        response = await self.client.chat.completions.create(
            model=model_name,
            messages=messages
        )

        message = response.choices[0].message.content
        return message

    async def chat_loop(self):
        """Run an interactive chat loop until the user types 'quit' or input ends."""
        logger.info("\nMCP Client Started!")
        logger.info("Type your queries or 'quit' to exit.")

        while True:
            try:
                query = input("\nQuery: ").strip()
            except EOFError:
                # stdin was closed. This used to be swallowed by the generic
                # handler below, making the loop spin forever.
                logger.info("Input stream closed; exiting chat loop.")
                break

            if query.lower() == 'quit':
                break
            if not query:
                # Nothing to ask; prompt again instead of calling the LLM.
                continue

            try:
                response = await self.process_query(query)
                print(response)
            except Exception as e:
                # Top-level boundary: report the failure and keep the chat alive.
                logger.error(f"\nError: {str(e)}")


async def main():
    """Connect to the search MCP server, run the chat loop, then release resources."""
    client = MCPClient()
    try:
        await client.connect_to_server('./search_mcp.py')
        await client.chat_loop()
    finally:
        # Close the contexts entered on the client's exit stack (stdio
        # transport and session). They were previously never closed.
        await client.exit_stack.aclose()

# Run the interactive client only when this code is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())