In [None]:
from datetime import datetime

# 模拟医院的科室和医生数据
hospital_data = {
    "内科": [
        {"姓名": "张医生", "职称": "主任医师", "专长": "呼吸系统疾病", "出诊时间": "周一、周三"},
        {"姓名": "李医生", "职称": "副主任医师", "专长": "消化系统疾病", "出诊时间": "周二、周五"}
    ],
    "外科": [
        {"姓名": "王医生", "职称": "主任医师", "专长": "神经外科", "出诊时间": "周一、周四"},
        {"姓名": "赵医生", "职称": "副主任医师", "专长": "骨科手术", "出诊时间": "周二、周五"}
    ],
    "妇产科": [
        {"姓名": "陈医生", "职称": "主任医师", "专长": "产科", "出诊时间": "周一、周五"},
        {"姓名": "刘医生", "职称": "主治医师", "专长": "妇科炎症", "出诊时间": "周三、周六"}
    ]
}

# 查询特定科室的所有医生信息
def query_doctors_by_department(department):
    if department in hospital_data:
        return hospital_data[department]
    else:
        return f"没有找到{department}科室的信息。"

# 模拟预约函数
def book_appointment(patient_name, department, doctor_name, appointment_date):
    # 查找科室和医生
    if department not in hospital_data:
        return f"科室 {department} 不存在。"
    
    doctors = hospital_data[department]
    selected_doctor = None
    
    for doctor in doctors:
        if doctor["姓名"] == doctor_name:
            selected_doctor = doctor
            break
    
    if not selected_doctor:
        return f"医生 {doctor_name} 不在 {department} 科室。"
    
    # 模拟预约成功，返回预约信息
    appointment_info = {
        "患者姓名": patient_name,
        "预约科室": department,
        "预约医生": doctor_name,
        "预约日期": appointment_date.strftime("%Y年%m月%d日"),
        "医生职称": selected_doctor["职称"],
        "医生专长": selected_doctor["专长"],
        "出诊时间": selected_doctor["出诊时间"]
    }
    
    return appointment_info

# 示例调用
department_name = "内科"
doctor_name = "张医生"
patient_name = "李四"
appointment_date = datetime(2024, 9, 10)

appointment_info = book_appointment(patient_name, department_name, doctor_name, appointment_date)
print(appointment_info)


In [None]:

def get_current_date():
    # 获取当前日期，并将其格式化为字符串 'YYYY-MM-DD'
    current_date = datetime.now().strftime("%Y-%m-%d")
    current_date_result = f"现在日期是：{current_date}"
    return current_date_result
get_current_date()

In [8]:
from maa_tools import tools_map, query_user_for_details,gen_tools_desc
from maa_prompt import gen_prompt, user_prompt
tools_dict= gen_tools_desc()
print(tools_dict)
print(user_prompt)

