# 环境和连接

In [None]:
import os
import openai
import glob
import shutil

import numpy as np
import pandas as pd
import pymysql

import json
import io
import inspect
import requests
import re
import random
import string
import base64

from bs4 import BeautifulSoup
import dateutil.parser as parser
import tiktoken
from lxml import etree

import sys
from dotenv import load_dotenv
from openai import OpenAI

import matplotlib
import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import display, Code, Markdown, Image
from IPython import get_ipython

In [None]:
load_dotenv()  # 环境加载

In [None]:
os.environ['HTTP_PROXY'] = 'http://127.0.0.1:7890'  # 根据代理工具设置
os.environ['HTTPS_PROXY'] = 'http://127.0.0.1:7890'

In [None]:
# 读取模型API-KEY及请求地址
API_KEY = os.getenv("API_KEY")
MODEL = os.getenv("MODEL")
BASE_URL = os.getenv("BASE_URL")

In [None]:
# 测试和模型的连接
client = OpenAI(api_key=API_KEY, base_url=BASE_URL) #openai的调用规则
response = client.chat.completions.create(  #创建沟通用对话
    model=MODEL,                                    #选择模型
    messages=[                          #传入信息
        {"role": "user", "content": "你好，好久不见!"} #用户沟通语句
    ]
)
#执行玩上面就是回复了，这里把回复打印出来
print(response.choices[0].message.content)

# 函数执行 模块

## 语句执行 部分

In [None]:
# 定义执行python代码的功能
def python_inter(py_code,g=None):    #globals是形参，用于导入运行外部函数的参数
    """
    专门用于执行python代码，并获取最终查询或处理结果。
    :param py_code: 字符串形式的Python代码，
    :param g: g，字符串形式变量，表示环境变量，无需设置，保持默认参数即可
    :return：代码运行的最终结果
    """
    print("正在调用python_inter工具运行Python代码...")
    g = g or globals() # 如果g是None，就用当前模块的globals()
    try:
        # 尝试如果是表达式，则返回表达式运行结果
        # eval()函数：用于动态执行字符串形式的表达式,会返回执行结果的表达式
        return str(eval(py_code, g))
    # 若报错，则先测试是否是对相同变量重复赋值
    except Exception as e :
        global_vars_before = set(g.keys())
        try:
            exec(py_code, g)    #会执行，不会返回表达式
        except Exception as e:
            return f"代码执行时报错{e}"
        global_vars_after = set(g.keys())
        new_vars = global_vars_after - global_vars_before
        # 若存在新变量
        if new_vars:
            result = {var: g[var] for var in new_vars}
            print("代码已顺利执行，正在进行结果梳理...")
            return str(result)
        else:
            print("代码已顺利执行，正在进行结果梳理...")
            return "已经顺利执行代码"

# python_inter 的参数内容。后续写在tool里面说明这个函数的参数形式。（用f"{python_inter_args}"传送给大模型）
python_inter_args = '{"py_code": "import numpy as np\\narr = np.array([1, 2, 3, 4])\\nsum_arr = np.sum(arr)\\nsum_arr"}'

In [None]:
# tool工具说明，用jason格式写的，给大模型看
python_inter_tool = {       #设计字典，表明
    "type": "function",     #表明是函数形式
    "function": {           #描述函数具体信息
        "name": "python_inter",     #函数名
        "description": f"当用户需要编写Python程序并执行时，请调用该函数。该函数可以执行一段Python代码并返回最终结果，需要注意，本函数只能执行非绘图类的代码，若是绘图相关代码，则需要调用fig_inter函数运行。\n同时需要注意，编写外部函数的参数消息时，必须是满足json格式的字符串，例如如以下形式字符串就是合规字符串：{python_inter_args}",      #描述功能，利用f"{}"传递标准形式
        "parameters": {             #参数
            "type": "object",
            "properties": {
                "py_code": {
                    "type": "string",
                    "description": "The Python code to execute."
                },
                "g": {
                    "type": "string",
                    "description": "Global environment variables, default to globals().",
                    "default": "globals()"
                }
            },
            "required": ["py_code"]
        }
    }
}



In [None]:
# # 测试代码准备
# py_code = """
# import numpy as np
# arr = np.array([1, 2, 3, 4])
# sum_arr = np.sum(arr)
# sum_arr
# """
#
# python_inter(py_code, g=globals())  #执行测试

## 绘图 部分

In [None]:
# 专门运行绘图的代码 执行能力
def fig_inter(py_code, fname, g='globals()'):
    print("正在调用fig_inter工具运行Python代码...")
    import matplotlib
    import os
    import matplotlib.pyplot as plt
    import seaborn as sns
    import pandas as pd
    from IPython.display import display, Image

    # 切换为无交互式后端
    current_backend = matplotlib.get_backend()
    matplotlib.use('Agg')

    # 用于执行代码的本地变量
    local_vars = {"plt": plt, "pd": pd, "sns": sns}

    # 相对路径保存目录
    pics_dir = 'pics'
    if not os.path.exists(pics_dir):
        os.makedirs(pics_dir)

    try:
        # 执行用户代码
        exec(py_code, g, local_vars)
        g.update(local_vars)

        # 获取图像对象
        fig = local_vars.get(fname, None)
        if fig:
            rel_path = os.path.join(pics_dir, f"{fname}.png")
            fig.savefig(rel_path, bbox_inches='tight')
            display(Image(filename=rel_path))
            print("代码已顺利执行，正在进行结果梳理...")
            return f"✅ 图片已保存，相对路径: {rel_path}"
        else:
            return "⚠️ 代码执行成功，但未找到图像对象，请确保有 `fig = ...`。"
    except Exception as e:
        return f"❌ 执行失败：{e}"
    finally:
        # 恢复原有绘图后端
        matplotlib.use(current_backend)

