In [None]:
import os 
import sys
import asyncio
import requests

from langchain.agents import initialize_agent, Tool
from langchain_core.tools import tool
from langchain.schema import HumanMessage
from langgraph.prebuilt import create_react_agent
from langchain_community.chat_models.tongyi import ChatTongyi
from langgraph.checkpoint.memory import MemorySaver


llm = ChatTongyi(api_key=os.getenv("DASHSCOPE_API_KEY"), model_name="qwen-max")

@tool
def read_doc_file(pageContext):
    """读取docs目录下指定文件的内容。"""
    base_dir = "../docs"
    file_path = os.path.join(base_dir, pageContext)
    if not os.path.isfile(file_path):
        return f"File {pageContext} not found in docs directory."
    with open(file_path, "r", encoding="utf-8") as f:
        return f.read()


@tool
def write_file(file_name, content):
    """根据要求，将输出的内容进行修改后，写入文件"""
    file_path = os.path.join("../docs", file_name)
    with open(file_path, 'w') as f:
        f.write(content)
    return f"文件{file_name}已写入成功"



memory = MemorySaver()
prompt = """
你是一个专业的助手，你的任务是根据用户的问题。
"""

tools = [read_doc_file, write_file]
agent = create_react_agent(llm, tools, checkpointer=memory, prompt=prompt)

config = {"configurable": {"thread_id": "unique_thread_id"}}

response = agent.invoke(
    {"messages": [{"role": "user", "content": "讲讲大模型的原理"}]},
    config=config
)
print(response["messages"][-1].content)


## 修改可调用的Ai-agent

In [None]:
import os 
import sys
import asyncio
import requests
import time
import json
from typing import List, Dict, Any, Generator

from langchain.agents import initialize_agent, Tool
from langchain_core.tools import tool
from langchain.schema import HumanMessage
from langgraph.prebuilt import create_react_agent
from langchain_community.chat_models.tongyi import ChatTongyi
from langgraph.checkpoint.memory import MemorySaver

# 初始化模型
llm = ChatTongyi(api_key=os.getenv("DASHSCOPE_API_KEY"), model_name="qwen-max")

@tool
def read_doc_file(pageContext):
    """读取docs目录下指定文件的内容。"""
    base_dir = "../docs"
    file_path = os.path.join(base_dir, pageContext)
    if not os.path.isfile(file_path):
        return f"File {pageContext} not found in docs directory."
    with open(file_path, "r", encoding="utf-8") as f:
        return f.read()

@tool
def write_file(file_name, content):
    """根据要求，将输出的内容进行修改后，写入文件"""
    file_path = os.path.join("../docs", file_name)
    with open(file_path, 'w') as f:
        f.write(content)
    return f"文件{file_name}已写入成功"

@tool
def analyze_page_context(page_info):
    """分析当前页面上下文信息"""
    try:
        if isinstance(page_info, str):
            page_data = json.loads(page_info)
        else:
            page_data = page_info
            
        page_type = page_data.get('type', 'unknown')
        description = page_data.get('description', '未知页面')
        
        analysis = {
            'homepage': f"🏠 用户当前在首页：{description}。这里是网站的入口，包含主要功能介绍。",
            'docs': f"📖 用户当前在文档页面：{description}。这是技术文档区域，用户可能需要文档相关帮助。",
            'blog': f"📝 用户当前在博客页面：{description}。这里展示文章和技术分享。",
            'file-manager': f"📁 用户当前在文件管理页面：{description}。用户可能需要文件操作帮助。",
            'other': f"🔍 用户当前在：{description}。"
        }
        
        return analysis.get(page_type, analysis['other'])
    except Exception as e:
        return f"页面信息解析失败：{str(e)}"

# 初始化内存和工具
memory = MemorySaver()

# 更新提示词，包含页面上下文感知
prompt = """
你是一个专业的网站智能助手，具备以下能力：

1. 📚 读取和分析文档内容
2. ✍️ 创建和修改文件
3. 🔍 分析用户当前页面上下文
4. 💡 提供专业的开发建议

请根据用户的问题和当前页面上下文，智能选择合适的工具来帮助用户。
如果用户询问文档相关问题，可以使用 read_doc_file 工具。
如果需要创建或修改文件，可以使用 write_file 工具。
如果需要了解用户当前页面情况，可以使用 analyze_page_context 工具。

始终保持友好、专业的态度，提供准确有用的信息。
"""

tools = [read_doc_file, write_file, analyze_page_context]
agent = create_react_agent(llm, tools, checkpointer=memory, prompt=prompt)