[
    {
        "name": "query_user_for_details",
        "description": "向用户提问，用于获取更多信息，以深入了解用户的需求。",
        "parameters": {
            "type": "object",
            "properties": {
                "query_user": {
                    "type": "string",
                    "description": "用户输入的信息，用于明确用户的需求。"
                }
            }
        },
        "required": [
            "query_user"
        ]
    },
    {
        "name": "finish",
        "description": "结束对话并给出最终预约的详细信息。",
        "parameters": {
            "type": "object",
            "properties": {
                "answer": {
                    "type": "dict",
                    "description": "包含最预约的字典，输出为完整的答案。"
                }
            }
        },
        "required": [
            "answer"
        ]
    },
    {
        "name": "get_current_date",
        "description": "获取当前日期并以字符串格式返回。",
        "parameters": {},
        "required": []
    },
    {
        "name": "query_doctors_by_department",
       

In [1]:
from maa_tools import tools_map, query_user_for_details
from maa_prompt import gen_prompt, user_prompt
from maa_model_provider import ModelProvider
from dotenv import load_dotenv
import dashscope
import os

load_dotenv()
mp = ModelProvider()

dashscope.api_key = "sk-f529539e3a50472fac783cf1e99fddef"
os.environ["TAVILY_API_KEY"] = "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"

def parse_thoughts(response, cur_request_time,max_request_time,debug):
    """解析模型返回的响应，提取关键信息并格式化为字符串，并根据debug参数决定是否打印"""
    try:
        thoughts = response.get("thoughts")
        planning = thoughts.get("planning")
        reasoning = thoughts.get("reasoning")
        reflection = thoughts.get("reflection")
        summary = thoughts.get("summary")
        observation = response.get("observation")
        prompt = f"planning: {planning}\nreasoning: {reasoning}\nreflection: {reflection}\nobservation: {observation}\nsummary: {summary}"
        prompt += f"\n这是医疗知识库第{cur_request_time}次响应，最多执行{max_request_time}次\n" 
        # if debug:
        #     print(prompt)
        return prompt
    except Exception as e:
        print(f"parse_thoughts error: {e}")
        return ""

def execute_action(action_name, action_args, debug):
    """根据动作名称执行相应的工具函数，并根据debug参数决定是否打印动作信息"""
    try:
        func = tools_map.get(action_name)
        result = func(**action_args)
        if debug:
            print(f"action_name: {action_name}, action_args: {action_args}")
        return result
    except Exception as e:
        print(f"调用工具异常： {e}")
        return str(e)

def agent_execute(query, max_request_time, debug=False):
    """执行代理任务，与模型交互并处理结果，根据debug参数决定是否打印详细信息"""
    cur_request_time = 0
    chat_history = []
    agent_scratch = ""

    while cur_request_time < max_request_time:
        cur_request_time += 1
        prompt = gen_prompt(query, agent_scratch)
        response = mp.chat(prompt, chat_history)

        if not response or not isinstance(response, dict):
            print(f"call llm exception, response is: {response}")
            continue

        action_info = response.get("action")
        action_name = action_info.get("name")
        action_args = action_info.get("parameters")
        thoughts = response.get("thoughts")

        if debug:
            print(f'-------------第{cur_request_time}次推断------------')
            print(f"observation: {response.get('observation')}")
            print(f"planning: {thoughts.get('planning')}")
            print(f"reasoning: {thoughts.get('reasoning')}")
            print(f"reflection: {thoughts.get('reflection')}")
            print(f"summary: {thoughts.get('summary')}")
            print(f"action_name: {action_name}, action_args: {action_args}")

        if action_name == "query_user_for_details":
            user_response = input(query_user_for_details(action_args["query_user"]))
            chat_history.append([action_args["query_user"], user_response])
            agent_scratch += f"query_user: {action_args['query_user']}user response: {user_response}"
            continue

        call_function_result = execute_action(action_name, action_args, debug)
        agent_scratch += f"observation: {response.get('observation')} execute action {action_name} result: {call_function_result}"
        assistant_msg = parse_thoughts(response, cur_request_time,max_request_time,debug)
        chat_history.append([user_prompt, assistant_msg])

        if action_name == "finish":
            final_answer = action_args.get("answer")
            # final_answer = "".join([str(key) + str(value) for key, value in final_answer.items()])
            break
    if cur_request_time == max_request_time:
        final_answer="本次任务执行失败！,未能提供问诊建议"
    return final_answer

# 示例执行，启用调试模式


In [2]:
max_request_time = 20
query = '你好,我叫张三，我想要预约明天内科的李医生'
final_answer = agent_execute(query, max_request_time=max_request_time,debug=True)
print(f"final_answer: {final_answer}")

-------------第1次推断------------
observation: 任务初始阶段，准备获取当前日期。
planning: 首先获取当前日期，以便计算明天的日期，这是用户要求预约的时间。
reasoning: 在继续之前，需要确切知道当前日期，这样才能确定明天的日期，满足用户的预约需求。
reflection: 无
summary: 正在获取当前日期。
action_name: get_current_date, action_args: {}
action_name: get_current_date, action_args: {}
-------------第2次推断------------
observation: 已获取当前日期，正查询内科医生的出诊信息。
planning: 首先查询内科的医生信息，特别是用户提到的李医生的出诊情况。
reasoning: 需要确认李医生明天是否有出诊，以便安排预约。
reflection: 无
summary: 正在查询内科医生信息。
action_name: query_doctors_by_department, action_args: {'department': '内科'}
action_name: query_doctors_by_department, action_args: {'department': '内科'}
-------------第3次推断------------
observation: 已准备好为用户执行预约操作。
planning: 根据用户需求，已找到明天出诊的内科李医生（李12医生），现在将为用户张三预约明天的内科时间。
reasoning: 已获取到足够的信息来完成预约，无需进一步询问用户。
reflection: 无额外信息需要确认，直接进行预约操作。
summary: 正在为张三预约明天内科李12医生的就诊时间。
action_name: book_appointment, action_args: {'patient_name': '张三', 'department': '内科', 'doctor_name': '李12医生', 'appointment_date': '2024-09-10'}
action_name: book_appointment,

In [2]:
from datetime import datetime

def get_current_date():
    # 获取当前日期，并将其格式化为字符串 'YYYY-MM-DD'
    current_date_str = datetime.now().strftime("%Y-%m-%d %A")
    
    # 映射英文星期到中文
    week_day_map = {
        'Monday': '星期一',
        'Tuesday': '星期二',
        'Wednesday': '星期三',
        'Thursday': '星期四',
        'Friday': '星期五',
        'Saturday': '星期六',
        'Sunday': '星期日'
    }
    
    # 分割日期字符串以获取英文星期名称
    date_part, week_day_en = current_date_str.split()
    
    # 获取中文星期名称
    week_day_cn = week_day_map[week_day_en]
    
    # 生成最终的日期字符串，包括中文的星期
    current_date_result = f"现在日期是：{date_part} {week_day_cn}"
    return current_date_result

# 调用函数并打印结果
print(get_current_date())


现在日期是：2024-09-11 星期三