In [None]:
# 绘图函数给大模型的说明信息
fig_inter_tool = {
    "type": "function",
    "function": {
        "name": "fig_inter",
        "description": (    #文字描述部分
            "当用户需要使用 Python 进行可视化绘图任务时，请调用该函数。"
            "该函数会执行用户提供的 Python 绘图代码，并自动将生成的图像对象保存为图片文件并展示。\n\n"
            "调用该函数时，请传入以下参数：\n\n"
            "1. `py_code`: 一个字符串形式的 Python 绘图代码，**必须是完整、可独立运行的脚本**，"
            "代码必须创建并返回一个命名为 `fname` 的 matplotlib 图像对象；\n"
            "2. `fname`: 图像对象的变量名（字符串形式），例如 'fig'；\n"
            "3. `g`: 全局变量环境，默认保持为 'globals()' 即可。\n\n"
            "📌 请确保绘图代码满足以下要求：\n"        #明确要求
            "- 包含所有必要的 import（如 `import matplotlib.pyplot as plt`, `import seaborn as sns` 等）；\n"
            "- 必须包含数据定义（如 `df = pd.DataFrame(...)`），不要依赖外部变量；\n"
            "- 推荐使用 `fig, ax = plt.subplots()` 显式创建图像；\n"
            "- 使用 `ax` 对象进行绘图操作（例如：`sns.lineplot(..., ax=ax)`）；\n"
            "- 最后明确将图像对象保存为 `fname` 变量（如 `fig = plt.gcf()`）。\n\n"
            "📌 不需要自己保存图像，函数会自动保存并展示。\n\n"
            "✅ 合规示例代码：\n"       #给了规范代码
            "```python\n"
            "import matplotlib.pyplot as plt\n"
            "import seaborn as sns\n"
            "import pandas as pd\n\n"
            "df = pd.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]})\n"
            "fig, ax = plt.subplots()\n"
            "sns.lineplot(data=df, x='x', y='y', ax=ax)\n"
            "ax.set_title('Line Plot')\n"
            "fig = plt.gcf()  # 一定要赋值给 fname 指定的变量名\n"
            "```"
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "py_code": {
                    "type": "string",
                    "description": (
                        "需要执行的 Python 绘图代码（字符串形式）。"
                        "代码必须创建一个 matplotlib 图像对象，并赋值为 `fname` 所指定的变量名。"
                    )
                },
                "fname": {
                    "type": "string",
                    "description": "图像对象的变量名（例如 'fig'），代码中必须使用这个变量名保存绘图对象。"
                },
                "g": {
                    "type": "string",
                    "description": "运行环境变量，默认保持为 'globals()' 即可。",
                    "default": "globals()"
                }
            },
            "required": ["py_code", "fname"]    #必须参数
        }
    }
}


In [None]:
## 测试用例
# py_code = '''
# import matplotlib.pyplot as plt
# import seaborn as sns
# import pandas as pd
#
# # 生成一些示例数据
# data = {
#     'x': [1, 2, 3, 4, 5],
#     'y': [10, 20, 25, 30, 40]
# }
# df = pd.DataFrame(data)
#
# # 创建一个散点图
# plt.figure(figsize=(8, 6))
# sns.scatterplot(x='x', y='y', data=df)
# plt.title('Scatter Plot Example')
# plt.xlabel('X Axis')
# plt.ylabel('Y Axis')
#
# # 保存图形到一个变量
# fig = plt.gcf()
# '''
#
# # 测试fig_inter功能的调用
# fig_inter(py_code, 'fig', g=globals())

# 本地数据库 模块

## 执行sql的语句

In [None]:
# 执行sql代码的函数
def sql_inter(sql_query, g='globals()'):    #规定输入是sql_query语句
    """
    用于执行一段SQL代码，并最终获取SQL代码执行结果，\
    核心功能是将输入的SQL代码传输至MySQL环境中进行运行，\
    并最终返回SQL代码运行结果。需要注意的是，本函数是借助pymysql来连接MySQL数据库。
    :param sql_query: 字符串形式的SQL查询语句，用于执行对MySQL中telco_db数据库中各张表进行查询，并获得各表中的各类相关信息
    :param g: g，字符串形式变量，表示环境变量，无需设置，保持默认参数即可
    :return：sql_query在MySQL中的运行结果。
    """
    print("正在调用sql_inter工具运行SQL代码...")
    load_dotenv(override=True)
    host = os.getenv('HOST')
    user = os.getenv('USER')
    mysql_pw = os.getenv('MYSQL_PW')
    db = os.getenv('DB_NAME')
    port = os.getenv('PORT')

    connection = pymysql.connect(
        host = host,
        user = user,
        passwd = mysql_pw,
        db = db,
        port = int(port),
        charset='utf8',
    )

    try:
        with connection.cursor() as cursor:
            sql = sql_query # 输入
            cursor.execute(sql) # 执行
            results = cursor.fetchall() #返回
            print("SQL代码已顺利运行，正在整理答案...")

    finally:
        connection.close()

    return json.dumps(results)
# 标准参数格式
sql_inter_args = '{"sql_query": "SHOW TABLES;"}'

## 查询的语句

In [None]:
# 给大模型的工具说明
sql_inter_tool = {
    "type": "function",
    "function": {
        "name": "sql_inter",
        "description": (
            "当用户需要进行数据库查询工作时，请调用该函数。"
            "该函数用于在指定MySQL服务器上运行一段SQL代码，完成数据查询相关工作，"
            "并且当前函数是使用pymsql连接MySQL数据库。"
            "本函数只负责运行SQL代码并进行数据查询，若要进行数据提取，则使用另一个extract_data函数。"
            "同时需要注意，编写外部函数的参数消息时，必须是满足json格式的字符串，例如以下形式字符串就是合规字符串："
            f"{sql_inter_args}"
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "sql_query": {
                    "type": "string",
                    "description": "The SQL query to execute in MySQL database."
                },
                "g": {
                    "type": "string",
                    "description": "Global environment variables, default to globals().",
                    "default": "globals()"
                }
            },
            "required": ["sql_query"]
        }
    }
}


In [None]:
# 测试用例
# sql_query = "SHOW DATABASES;"
# sql_inter(sql_query, g=globals())

## 提取保存数据函数