# 全局配置
config = {"configurable": {"thread_id": "unique_thread_id"}}

class WebsiteAgent:
    """网站智能助手包装类"""
    
    def __init__(self):
        self.agent = agent
        self.config = config
    
    def chat_stream(self, messages: List[Dict], page_context: Dict = None) -> Generator[str, None, None]:
        """流式聊天接口"""
        try:
            # 准备用户输入
            user_input = messages[-1]["content"] if messages else ""
            
            # 构建完整的消息，包含页面上下文
            if page_context:
                context_info = f"\n\n当前页面信息：{json.dumps(page_context, ensure_ascii=False)}"
                user_input_with_context = user_input + context_info
            else:
                user_input_with_context = user_input
            
            # 记录请求信息
            print(f"🤖 Agent 处理请求：{user_input[:50]}...")
            if page_context:
                print(f"📄 页面上下文：{page_context.get('description', 'Unknown')}")
            
            # 调用 Agent
            response = self.agent.invoke(
                {"messages": [{"role": "user", "content": user_input_with_context}]},
                self.config
            )
            
            # 获取最终输出
            final_output = response["messages"][-1].content
            
            # 模拟流式输出
            chunk_size = 8  # 每次返回的字符数
            for i in range(0, len(final_output), chunk_size):
                chunk = final_output[i:i + chunk_size]
                yield chunk
                time.sleep(0.03)  # 模拟流式延迟
                
        except Exception as e:
            error_msg = f"❌ Agent 执行错误：{str(e)}"
            print(error_msg)
            yield error_msg
    
    def chat_single(self, user_input: str, page_context: Dict = None) -> str:
        """单次对话接口"""
        try:
            if page_context:
                context_info = f"\n\n当前页面信息：{json.dumps(page_context, ensure_ascii=False)}"
                user_input_with_context = user_input + context_info
            else:
                user_input_with_context = user_input
                
            response = self.agent.invoke(
                {"messages": [{"role": "user", "content": user_input_with_context}]},
                self.config
            )
            
            return response["messages"][-1].content
        except Exception as e:
            return f"❌ Agent 执行错误：{str(e)}"

# 全局 Agent 实例
_agent_instance = None

def get_agent_instance() -> WebsiteAgent:
    """获取 Agent 单例实例"""
    global _agent_instance
    if _agent_instance is None:
        _agent_instance = WebsiteAgent()
    return _agent_instance

def chat_with_agent(messages: List[Dict], page_context: Dict = None) -> Generator[str, None, None]:
    """与 Agent 聊天的便捷接口"""
    agent_instance = get_agent_instance()
    return agent_instance.chat_stream(messages, page_context)

# 如果直接运行此文件，执行测试
if __name__ == "__main__":
    response = agent.invoke(
        {"messages": [{"role": "user", "content": "讲讲大模型的原理"}]},
        config=config
    )
    print(response["messages"][-1].content)

In [None]:
def calculate_total_area():
    # 初始化
    dp = [[0] * 6 for _ in range(6)]
    area_sum = [[0] * 6 for _ in range(6)]
    
    # 边界条件
    for i in range(6):
        dp[i][0] = 1  # 沿x轴的路径
        dp[0][i] = 1  # 沿y轴的路径
        area_sum[i][0] = 0  # 沿x轴无面积
        area_sum[0][i] = 0  # 沿y轴无面积
    
    # 动态规划
    for i in range(1, 6):
        for j in range(1, 6):
            dp[i][j] = dp[i-1][j] + dp[i][j-1]
            area_sum[i][j] = (area_sum[i-1][j] + 
                             area_sum[i][j-1] + 
                             i * dp[i][j-1])
    
    return dp[5][5], area_sum[5][5]

# 计算结果
paths, total_area = calculate_total_area()
print(f"路径总数: {paths}")
print(f"总面积: {total_area}")


In [None]:
from tavily import TavilyClient

tavily_client = TavilyClient(api_key="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")
response = tavily_client.extract("https://www.xiaohongshu.com/discovery/item/687d049a000000001202379d?app_platform=ios&app_version=8.93&share_from_user_hidden=true&xsec_source=app_share&type=normal&xsec_token=CBx7yL0b9jJTC-GE4NQwLGTJjOBPzbVGu3wPJ_IZzel8k=&author_share=1&xhsshare=CopyLink&shareRedId=ODY6NDY8PE02NzUyOTgwNjY0OTc5PEc7&apptime=1753665070&share_id=c72a11e1002040c994ef1fa913828199")

print(response)