In [None]:
# sql提取保存数据函数
def extract_data(sql_query, df_name, g='globals()'):
    """
    借助pymysql将MySQL中的某张表读取并保存到本地Python环境中。
    :param sql_query: 字符串形式的SQL查询语句，用于提取MySQL中的某张表。
    :param df_name: 将MySQL数据库中提取的表格进行本地保存时的变量名，以字符串形式表示。
    :param g: g，字符串形式变量，表示环境变量，无需设置，保持默认参数即可
    :return：表格读取和保存结果
    """
    print("正在调用extract_data工具运行SQL代码...")
    load_dotenv(override=True)

    host = os.getenv('HOST')
    user = os.getenv('USER')
    mysql_pw = os.getenv('MYSQL_PW')
    db = os.getenv('DB_NAME')
    port = os.getenv('PORT')

    connection = pymysql.connect(
        host = host,
        user = user,
        passwd = mysql_pw,
        db = db,
        port = int(port),
        charset='utf8',
    )

    g[df_name] = pd.read_sql(sql_query, connection)
    print("代码已顺利执行，正在进行结果梳理...")
    return "已成功创建pandas对象：%s，该变量保存了同名表格信息" % df_name
# 可运行参数实例
extract_data_args = '{"sql_query": "SELECT * FROM user_churn", "df_name": "user_churn"}'

In [None]:
# sql提取函数说明
extract_data_tool = {   #倒逼模型生成代码
    "type": "function",
    "function": {
        "name": "extract_data",
        "description": (
            "用于在MySQL数据库中提取一张表到当前Python环境中，注意，本函数只负责数据表的提取，"
            "并不负责数据查询，若需要在MySQL中进行数据查询，请使用sql_inter函数。"
            "同时需要注意，编写外部函数的参数消息时，必须是满足json格式的字符串，"
            f"例如如以下形式字符串就是合规字符串：{extract_data_args}"
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "sql_query": {
                    "type": "string",
                    "description": "The SQL query to extract a table from MySQL database."
                },
                "df_name": {
                    "type": "string",
                    "description": "The name of the variable to store the extracted table in the local environment."
                },
                "g": {
                    "type": "string",
                    "description": "Global environment variables, default to globals().",
                    "default": "globals()"
                }
            },
            "required": ["sql_query", "df_name"]
        }
    }
}

In [None]:
# # 测试用例
# extract_data(sql_query="SELECT * FROM user_churn",
#              df_name="user_churn",
#              g=globals())
# user_churn.head()

# 联网搜索 模块

## google搜索

In [None]:
# 谷歌搜索服务器信息导入
google_search_key = os.getenv("GOOGLE_SEARCH_API_KEY")
cse_id = os.getenv("CSE_ID")
search_cookie = os.getenv("search_cookie")
search_user_agent = os.getenv("search_user_agent")

In [None]:
# google搜索函数
def google_search(query, num_results=10, site_url=None):
    '''

    :param query:查询问题
    :param num_results:网页数量
    :param site_url:指定站点
    :return:返回编写好的json文件
    '''
    api_key = os.getenv("GOOGLE_SEARCH_API_KEY")    #搜索api
    cse_id = os.getenv("CSE_ID")    #设备id

    url = "https://www.googleapis.com/customsearch/v1" #定向搜索网站

    # API 请求参数
    if site_url == None:        #决定全网搜索还是指定一个网站
        params = {
        'q': query,
        'key': api_key,
        'cx': cse_id,
        'num': num_results
        }
    else:
        params = {
        'q': query,
        'key': api_key,
        'cx': cse_id,
        'num': num_results,
        'siteSearch': site_url      #指定的网站
        }

    # 发送请求
    response = requests.get(url, params=params) #发送http请求
    #params：把前面构造好的参数字典（如 q=xxx, key=xxx）自动拼接到 URL 上。
    #示例：https://www.googleapis.com/customsearch/v1?q=python&key=xxxx&cx=xxxx&num=10
    #返回搜索到的结果，是json格式的

    #检查响应状态
    response.raise_for_status()

    # 解析响应，转化成python字典
    search_results = response.json().get('items', [])

    # 提取所需信息，去除非必要字段，让返回结果更轻量、易读、便于 Agent 使用
    results = [{
        'title': item['title'],
        'link': item['link'],
        'snippet': item['snippet']
    } for item in search_results]
    #
    return results

In [None]:
# google搜索 测试用例
results = google_search(query="什么是MCP", num_results=5, )
results

In [None]:
# 准备本地保存文件名
# 定义一个辅助函数：s转化成，合规的window文件/文件夹，名称
def windows_compatible_name(s, max_length=255):
    """
    将字符串转化为符合Windows文件/文件夹命名规范的名称。

    参数:
    - s (str): 输入的字符串。
    - max_length (int): 输出字符串的最大长度，默认为255。

    返回:
    - str: 一个可以安全用作Windows文件/文件夹名称的字符串。
    """

    # Windows文件/文件夹名称中不允许的字符列表
    forbidden_chars = ['<', '>', ':', '"', '/', '\\', '|', '?', '*']

    # 使用下划线替换不允许的字符
    for char in forbidden_chars:
        s = s.replace(char, '_')

    # 删除尾部的空格或点
    s = s.rstrip(' .')

    # 检查是否存在以下不允许被用于文档名称的关键词，如果有的话则替换为下划线
    reserved_names = ["CON", "PRN", "AUX", "NUL", "COM1", "COM2", "COM3", "COM4", "COM5", "COM6", "COM7", "COM8", "COM9",
                      "LPT1", "LPT2", "LPT3", "LPT4", "LPT5", "LPT6", "LPT7", "LPT8", "LPT9"]
    if s.upper() in reserved_names:
        s += '_'

    # 如果字符串过长，进行截断
    if len(s) > max_length:
        s = s[:max_length]

    return s

## 知乎搜索（反爬虫用不了）

In [None]:
def get_search_text(q, url):
    """
    专用于抓取知乎页面内容并保存为 JSON
    若失败返回 None
    """
    cookie = os.getenv('search_cookie')
    user_agent = os.getenv('search_user_agent')

    if not cookie or not user_agent:
        print("缺少环境变量 search_cookie 或 search_user_agent，跳过知乎页面")
        return None

    title = None
    text_d = []
    code_ = []

    headers = {
        'authority': 'www.zhihu.com',
        'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'cache-control': 'max-age=0',
        'cookie': cookie,
        'upgrade-insecure-requests': '1',
        'user-agent': user_agent,
    }

    try:
        res = requests.get(url, headers=headers, timeout=10)
        if res.status_code != 200:
            print(f"❌ 知乎页面访问失败 ({res.status_code}): {url}")
            return None
        res_xpath = etree.HTML(res.text)
    except Exception as e:
        print(f"❌ 请求失败 {url}: {e}")
        return None

    try:
        # 普通问答页（问题页）
        if 'zhihu.com/question' in url and 'answer' not in url:
            titles = res_xpath.xpath('//div[@class="QuestionHeader"]//h1/text()')
            title = titles[0] if titles else None
            text_d = res_xpath.xpath(
                '//div[contains(@class, "List-item")]//'
                'div[contains(@class, "Paragraph")]/text() | '
                '//div[contains(@class, "RichText")]//p/text()'
            )

        # 专栏文章
        elif 'zhuanlan.zhihu.com' in url:
            headers['authority'] = 'zhuanlan.zhihu.com'
            titles = res_xpath.xpath('//h1[@class="Post-Title"]/text() | //header//h1/text()')
            title = titles[0] if titles else None
            text_d = res_xpath.xpath('//div[@class="Post-RichText"]//p/text()')
            code_ = res_xpath.xpath('//div[@class="Post-RichText"]//pre/code//text()')

        # 特定回答链接
        elif 'zhihu.com/question' in url and 'answer' in url:
            titles = res_xpath.xpath('//div[@class="QuestionHeader"]//h1/text()')
            title = titles[0] if titles else None
            text_d = res_xpath.xpath(
                '//div[contains(@class, "AnswerRichText")]//p/text() | '
                '//div[contains(@class, "Paragraph")]/text()'
            )

        else:
            print(f"不支持的知乎 URL 类型: {url}")
            return None

    except Exception as e:
        print(f"解析失败 {url}: {e}")
        return None

    # ✅ 检查是否成功提取标题
    if not title:
        print(f"未提取到标题，跳过: {url}")
        return None

    # 清洗标题（避免非法字符）
    title = windows_compatible_name(title.strip())

    # 拼接正文
    text = ' '.join([str(t).replace('\n', ' ') for t in text_d])
    if code_:
        code_text = ' '.join([str(c).replace('\n', ' ') for c in code_])
        text += "\n\n代码示例:\n" + code_text

    # 计算 token
    try:
        encoding = tiktoken.encoding_for_model("gpt-3.5-turbo")
        tokens = len(encoding.encode(text))
    except Exception:
        tokens = len(text) // 4  # 粗略估算

    # 保存为 JSON
    dir_path = f'./auto_search/{q}'
    os.makedirs(dir_path, exist_ok=True)
    json_data = [{
        "link": url,
        "title": title,
        "content": text,
        "tokens": tokens
    }]
    file_path = f'{dir_path}/{title}.json'
    try:
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(json_data, f, ensure_ascii=False, indent=2)
        print(f"已保存知乎内容: {title}")
    except Exception as e:
        print(f"保存文件失败: {e}")
        return None

    return title

In [None]:
url = 'https://www.zhihu.com/question/7762420288'
q = "什么是MCP"
get_search_text(q, url)

In [None]:
def get_search_result(q):
    """
    当你无法回答某个问题时，调用该函数，能够获得答案
    :param q: 必选参数，询问的问题，字符串类型对象
    :return：某问题的答案，以字符串形式呈现
    """
    # 默认搜索返回5个答案
    results = google_search(query=q, num_results=5, site_url='https://zhihu.com/')

    # 创建对应问题的子文件夹
    folder_path = './auto_search/%s' % q
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)

    # 单独提取links放在一个list中
    num_tokens = 0
    content = ''
    for item in results:
        url = item['link']
        title = get_search_text(q, url)
        with open('./auto_search/%s/%s.json' % (q, title), 'r') as f:
            jd = json.load(f)
        num_tokens += jd[0]['tokens']
        if num_tokens <= 12000:
            # print(jd[0]['content'])
            content += jd[0]['content']
        else:
            break
    return(content)

In [None]:
# 知乎搜索 测试案例
get_search_result(q)

#知乎反爬，放弃模块了

In [None]:
# # 测试用例
# url = "https://www.zhihu.com/robots.txt"
# q = "什么是json"
# get_website_text(url, q)

 ## github搜索部分

In [None]:
# 访问令牌导入
github_token = os.getenv('GITHUB_TOKEN')

In [None]:
#从Google搜索结果中自动识别并提取GitHub项目主页的链接，

# 并解析出项目的owner和repo（仓库名）信息。
def extract_github_repos(search_results):
    # search_results来自google_search()的结果

    # 使用列表推导式筛选出项目主页链接
    repo_links = [result['link']
                  for result in search_results
                  if '/issues/' not in result['link']       #排除 issues 页面
                  and '/blob/' not in result['link']        #排除具体文件页面
                  and 'github.com' in result['link']        #只保留GitHub的链接
                  and len(result['link'].split('/')) == 5   #确保是项目主页（格式：https://github.com/owner/repo)
                  ]

    # 从筛选后的链接中提取owner和repo
    repos_info = [{'owner': link.split('/')[3],
                   'repo': link.split('/')[4]}
                  for link in repo_links
                  ]
    '''
    link.split('/') →
    ['https:', '', 'github.com', 'deepseek-ai', 'deepseek-mcp']
      [0]     [1]       [2]          [3]              [4]
    '''

    return repos_info

In [None]:
# 获取指定仓库README.md

def get_github_readme(dic): #传入字典，字典里要有主人和repo信息

    github_token = os.getenv('GITHUB_TOKEN')    #设置令牌
    user_agent = os.getenv('search_user_agent') #设置代理

    owner = dic['owner']        #指定谁
    repo = dic['repo']          #指定repo

    #设置访问请求头
    headers = {
        "Authorization": github_token,
        "User-Agent": user_agent
    }

    #发送 API 请求获取 README
    response = requests.get(f"https://api.github.com/repos/{owner}/{repo}/readme", headers=headers)
    #GitHub 返回的是一个 JSON

    #解码内容
    readme_data = response.json()   #转化成python字典
    encoded_content = readme_data.get('content', '')    #取出 Base64 编码的字符串
    decoded_content = base64.b64decode(encoded_content).decode('utf-8') #先 Base64 解码成字节；再用 UTF-8 解码成人类可读的文本（Markdown）

    return decoded_content  #返回人类可读的文本

In [None]:
# 将 GitHub 项目 README 内容保存为本地结构化文件的工具函数
def get_search_text_github(q, dic): #传入问题、字典
    #构造文件标题
    title = dic['owner'] + '_' + dic['repo']

    #转换为 Windows 兼容的文件名
    title = windows_compatible_name(title)

    # 创建问题答案正文
    text = get_github_readme(dic)   #上面定义的函数，传入信息字典，返回readme文件信息，完整的项目说明文档

    # 写入本地json文件
    encoding = tiktoken.encoding_for_model("gpt-3.5-turbo") #tiktoken，模拟大模型如何分词。计算text的token占用

    #构建结构化数据
    json_data = [
        {
            "title": title,
            "content": text,
            "tokens": len(encoding.encode(text))
        }
    ]

    # 自动创建目录，如果不存在的话
    dir_path = f'./auto_search/{q}'
    os.makedirs(dir_path, exist_ok=True)

    #保存readme文件
    with open('./auto_search/%s/%s.json' % (q, title), 'w') as f:
        json.dump(json_data, f)

    #返回生成的文件名
    return title

In [None]:
# 告诉模型：去上网搜索生成答案
def get_answer_github(q):
    """
    当你无法回答某个问题时，调用该函数，能够获得答案
    :param q: 必选参数，询问的问题，字符串类型对象
    :return：某问题的答案，以字符串形式呈现
    """
    # 调用转化函数，将用户的问题转化为更适合在GitHub上搜索的关键词
    # q = convert_keyword_github(q)

    # 默认搜索返回5个答案
    # print('正在接入谷歌搜索，查找和问题相关的答案...')

    # 调用 Google 搜索 GitHub 站点
    search_results = google_search(query=q, num_results=5, site_url='https://github.com/')
    # 返回处理好的json文件
    # 提取有效的 GitHub 项目信息
    results = extract_github_repos(search_results)

    # 创建对应问题的子文件夹
    folder_path = './auto_search/%s' % q
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)

    # 遍历项目 、 获取内容 、 累计 token 数 、 控制输入长度
    num_tokens = 0
    content = ''

    for dic in results:
        title = get_search_text_github(q, dic)      # 对每个项目，自动下载readme并保存为json
        with open('./auto_search/%s/%s.json' % (q, title), 'r') as f:
            jd = json.load(f)               # 读取刚刚保存的json
        num_tokens += jd[0]['tokens']       # 累加token的数量
        if num_tokens <= 12000:             # 根据token上限
            content += jd[0]['content']     # 把内容拼接进入content
        else:
            break

    return content  #这里是读取到的readme的内容，作为素材提交给大模型准备生成自然语言

In [None]:
# get_answer_github 测试用例
q = 'DeepSeek-R1'
get_answer_github(q)

## 联网搜索外部函数

In [None]:
from bs4 import BeautifulSoup

def save_webpage_text(q, url):
    """
    通用网页抓取函数，适用于 GitHub、文档、博客等
    """
    headers = {
        'User-Agent': os.getenv('search_user_agent') or
                      'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }

    try:
        res = requests.get(url, headers=headers, timeout=10)
        res.raise_for_status()
        soup = BeautifulSoup(res.text, 'html.parser')

        for tag in soup(['script', 'style', 'nav', 'header', 'footer']):
            tag.decompose()

        title_tag = soup.find('title')
        title = (title_tag.get_text().strip() if title_tag else url)[:150]
        title = windows_compatible_name(title)

        # 根据网站类型提取正文
        main_elem = (
            soup.find('main') or
            soup.find('article') or
            soup.find('[role="main"]') or
            soup.find('.content') or
            soup.find('.post') or
            soup.body
        )

        text = main_elem.get_text(separator=' ', strip=True)
        text = ' '.join(text.split())  # 压缩空白

        enc = tiktoken.encoding_for_model("gpt-3.5-turbo")
        tokens = len(enc.encode(text))

        dir_path = f'./auto_search/{q}'
        os.makedirs(dir_path, exist_ok=True)
        data = [{
            "link": url,
            "title": title,
            "content": text,
            "tokens": tokens
        }]
        with open(f'{dir_path}/{title}.json', 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

        return title

    except Exception as e:
        print(f"抓取失败 {url}: {e}")
        return None

In [None]:
import webbrowser
import time
# 直接从google获取的函数
def get_answer(q, g='globals()'):
    print('正在接入谷歌搜索，查找和问题相关的答案...')
    results = google_search(query=q, num_results=10)

    # 🔥 关键：过滤掉知乎链接！
    filtered_results = []
    for item in results:
        url = item['link']
        if 'zhihu.com' in url:
            print(f"跳过知乎链接: {url}")
            continue
        filtered_results.append(item)

    if not filtered_results:
        print("没有找到非知乎的结果")
        return ""

    # 创建目录
    folder_path = f'./auto_search/{q}'
    os.makedirs(folder_path, exist_ok=True)

    content = ''
    num_tokens = 0

    for item in filtered_results:
        url = item['link']
        print(f'正在处理: {url}')

        # 可选：浏览器预览
        if os.getenv("search_with_browser") == '1':
            edge_path = "C:/Program Files (x86)/Microsoft/Edge/Application/msedge.exe"
            webbrowser.register('edge', None, webbrowser.BackgroundBrowser(edge_path))
            webbrowser.get('edge').open(url)
            time.sleep(2)

        # 使用通用抓取函数（见下方）
        title = save_webpage_text(q, url)

        if not title:
            continue

        file_path = f'./auto_search/{q}/{title}.json'
        if not os.path.exists(file_path):
            continue

        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                jd = json.load(f)
            text = jd[0]['content']
            tokens = jd[0]['tokens']

            if num_tokens + tokens <= 12000:
                content += text + "\n\n"
                num_tokens += tokens
            else:
                print("内容已达 token 上限（12000），停止加载。")
                break
        except Exception as e:
            print(f"读取文件失败: {e}")
            continue

    return content

In [None]:
# 给大模型的说明
get_answer_tool = {
    "type": "function",
    "function": {
        "name": "get_answer",
        "description": (
            "联网搜索工具，当用户提出的问题超出你的知识库范畴时，或该问题你不知道答案的时候，请调用该函数来获得问题的答案。该函数会自动从知乎上搜索得到问题相关文本，而后你可围绕文本内容进行总结，并回答用户提问。需要注意的是，当用户点名要求想要了解GitHub上的项目时候，请调用get_answer_github函数。"
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "q": {
                    "type": "string",
                    "description": "一个满足知乎搜索格式的问题，用字符串形式进行表示。",
                    "example": "什么是MCP?"
                },
                "g": {
                    "type": "string",
                    "description": "Global environment variables, default to globals().",
                    "default": "globals()"
                }
            },
            "required": ["q"]
        }
    }
}

In [None]:
# 测试用例
get_answer(q="什么是MCP？", g=globals())

## Github搜索外部函数

In [None]:
def get_answer_github(q, g='globals()'):
    """
    当你无法回答某个问题时，调用该函数，能够获得答案
    :param q: 必选参数，询问的问题，字符串类型对象
    :param g: g，字符串形式变量，表示环境变量，无需设置，保持默认参数即可
    :return：某问题的答案，以字符串形式呈现
    """
    # 调用转化函数，将用户的问题转化为更适合在GitHub上搜索的关键词
    # q = convert_keyword_github(q)

    # 默认搜索返回5个答案
    print('正在接入谷歌搜索，查找和问题相关的答案...')
    search_results = google_search(query=q, num_results=5, site_url='https://github.com/')
    results = extract_github_repos(search_results)

    # 创建对应问题的子文件夹
    folder_path = './auto_search/%s' % q
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)

    print('正在读取相关项目说明文档...')
    num_tokens = 0
    content = ''

    for dic in results:
        title = get_search_text_github(q, dic)
        with open('./auto_search/%s/%s.json' % (q, title), 'r') as f:
            jd = json.load(f)
        num_tokens += jd[0]['tokens']
        if num_tokens <= 12000:
            content += jd[0]['content']
        else:
            break
    print('正在进行最后的整理...')
    return(content)

In [None]:
get_answer_github_tool = {
    "type": "function",
    "function": {
        "name": "get_answer_github",
        "description": (
            "GitHub联网搜索工具，当用户提出的问题超出你的知识库范畴时，或该问题你不知道答案的时候，请调用该函数来获得问题的答案。该函数会自动从GitHub上搜索得到问题相关文本，而后你可围绕文本内容进行总结，并回答用户提问。需要注意的是，当用户提问点名要求在GitHub进行搜索时，例如“请帮我介绍下GitHub上的Qwen2项目”，此时请调用该函数，其他情况下请调用get_answer外部函数并进行回答。"
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "q": {
                    "type": "string",
                    "description": "一个满足GitHub搜索格式的问题，往往是需要从用户问题中提出一个适合搜索的项目关键词，用字符串形式进行表示。",
                    "example": "DeepSeek-R1"
                },
                "g": {
                    "type": "string",
                    "description": "Global environment variables, default to globals().",
                    "default": "globals()"
                }
            },
            "required": ["q"]
        }
    }
}

In [None]:
get_answer(q="什么是MCP？", g=globals())

# 搭建miniManus工作流

In [None]:
# 辅助函数，有代码就直接打印出来
def print_code_if_exists(function_args):
    """
    如果存在代码片段，则打印代码
    """
    def convert_to_markdown(code, language):
        return f"```{language}\n{code}\n```"

    # 如果是SQL，则按照Markdown中SQL格式打印代码
    if function_args.get('sql_query'):
        code = function_args['sql_query']
        markdown_code = convert_to_markdown(code, 'sql')
        print("即将执行以下代码：")
        display(Markdown(markdown_code))

    # 如果是Python，则按照Markdown中Python格式打印代码
    elif function_args.get('py_code'):
        code = function_args['py_code']
        markdown_code = convert_to_markdown(code, 'python')
        print("即将执行以下代码：")
        display(Markdown(markdown_code))

In [None]:
def create_function_response_messages(messages, response):

    """
    调用外部工具，并更新消息列表
    :param messages: 原始消息列表
    :param response: 模型某次包含外部工具调用请求的响应结果
    :return：messages，追加了外部工具运行结果后的消息列表
    """
    #总共6个外部函数
    available_functions = {
        "python_inter": python_inter,
        "fig_inter": fig_inter,
        "sql_inter": sql_inter,
        "extract_data": extract_data,
        "get_answer": get_answer,
        "get_answer_github": get_answer_github,
    }

    # 提取function call messages
    function_call_messages = response.choices[0].message.tool_calls

    # 将function call messages追加到消息列表中
    messages.append(response.choices[0].message.model_dump())

    # 提取本次外部函数调用的每个任务请求
    for function_call_message in function_call_messages:

        # 提取外部函数名称
        tool_name = function_call_message.function.name
        # 提取外部函数参数
        tool_args = json.loads(function_call_message.function.arguments)

        # 查找外部函数
        fuction_to_call = available_functions[tool_name]

        # 打印代码
        print_code_if_exists(function_args=tool_args)

        # 运行外部函数
        try:
            tool_args['g'] = globals()
            # 运行外部函数
            function_response = fuction_to_call(**tool_args)
        except Exception as e:
            function_response = "函数运行报错如下:" + str(e)

        # 拼接消息队列
        messages.append(
            {
                "role": "tool",
                "content": function_response,
                "tool_call_id": function_call_message.id,
            }
        )
    #输出拼接好的消息列表
    return messages

In [None]:
# 创建工具列表
tools = [python_inter_tool,fig_inter_tool,sql_inter_tool,extract_data_tool,get_answer_tool,get_answer_github_tool]

In [None]:
# 单次对话函数，完成对用户的相应
def chat_base(messages, client, model):
    """
    获得一次模型对用户的响应。若其中需要调用外部函数，
    则会反复多次调用create_function_response_messages函数获得外部函数响应。
    """

    client = client
    model = model

    #试着运行
    try:
        response = client.chat.completions.create(
            model=model,
            messages=messages,
            tools=tools,
        )

    except Exception as e:
        print("模型调用报错" + str(e))
        return None
    # 判断需不需要tool_calls
    if response.choices[0].finish_reason == "tool_calls":
        while True:
            messages = create_function_response_messages(messages, response)
            #再次带入相应模型
            response = client.chat.completions.create(
                model=model,
                messages=messages,
                tools=tools,
            )
            #while true控制是不是继续
            if response.choices[0].finish_reason != "tool_calls":
                break

    return response

## 功能测试

### python代码

In [None]:
messages=[{"role": "user", "content": "请帮我查询当前数据库中，总共有几张表。"}]
response = chat_base(messages=messages, client=client, model=MODEL)

In [None]:
display(Markdown(response.choices[0].message.content))

In [None]:
messages=[{"role": "user", "content": "请帮我模拟一组数据，并绘制核密度分布图"}]
response = chat_base(messages=messages, client=client, model=MODEL)

In [None]:
display(Markdown(response.choices[0].message.content))

### 联网测试

In [None]:
messages=[{"role": "user", "content": "请帮我介绍下最新大模型MCP技术。"}]
response = chat_base(messages=messages, client=client, model=MODEL)

In [None]:
display(Markdown(response.choices[0].message.content))

In [None]:
messages=[{"role": "user", "content": "请帮我介绍下GitHub上的DeepSeek-R1这个项目"}]
response = chat_base(messages=messages, client=client, model=MODEL)

In [None]:
display(Markdown(response.choices[0].message.content))

# 创建主类

In [None]:
# 让MiniManus本地创建MD
def save_markdown_to_file(content: str, filename_hint: str, directory="research_task"):
    # 在当前项目目录下创建 research_task 文件夹
    save_dir = os.path.join(os.getcwd(), directory)

    # 如果目录不存在则创建
    os.makedirs(save_dir, exist_ok=True)

    # 创建文件名（取前8个字符并加上...）
    filename = f"{filename_hint[:8]}....md"

    # 完整文件路径
    file_path = os.path.join(save_dir, filename)

    # 将内容保存为Markdown文档
    with open(file_path, 'w', encoding='utf-8') as file:
        file.write(content)

    print(f"文件已成功保存到：{file_path}")

In [None]:
save_markdown_to_file(content="测试文档", filename_hint="测试文档创建", directory="research_task")

In [None]:
# 定义项目主类。类的使用更灵活，有更多信息。
# 实例化后，可以把对话保存在对象里面
class miniManusClass:
    def __init__(self,
                 api_key=None,
                 model=None,
                 base_url=None,
                 messages=None):
        #加载.env环境信息
        load_dotenv(override=True)

        if api_key != None:
            self.api_key = api_key
        else:
            self.api_key = os.getenv("API_KEY")

        if model != None:
            self.model = model
        else:
            self.model = os.getenv("MODEL")

        if base_url != None:
            self.base_url = base_url
        else:
            self.base_url = os.getenv("BASE_URL")

        # message可以自定义初始化别的语句
        if messages != None:
            self.messages = messages
        else:
            self.messages = [{"role":"system", "content":"你miniManus，是一名助人为乐的助手。"}]

        #建立一个client实例
        self.client = OpenAI(api_key=self.api_key, base_url=self.base_url)
        #检测
        try:
            print("正在测试模型能否正常调用...")
            self.models = self.client.models.list()

            if self.models:
                print("▌ MiniManus初始化完成，欢迎使用！")
            else:
                print("模型无法调用，请检查网络环境或本地模型配置。")

        except Exception as e:
            print("初始化失败，可能是网络或配置错误。详细信息：", str(e))

    # 初始化的
    def chat(self):
        print("你好，我是MiniManus，有什么需要帮助的？")
        while True: #只要有.chat方法，就会开对话框input
            question = input("请输入您的问题(输入退出以结束对话): ")
            if question == "退出":
                break

            self.messages.append({"role": "user", "content": question})
            #只会看后面20条
            self.messages = self.messages[-20: ]
            # 通过chat_base完成对用户的相应，通过while true一直调用外部工具到结束
            response = chat_base(messages=self.messages,
                                 client=self.client,
                                 model=self.model)
            #展示回复
            display(Markdown("**MiniManus**:" + response.choices[0].message.content))
            # 把消息添加到消息列表
            self.messages.append(response.choices[0].message)

    # 准备搜索任务，围绕任务进行相应
    def research_task(self, question):
        # 第一个promt，引导用户更多信息
        prompt_style1 = """
        你是一名专业且细致的助手，你的任务是在用户提出问题后，通过友好且有引导性的追问，更深入地理解用户真正的需求背景。这样，你才能提供更精准和更有效的帮助。
        当用户提出一个宽泛或者不够明确的问题时，你应当积极主动地提出后续问题，引导用户提供更多背景和细节，以帮助你更准确地回应。
        示例引导问题：

        用户提问示例：
        最近，在大模型技术领域，有一项非常热门的技术，名叫MCP，model context protocol，调用并深度总结，这项技术与OpenAI提出的function calling之间的区别。

        你应该给出的引导式回应示例：
        在比较MCP（Model Context Protocol）与OpenAI的Function Calling时，我可以涵盖以下几个方面：
        - 定义和基本概念：MCP和Function Calling的基本原理和目标。
        - 工作机制：它们如何处理模型的输入和输出。
        - 应用场景：它们分别适用于哪些具体场景？
        - 技术优势与局限性：各自的优劣势分析。
        - 生态和兼容性：它们是否能与现有的大模型和应用集成。
        - 未来发展趋势：这些技术未来的发展方向。
        请问你是否希望我特别关注某些方面，或者有特定的技术细节需要深入分析？

        再比如用户提问：
        请你帮我详细整理，华为910B2x鲲鹏920，如何部署DeepSeek模型。

        你应该给出的引导式回应示例：
        请提供以下详细信息，以便我能为您整理完整的部署指南：
        1. 您希望部署的DeepSeek模型具体是哪一个？（例如DeepSeek-VL、DeepSeek-Coder等）
        2. 目标系统环境（操作系统、已有软件环境等）？
        3. 是否有特定的深度学习框架要求？（如PyTorch、TensorFlow）
        4. 是否需要优化部署（如使用昇腾NPU加速）？
        5. 期望的使用场景？（如推理、训练、微调等）
        请提供这些信息后，我将为您整理具体的部署步骤。

        记住，保持友好而专业的态度，主动帮助用户明确需求，而不是直接给出不够精准的回答。现在用户提出问题如下：{}，请按照要求进行回复。
        """
        # prompt2：执行流程，利用工具
        prompt_style2 = """
        你是一位知识广博、擅长利用多种外部工具的资深研究员。当用户已明确提出具体需求：{}，现在你的任务是：
        首先明确用户问题的核心及相关细节。
        尽可能调用可用的外部工具（例如：联网搜索工具get_answer、GitHub搜索工具get_answer_github、本地代码运行工具python_inter以及其他工具），围绕用户给出的原始问题和补充细节，进行广泛而深入的信息收集。
        综合利用你从各种工具中获取的信息，提供详细、全面、专业且具有深度的解答。你的回答应尽量达到2000字以上，内容严谨准确且富有洞察力。

        示例流程：
        用户明确需求示例：
        我目前正在学习 ModelContextProtocol（MCP），主要关注它在AI模型开发领域中的具体应用场景、技术细节和一些业界最新的进展。
        你的回应流程示例：
        首先重述并确认用户的具体需求。
        明确你将调用哪些外部工具，例如：
        使用联网搜索工具查询官方或权威文档对 MCP 在AI模型开发领域的具体应用说明；
        调用GitHub搜索工具，寻找业界针对MCP技术项目；
        整理并分析通过工具获取的信息，形成一篇逻辑清晰、结构合理的深度报告。

        再比如用户需要编写数据分析报告示例：
        我想针对某电信公司过去一年的用户数据，编写一份详细的用户流失预测数据分析报告，报告需要包括用户流失趋势分析、流失用户特征分析、影响用户流失的关键因素分析，并给出未来减少用户流失的策略建议。
        你的回应流程示例：
        明确并确认用户需求，指出分析内容包括用户流失趋势、流失用户特征、关键影响因素以及策略建议。
        明确你将调用哪些外部工具，例如：
        使用数据分析工具对提供的用户数据进行流失趋势分析，生成趋势图表；
        使用代码执行环境（如调用python_inter工具）对流失用户进行特征分析，确定典型特征；
        通过统计分析工具识别影响用户流失的关键因素（如服务质量、价格敏感度、竞争对手促销），同时借助绘图工具（fig_inter）进行重要信息可视化展示；
        使用互联网检索工具检索行业内最新的客户保留策略与实践，提出有效的策略建议。

        记住，回答务必详细完整，字数至少在2000字以上，清晰展示你是如何运用各种外部工具进行深入研究并形成专业结论的。

        """
        #第一个回复，引导提问获得信息
        response = self.client.chat.completions.create(model=self.model,
                                                  messages=[{"role": "user", "content": prompt_style1.format(question)}])
        #第一次响应
        display(Markdown("**MiniManus:**" + response.choices[0].message.content))
        # 拼接new messages
        new_messages = [
            {"role": "user", "content": question},
            # 拼接模型相应后的内容
            response.choices[0].message.model_dump()
        ]
        # 输入补充信息
        new_question = input("请输入您的补充说明(输入退出以结束对话): ")
        if new_question == "退出":
            return None
        else:
            # 整理new messages代入第二个prompt准备回复
            new_messages.append({"role": "user", "content":prompt_style2.format(new_question)})
            # 生成第二次回复
            second_response = chat_base(messages=new_messages,
                                        client=self.client,
                                        model=self.model)

            display(Markdown("**MiniManus**:" + second_response.choices[0].message.content))

            save_markdown_to_file(content=second_response.choices[0].message.content,
                                  filename_hint=question)
        # 清空messages
        def clear_messages(self):
            self.messages = []

## 功能实操

In [None]:
miniManus = miniManusClass()

In [None]:
miniManus.chat()

## 任务测试

In [302]:
miniManus_tools_test = miniManusClass()

INFO:httpx:HTTP Request: GET https://api.deepseek.com/v1/models "HTTP/1.1 200 OK"


正在测试模型能否正常调用...
▌ MiniManus初始化完成，欢迎使用！


In [None]:
miniManus_tools_test.chat()

### 任务模式测试

In [304]:
miniManus_task_test = miniManusClass()

INFO:httpx:HTTP Request: GET https://api.deepseek.com/v1/models "HTTP/1.1 200 OK"


正在测试模型能否正常调用...
▌ MiniManus初始化完成，欢迎使用！


In [305]:
question = "请帮我详细介绍下大模型最新MCP技术"

In [306]:
miniManus_task_test.research_task(question)

INFO:httpx:HTTP Request: POST https://api.deepseek.com/v1/chat/completions "HTTP/1.1 200 OK"


**MiniManus:**为了更精准地为您介绍大模型最新的MCP（Model Context Protocol）技术，我需要进一步了解您的具体需求背景。以下是几个关键方向的追问，您可以根据兴趣选择或补充细节：

1. **技术定位**  
   - 您关注的是MCP在模型交互协议中的创新（如动态上下文管理），还是与其他技术（如Function Calling/RAG）的对比？  
   - 是否需要结合具体应用场景（如智能体协作、长文本处理）分析？

2. **深度侧重点**  
   - 希望了解协议设计原理（如上下文压缩、多模态适配），还是实际部署案例（如与Llama3/GPT-4的集成）？  
   - 是否需要代码级实现示例（如API调用规范）？

3. **行业视角**  
   - 是否关注特定厂商的生态支持（如华为/OpenAI的兼容性）？  
   - 或想了解其对AI基础设施（如推理加速、成本优化）的影响？

4. **延伸需求**  
   - 是否需要同步提供技术演进时间线（如与早期Prompt Engineering的对比）？  
   - 或关注安全/伦理相关设计（如上下文权限控制）？

示例回复方向：  
"根据您的需求，我可以从以下维度展开：  
① MCP的核心机制（上下文动态分块/优先级调度）  
② 与传统API协议的吞吐量实测对比  
③ 在金融文档分析中的落地案例  
请告知您更感兴趣的模块，我将优先细化该部分内容。"  

期待您的补充说明，这将帮助我提供高度定制化的技术解读！

KeyboardInterrupt: Interrupted by user

### 复杂数据分析测试

In [None]:
miniManus1 = miniManusClass()

In [None]:
miniManus11 = miniManusClass()

In [None]:
miniManus11.chat()

In [None]:
miniManus_tools_test10 = miniManusClass()

In [None]:
miniManus_tools_test10.chat()

## 机器学习建模测试

In [None]:
miniManus1.chat()