# LlamaIndex：打造高效智能RAG应用的利器

在大语言模型（LLM）应用开发日新月异的今天，如何让LLM能够访问和理解私有数据已成为企业和开发者的核心需求。LlamaIndex作为一个专为LLM应用打造的数据框架，以其强大的功能和简洁的API，正在成为RAG（检索增强生成）系统开发的首选工具。今天，我们将通过实例代码深入了解LlamaIndex的核心功能，带您快速上手这一强大工具。

## 一、LlamaIndex 简介

LlamaIndex是一个数据框架，旨在帮助开发者构建基于LLM的应用程序。它的核心价值在于降低开发和维护成本，主要通过以下方式提供帮助：

1. **第三方能力抽象**：封装LLM、向量数据库、搜索接口等组件
2. **常用工具和方案封装**：提供RAG、代理等现成解决方案
3. **底层实现封装**：处理流式接口、超时重连、异步与并行等复杂性

一个好的开发框架应具备高可靠性、高可维护性、高可扩展性和低学习成本，而LlamaIndex在这些方面都表现出色。让我们通过代码示例来具体了解。

五行代码实现一个简单的RAG系统

In [1]:
# 导入所需的库
import os
from dotenv import load_dotenv
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader

# 加载环境变量
load_dotenv()

# 加载文档
documents = SimpleDirectoryReader(input_dir="./data/llamaindex_data").load_data()

# 创建索引
index = VectorStoreIndex.from_documents(documents)

# 创建查询引擎并提问
query_engine = index.as_query_engine()
response = query_engine.query("deepseek v3有多少参数")
print(response)

DeepSeek-V3有671B总参数，每个token激活了37B参数。


使用OpenAI模型

In [2]:
from llama_index.llms.openai import OpenAI
from dotenv import load_dotenv

# 加载环境变量
load_dotenv()

# 初始化OpenAI模型
llm = OpenAI(model="gpt-4o-mini")

# 提问
response = llm.complete("谁是莎士比亚?")
print(response)

威廉·莎士比亚（William Shakespeare）是英国文艺复兴时期最著名的剧作家和诗人之一，通常被认为是英语文学史上最伟大的作家之一。他出生于1564年4月23日，去世于1616年4月23日。莎士比亚的作品包括37部戏剧、154首十四行诗以及其他诗歌。

他的戏剧分为悲剧、喜剧和历史剧三大类，著名作品包括《哈姆雷特》、《罗密欧与朱丽叶》、《麦克白》、《奥赛罗》和《威尼斯商人》等。莎士比亚的作品探讨了人性、爱情、权力、背叛等主题，深刻影响了后世的文学、戏剧和文化。他的语言和表达方式对英语的发展也产生了深远的影响。莎士比亚的作品至今仍在全球范围内广泛演出和研究。


使用DeepSeek模型

In [3]:
from llama_index.llms.deepseek import DeepSeek
from dotenv import load_dotenv

# 加载环境变量
load_dotenv()

# 初始化DeepSeek模型
llm = DeepSeek(model="deepseek-reasoner")

# 提问
response = llm.complete("写个笑话")
print(response)

## 《鱼族锦标赛》

裁判龙虾敲响贝壳："本届海洋运动会增设新项目——陆地障碍赛！"

章鱼选手八条触手倒腾得比风火轮还快，水母撑着伞膜飘过跨栏，寄居蟹驮着海螺壳在沙地上碾出火星子。

只剩小金鱼呆立在起跑线。

"您倒是出发呀？"龙虾挥动旗子催促。

小金鱼腮帮鼓成气球："不是说...比赛用车自备吗？"它盯着岸边生锈的共享单车，鱼鳍在半空比划了半天蹬车动作。


Agent代理：让LLM具备工具使用能力

每个案例都可以独立运行，展示了不同的LlamaIndex Agent功能：
案例1：展示了如何创建Agent
案例2：展示了为Agent添加功能强大的工具
案例3：展示了状态管理和持久化
案例4：展示了如何创建和使用访问上下文的自定义工具
案例5：展示了流式输出和事件处理机制
案例6：展示了如何实现人类在环中的确认机制
案例7：展示了如何构建一个完整的多Agent协作系统

1. 创建一个基础Agent

In [4]:
import nest_asyncio
nest_asyncio.apply()  # 解决异步嵌套的问题

import asyncio
from dotenv import load_dotenv
from llama_index.llms.openai import OpenAI
from llama_index.core.agent.workflow import AgentWorkflow

# 加载环境变量
load_dotenv()

# 定义工具函数
def multiply(a: float, b: float) -> float:
    """将两个数相乘并返回乘积"""
    return a * b

def add(a: float, b: float) -> float:
    """将两个数相加并返回和"""
    return a + b

# 初始化LLM
llm = OpenAI(model="gpt-4o-mini")

# 创建代理工作流
workflow = AgentWorkflow.from_tools_or_functions(
    [multiply, add],
    llm=llm,
    system_prompt="你是一个能够执行基本数学运算的代理。"
)

# 运行代理
async def main():
    response = await workflow.run(user_msg="计算20+(2*4)的结果是多少?")
    print(response)

if __name__ == "__main__":
    asyncio.run(main())

20 + (2 * 4) 的结果是 28。


2. 添加更多功能强大的工具
我们可以为代理添加更多工具，例如Yahoo金融工具和搜索工具：

In [5]:
import os
from dotenv import load_dotenv
from llama_index.llms.openai import OpenAI
from llama_index.core.agent.workflow import AgentWorkflow
from llama_index.tools.yahoo_finance import YahooFinanceToolSpec
from llama_index.tools.tavily_research import TavilyToolSpec

# 加载环境变量
load_dotenv()

# 初始化LLM
llm = OpenAI(model="gpt-4o-mini")

# 初始化搜索工具
tavily_tool_spec = TavilyToolSpec(api_key=os.getenv("TAVILY_API_KEY"))
tavily_tool = tavily_tool_spec.to_tool_list()

# 初始化金融工具
finance_tools = YahooFinanceToolSpec().to_tool_list()

# 组合所有工具
tools = finance_tools + [multiply, add] + tavily_tool

# 创建代理工作流
workflow = AgentWorkflow.from_tools_or_functions(
    tools,
    llm=llm,
    system_prompt="你是一个能够执行基本数学运算和获取股票信息的代理。"
)

# 运行代理
async def main():
    response = await workflow.run(user_msg="NVIDIA当前的股价是多少?")
    print(response)

if __name__ == "__main__":
    asyncio.run(main())

目前无法获取NVIDIA的股价信息。请稍后再试或检查其他来源。


3. 状态跟踪与维护
在实际应用中，我们经常需要跟踪代理的状态。LlamaIndex提供了Context类来维护状态：

```python
from llama_index.core.workflow import Context, JsonSerializer

# 创建上下文对象
ctx = Context(workflow) 

async def main():
    # 第一次运行，设置状态
    response = await workflow.run(user_msg="你好，我叫小明!", ctx=ctx)
    print(response)

    # 第二次运行，利用之前的状态
    response2 = await workflow.run(user_msg="我叫什么名字?", ctx=ctx)
    print(response2)

    # 可以将上下文转换为字典并保存
    ctx_dict = ctx.to_dict(serializer=JsonSerializer())
    
    # 之后可以从字典恢复上下文
    restored_ctx = Context.from_dict(workflow, ctx_dict, serializer=JsonSerializer())
    
    # 使用恢复的上下文继续对话
    response3 = await workflow.run(user_msg="我叫什么名字?", ctx=restored_ctx)
    print(response3)
```

In [6]:
# 案例3：状态跟踪与维护
import os
import json
import asyncio
from dotenv import load_dotenv
from llama_index.llms.openai import OpenAI
from llama_index.core.agent.workflow import AgentWorkflow
from llama_index.core.workflow import Context, JsonSerializer

# 加载环境变量
load_dotenv()

# 初始化LLM
llm = OpenAI(model="gpt-4o-mini")

# 创建工作流
workflow = AgentWorkflow.from_tools_or_functions(
    [],  # 这里可以不需要工具，因为我们只是测试状态管理
    llm=llm,
    system_prompt="你是一个能够记住用户信息的助手。",
    initial_state={"user_info": {}}
)

async def test_state_management():
    # 创建上下文对象
    ctx = Context(workflow)
    
    # 第一次运行，设置状态
    response = await workflow.run(user_msg="你好，我叫小明!", ctx=ctx)
    print("第一次回复:", response)

    # 第二次运行，利用之前的状态
    response2 = await workflow.run(user_msg="我叫什么名字?", ctx=ctx)
    print("第二次回复:", response2)

    # 将上下文转换为字典并保存
    ctx_dict = ctx.to_dict(serializer=JsonSerializer())
    
    # 保存状态到文件
    with open("workflow_state.json", "w", encoding="utf-8") as f:
        json.dump(ctx_dict, f, ensure_ascii=False, indent=2)
    
    # 从字典恢复上下文
    restored_ctx = Context.from_dict(workflow, ctx_dict, serializer=JsonSerializer())
    
    # 使用恢复的上下文继续对话
    response3 = await workflow.run(user_msg="我叫什么名字?", ctx=restored_ctx)
    print("恢复状态后回复:", response3)

# 运行测试
if __name__ == "__main__":
    asyncio.run(test_state_management())

第一次回复: 你好，小明！很高兴认识你！有什么我可以帮助你的吗？
第二次回复: 你叫小明！如果你有其他问题或者需要帮助，随时告诉我！
恢复状态后回复: 你叫小明！如果你有其他问题或者需要帮助，随时告诉我！


# 案例4：自定义工具访问上下文变量

```python
from llama_index.core.workflow import Context

async def set_name(ctx: Context, name: str) -> str:
    """设置用户名称的工具"""
    state = await ctx.get("state")  # 获取当前状态
    state["name"] = name  # 在状态中设置名称
    await ctx.set("state", state)  # 更新状态
    return f"Name set to {name}"

# 创建带有初始状态的工作流
workflow = AgentWorkflow.from_tools_or_functions(
    [set_name],
    llm=llm,
    system_prompt="你是一个能够设置和记住名字的助手。",
    initial_state={"name": "未设置"}  # 设置初始状态
)

async def main():
    ctx = Context(workflow)
    
    # 检查名字是否已设置
    response = await workflow.run(user_msg="我的名字是什么?", ctx=ctx)
    print(response)
    
    # 设置名字
    response2 = await workflow.run(user_msg="我的名字是张三", ctx=ctx)
    print(response2)
    
    # 直接从状态中获取名字
    state = await ctx.get("state")
    print(f"状态中存储的名字: {state['name']}")

    # 将状态持久化保存
    import json
    with open("user_state.json", "w", encoding="utf-8") as f:
        json.dump(ctx.to_dict(serializer=JsonSerializer()), f, ensure_ascii=False)

```

In [8]:
# 案例4：自定义工具访问上下文变量
import os
import json
import asyncio
from dotenv import load_dotenv
from datetime import datetime
from llama_index.llms.openai import OpenAI
from llama_index.core.agent.workflow import AgentWorkflow
from llama_index.core.workflow import Context

# 加载环境变量
load_dotenv()

# 初始化LLM
llm = OpenAI(model="gpt-4o-mini")

# 定义访问上下文的工具
async def set_user_info(ctx: Context, name: str, age: int) -> str:
    """设置用户信息的工具"""
    state = await ctx.get("state")
    state["user_info"] = {
        "name": name,
        "age": age,
        "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    }
    await ctx.set("state", state)
    return f"已设置用户信息：姓名={name}, 年龄={age}"

async def get_user_info(ctx: Context) -> str:
    """获取用户信息的工具"""
    state = await ctx.get("state")
    if "user_info" in state:
        info = state["user_info"]
        return f"用户信息：姓名={info['name']}, 年龄={info['age']}, 创建时间={info['created_at']}"
    return "未找到用户信息"

# 创建工作流
workflow = AgentWorkflow.from_tools_or_functions(
    [set_user_info, get_user_info],
    llm=llm,
    system_prompt="你是一个能够管理用户信息的助手。",
    initial_state={"user_info": None}
)

async def test_context_tools():
    ctx = Context(workflow)
    
    # 设置用户信息
    response = await workflow.run(
        user_msg="请记录我的信息：我叫张三，今年25岁",
        ctx=ctx
    )
    print("设置信息回复:", response)
    
    # 查询用户信息
    response2 = await workflow.run(
        user_msg="请告诉我我的个人信息",
        ctx=ctx
    )
    print("查询信息回复:", response2)

# 运行测试
if __name__ == "__main__":
    asyncio.run(test_context_tools())

设置信息回复: 您的信息已成功记录：姓名是张三，年龄是25岁。


Task exception was never retrieved
future: <Task finished name='Task-172' coro=<Workflow.run.<locals>._run_workflow() done, defined at d:\Anacanda3\envs\pytorch_cuda12_0_py310\lib\site-packages\llama_index\core\workflow\workflow.py:343> exception=InvalidStateError('invalid state')>
Traceback (most recent call last):
  File "d:\Anacanda3\envs\pytorch_cuda12_0_py310\lib\site-packages\llama_index\core\workflow\workflow.py", line 403, in _run_workflow
    result.set_result(ctx._retval)
asyncio.exceptions.InvalidStateError: invalid state

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "d:\Anacanda3\envs\pytorch_cuda12_0_py310\lib\asyncio\tasks.py", line 232, in __step
    result = coro.send(None)
  File "d:\Anacanda3\envs\pytorch_cuda12_0_py310\lib\site-packages\llama_index\core\workflow\workflow.py", line 405, in _run_workflow
    result.set_exception(e)
asyncio.exceptions.InvalidStateError: invalid state


查询信息回复: 您的个人信息是：姓名张三，年龄25岁，创建时间为2025年4月21日 18:17:46。


# 案例5：流式输出和事件处理

在实际应用中，尤其是长时间运行的任务，用户需要实时反馈。LlamaIndex提供了流式处理功能：

```python
from llama_index.core.agent.workflow import (
    AgentInput, AgentOutput, ToolCall, ToolCallResult, AgentStream
)

async def main():
    # 创建一个流式处理的运行句柄
    handler = workflow.run(user_msg="查询上海明天的天气")
    
    # 处理流式输出
    async for event in handler.stream_events():
        if isinstance(event, AgentStream):
            # 显示增量输出
            print(event.delta, end="", flush=True)
        elif isinstance(event, AgentInput):
            # 显示输入信息
            print(f"\n代理输入: {event.input}")
            print(f"当前代理: {event.current_agent_name}")
        elif isinstance(event, AgentOutput):
            # 显示输出信息
            print(f"\n代理输出: {event.response}")
            print(f"工具调用: {event.tool_calls}")
        elif isinstance(event, ToolCallResult):
            # 显示工具调用结果
            print(f"\n调用工具: {event.tool_name}")
            print(f"工具参数: {event.tool_kwargs}")
            print(f"工具输出: {event.tool_output}")
    
    # 获取最终输出
    print(f"\n最终回答: {await handler}")
```


In [9]:
import os
import asyncio
from dotenv import load_dotenv
from llama_index.llms.openai import OpenAI
from llama_index.core.agent.workflow import AgentWorkflow
from llama_index.core.agent.workflow import (
    AgentInput, AgentOutput, ToolCall, ToolCallResult, AgentStream
)
from llama_index.tools.tavily_research import TavilyToolSpec

# 加载环境变量
load_dotenv()

# 初始化LLM和搜索工具
llm = OpenAI(model="gpt-4o-mini")
tavily_tool_spec = TavilyToolSpec(api_key=os.getenv("TAVILY_API_KEY"))
tavily_tool = tavily_tool_spec.to_tool_list()

# 创建工作流
workflow = AgentWorkflow.from_tools_or_functions(
    tavily_tool,
    llm=llm,
    system_prompt="你是一个能够搜索和总结信息的助手。"
)

async def stream_handler():
    # 创建一个流式处理的运行句柄
    handler = workflow.run(user_msg="请帮我查询最新的人工智能发展趋势")
    
    # 处理流式输出
    print("开始处理查询...\n")
    async for event in handler.stream_events():
        if isinstance(event, AgentStream):
            # 显示增量输出
            print(event.delta, end="", flush=True)
        elif isinstance(event, AgentInput):
            # 显示输入信息
            print(f"\n\n代理输入: {event.input}")
            print(f"当前代理: {event.current_agent_name}")
        elif isinstance(event, AgentOutput):
            # 显示输出信息
            print(f"\n\n代理输出: {event.response}")
            if event.tool_calls:
                print(f"工具调用: {event.tool_calls}")
        elif isinstance(event, ToolCallResult):
            # 显示工具调用结果
            print(f"\n\n调用工具: {event.tool_name}")
            print(f"工具参数: {event.tool_kwargs}")
            print(f"工具输出: {event.tool_output}")
    
    # 获取最终输出
    final_response = await handler
    print(f"\n\n最终回答: {final_response}")

# 运行测试
if __name__ == "__main__":
    asyncio.run(stream_handler())

开始处理查询...



代理输入: [ChatMessage(role=<MessageRole.SYSTEM: 'system'>, additional_kwargs={}, blocks=[TextBlock(block_type='text', text='你是一个能够搜索和总结信息的助手。')]), ChatMessage(role=<MessageRole.USER: 'user'>, additional_kwargs={}, blocks=[TextBlock(block_type='text', text='请帮我查询最新的人工智能发展趋势')])]
当前代理: Agent


代理输出: assistant: None
工具调用: [ToolSelection(tool_id='call_GQsMbWNpTzPSQnvjnDL68Hav', tool_name='search', tool_kwargs={'query': '最新的人工智能发展趋势', 'max_results': 5})]


调用工具: search
工具参数: {'query': '最新的人工智能发展趋势', 'max_results': 5}
工具输出: [Document(id_='013b4033-26c4-470e-b89d-1fe9cb71e344', embedding=None, metadata={'url': 'https://wallstreetcn.com/articles/3738684'}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, metadata_template='{key}: {value}', metadata_separator='\n', text_resource=MediaResource(embeddings=None, data=None, text='Published Time: 2025-01-08T11:26:53.000Z 一文读懂2025年十大AI技术趋势 - 华尔街见闻 首页 资讯 快讯 行情 日历 APP VIP会员 大师课 生活家 登录 / 注册 收藏 一文读懂2025年十大AI技术趋势

# 案例6：人类在环中（Human-in-the-loop）

某些情况下，我们需要人类干预来确认关键决策。LlamaIndex支持在工作流中加入人类确认步骤：

```python
from llama_index.core.workflow import (
    InputRequiredEvent, HumanResponseEvent
)

# 创建一个需要人类确认的危险操作工具
async def dangerous_task(ctx: Context) -> str:
    """执行需要人类确认的危险任务"""
    
    # 发送需要用户输入的事件
    ctx.write_event_to_stream(
        InputRequiredEvent(
            prefix="您确定要执行这个危险操作吗？请输入'是'或'否': ",
            user_name="用户"
        )
    )
    
    # 等待人类响应事件
    response = await ctx.wait_for_event(
        HumanResponseEvent, requirements={"user_name": "用户"}
    )
    
    # 基于用户输入决定是否执行任务
    if response.response.strip().lower() in ["是", "yes"]:
        return "危险任务已成功完成。"
    else:
        return "危险任务已取消。"

# 创建工作流
workflow = AgentWorkflow.from_tools_or_functions(
    [dangerous_task],
    llm=llm,
    system_prompt="你是一个可以执行危险任务的助手，但需要先获得用户确认。"
)

async def main():
    # 创建处理程序
    handler = workflow.run(user_msg="执行那个危险任务")
    
    # 处理流式事件
    async for event in handler.stream_events():
        if isinstance(event, InputRequiredEvent):
            # 显示请求确认的消息
            print(event.prefix)
            
            # 获取用户输入
            user_input = input()
            
            # 发送人类响应事件
            handler.add_event(
                HumanResponseEvent(response=user_input, user_name="用户")
            )
        elif isinstance(event, AgentStream):
            print(event.delta, end="", flush=True)
    
    # 获取最终结果
    print(f"\n结果: {await handler}")
```

In [11]:
# 案例6：人类在环中（Human-in-the-loop）
import os
import asyncio
from typing import Optional
from dotenv import load_dotenv
from llama_index.llms.openai import OpenAI
from llama_index.core.agent.workflow import AgentWorkflow
from llama_index.core.workflow import (
    Context, InputRequiredEvent, HumanResponseEvent
)

# 加载环境变量
load_dotenv()

# 初始化LLM
llm = OpenAI(model="gpt-4o-mini")

class OperationConfirmationManager:
    def __init__(self):
        self.pending_response: Optional[str] = None

    def set_response(self, response: str):
        self.pending_response = response

    def get_response(self) -> Optional[str]:
        response = self.pending_response
        self.pending_response = None
        return response

# 创建一个全局的确认管理器
confirmation_manager = OperationConfirmationManager()

# 定义需要人类确认的危险操作工具
async def dangerous_task(ctx: Context, operation: str) -> str:
    """执行需要人类确认的危险任务"""
    
    # 发送需要用户输入的事件
    ctx.write_event_to_stream(
        InputRequiredEvent(
            prefix=f"警告：您正准备执行以下操作：\n{operation}\n\n这是一个危险操作，需要人工确认。\n请输入'确认'以继续，或输入其他内容取消: ",
            user_name="操作员"
        )
    )
    
    # 等待用户输入
    while True:
        response = confirmation_manager.get_response()
        if response is not None:
            break
        await asyncio.sleep(0.1)
    
    # 基于用户输入决定是否执行任务
    if response.strip() == "确认":
        # 在实际应用中，这里会执行真正的操作
        return f"已执行操作：{operation}"
    else:
        return f"操作已取消：{operation}"

async def backup_database(ctx: Context) -> str:
    """备份数据库的工具"""
    return await dangerous_task(ctx, "备份生产数据库")

async def deploy_production(ctx: Context) -> str:
    """部署到生产环境的工具"""
    return await dangerous_task(ctx, "部署代码到生产环境")

# 创建工作流
workflow = AgentWorkflow.from_tools_or_functions(
    [backup_database, deploy_production],
    llm=llm,
    system_prompt="你是一个运维助手，可以执行各种运维操作，但需要人工确认危险操作。"
)

async def test_human_in_loop():
    ctx = Context(workflow)
    
    print("=== 测试人类在环中的工作流 ===\n")
    
    # 测试数据库备份
    print("测试数据库备份操作...")
    handler = workflow.run(
        user_msg="请帮我备份生产数据库",
        ctx=ctx
    )
    
    async for event in handler.stream_events():
        if isinstance(event, InputRequiredEvent):
            print(event.prefix)
            user_input = input()
            confirmation_manager.set_response(user_input)
        elif isinstance(event, AgentStream):
            print(event.delta, end="", flush=True)
    
    result = await handler
    print(f"\n执行结果: {result}\n")
    
    print("\n=== 测试生产部署 ===\n")
    
    # 测试生产部署
    print("测试生产部署操作...")
    handler = workflow.run(
        user_msg="请部署最新代码到生产环境",
        ctx=ctx
    )
    
    async for event in handler.stream_events():
        if isinstance(event, InputRequiredEvent):
            print(event.prefix)
            user_input = input()
            confirmation_manager.set_response(user_input)
        elif isinstance(event, AgentStream):
            print(event.delta, end="", flush=True)
    
    result = await handler
    print(f"\n执行结果: {result}")

# 运行测试
if __name__ == "__main__":
    try:
        asyncio.run(test_human_in_loop())
    except KeyboardInterrupt:
        print("\n程序被用户中断")
    except Exception as e:
        print(f"\n发生错误: {str(e)}")

=== 测试人类在环中的工作流 ===

测试数据库备份操作...
警告：您正准备执行以下操作：
备份生产数据库

这是一个危险操作，需要人工确认。
请输入'确认'以继续，或输入其他内容取消: 
生产数据库已成功备份。
执行结果: 生产数据库已成功备份。


=== 测试生产部署 ===

测试生产部署操作...
部署到生产环境是一个危险操作，请确认您要继续进行此操作。请回复“确认”以继续。
执行结果: 部署到生产环境是一个危险操作，请确认您要继续进行此操作。请回复“确认”以继续。


# 案例7：多Agent系统

对于复杂任务，我们可以构建多个专业Agent协同工作的系统：

```python
from llama_index.llms.openai import OpenAI
from llama_index.core.agent.workflow import AgentWorkflow
from llama_index.core.workflow import Workflow, StepBuilder
from typing import Dict, Any

# 创建专门的分析师Agent
async def create_analyst_agent():
    analyst_tools = YahooFinanceToolSpec().to_tool_list()
    return AgentWorkflow.from_tools_or_functions(
        analyst_tools,
        llm=OpenAI(model="gpt-4o-mini"),
        system_prompt="你是一个金融分析专家，擅长分析股票数据和市场趋势。"
    )

# 创建专门的研究Agent
async def create_researcher_agent():
    research_tools = TavilyToolSpec(api_key=os.getenv("TAVILY_API_KEY")).to_tool_list()
    return AgentWorkflow.from_tools_or_functions(
        research_tools,
        llm=OpenAI(model="gpt-4o-mini"),
        system_prompt="你是一个研究专家，擅长查找和总结最新的新闻和信息。"
    )

# 创建专门的报告生成Agent
async def create_report_agent():
    return AgentWorkflow.from_tools_or_functions(
        [],
        llm=OpenAI(model="gpt-4o-mini"),
        system_prompt="你是一个报告撰写专家，擅长整合信息并生成结构化的报告。"
    )

# 构建多Agent工作流
async def build_multi_agent_workflow():
    # 创建各个专业Agent
    analyst = await create_analyst_agent()
    researcher = await create_researcher_agent()
    report_generator = await create_report_agent()
    
    # 定义工作流步骤构建器
    step_builder = StepBuilder()
    
    # 添加股票分析步骤
    async def analyze_stock(ctx: Context, stock_symbol: str) -> Dict[str, Any]:
        analysis = await analyst.run(
            user_msg=f"分析股票 {stock_symbol} 的当前数据，包括价格、市值和主要财务指标。",
            ctx=ctx
        )
        return {"analysis": str(analysis)}
    
    # 添加新闻研究步骤
    async def research_news(ctx: Context, stock_symbol: str) -> Dict[str, Any]:
        news = await researcher.run(
            user_msg=f"查找关于 {stock_symbol} 公司最近一周的重要新闻和事件。",
            ctx=ctx
        )
        return {"news": str(news)}
    
    # 添加报告生成步骤
    async def generate_report(ctx: Context, analysis: str, news: str) -> str:
        report = await report_generator.run(
            user_msg=f"基于以下信息生成一份综合投资报告：\n\n股票分析:\n{analysis}\n\n相关新闻:\n{news}",
            ctx=ctx
        )
        return str(report)
    
    # 构建工作流
    workflow = Workflow()
    
    # 定义工作流步骤
    analyze = step_builder.add_step(analyze_stock)
    research = step_builder.add_step(research_news)
    report = step_builder.add_step(
        generate_report,
        analysis=step_builder.get_value(analyze, "analysis"),
        news=step_builder.get_value(research, "news")
    )
    
    # 设置工作流入口和出口
    workflow.set_entry_point(analyze)
    workflow.set_output(report)
    
    return workflow

# 运行多Agent工作流
async def main():
    workflow = await build_multi_agent_workflow()
    result = await workflow.run(stock_symbol="NVDA")
    print(result)
```

In [13]:
# 案例7：多Agent系统
import os
import asyncio
from datetime import datetime
from typing import Dict, Any
from dotenv import load_dotenv
from llama_index.llms.openai import OpenAI
from llama_index.core.agent.workflow import AgentWorkflow
from llama_index.core.workflow import Workflow, Context
from llama_index.tools.yahoo_finance import YahooFinanceToolSpec
from llama_index.tools.tavily_research import TavilyToolSpec

# 加载环境变量
load_dotenv()

class StockAnalysisSystem:
    def __init__(self):
        self.llm = OpenAI(model="gpt-4o-mini")
        
    async def create_analyst_agent(self):
        """创建金融分析师Agent"""
        analyst_tools = YahooFinanceToolSpec().to_tool_list()
        return AgentWorkflow.from_tools_or_functions(
            analyst_tools,
            llm=self.llm,
            system_prompt="你是一个金融分析专家，擅长分析股票数据和市场趋势。"
        )

    async def create_researcher_agent(self):
        """创建市场研究员Agent"""
        research_tools = TavilyToolSpec(api_key=os.getenv("TAVILY_API_KEY")).to_tool_list()
        return AgentWorkflow.from_tools_or_functions(
            research_tools,
            llm=self.llm,
            system_prompt="你是一个研究专家，擅长查找和总结最新的新闻和信息。"
        )

    async def create_report_agent(self):
        """创建报告撰写Agent"""
        return AgentWorkflow.from_tools_or_functions(
            [],  # 这个Agent不需要特殊工具，主要负责整合信息
            llm=self.llm,
            system_prompt="你是一个报告撰写专家，擅长整合信息并生成结构化的投资报告。"
        )

    async def analyze_stock(self, stock_symbol: str) -> Dict[str, Any]:
        """执行股票分析"""
        try:
            # 创建分析师Agent
            analyst = await self.create_analyst_agent()
            ctx = Context(analyst)
            
            # 获取股票分析
            analysis = await analyst.run(
                user_msg=f"请对{stock_symbol}股票进行深入分析，包括：\n"
                        f"1. 当前股价和市值\n"
                        f"2. 主要财务指标\n"
                        f"3. 技术面分析\n"
                        f"4. 估值水平评估",
                ctx=ctx
            )
            
            return {"analysis": str(analysis)}
        except Exception as e:
            print(f"股票分析出错: {str(e)}")
            return {"analysis": f"分析失败: {str(e)}"}

    async def research_market(self, stock_symbol: str) -> Dict[str, Any]:
        """执行市场研究"""
        try:
            # 创建研究员Agent
            researcher = await self.create_researcher_agent()
            ctx = Context(researcher)
            
            # 获取市场研究
            research = await researcher.run(
                user_msg=f"请收集和分析{stock_symbol}的以下信息：\n"
                        f"1. 最近一周的重要新闻\n"
                        f"2. 行业发展动态\n"
                        f"3. 竞争对手动态\n"
                        f"4. 市场分析师评价",
                ctx=ctx
            )
            
            return {"market_research": str(research)}
        except Exception as e:
            print(f"市场研究出错: {str(e)}")
            return {"market_research": f"研究失败: {str(e)}"}

    async def generate_final_report(
        self, 
        stock_symbol: str,
        analysis: str,
        market_research: str
    ) -> str:
        """生成最终报告"""
        try:
            # 创建报告生成Agent
            report_generator = await self.create_report_agent()
            ctx = Context(report_generator)
            
            # 生成报告
            report = await report_generator.run(
                user_msg=f"""
                请基于以下信息，生成一份完整的投资分析报告：

                === 基本信息 ===
                股票代码：{stock_symbol}
                分析时间：{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

                === 量化分析 ===
                {analysis}

                === 市场研究 ===
                {market_research}

                请按照以下结构组织报告：
                1. 执行摘要
                2. 公司基本面分析
                3. 财务分析
                4. 技术面分析
                5. 行业与竞争分析
                6. 风险因素
                7. 投资建议
                """,
                ctx=ctx
            )
            
            return str(report)
        except Exception as e:
            print(f"报告生成出错: {str(e)}")
            return f"报告生成失败: {str(e)}"

    async def run_analysis(self, stock_symbol: str) -> str:
        """运行完整的分析流程"""
        print(f"\n开始分析股票 {stock_symbol}...")
        
        print("\n1. 执行量化分析...")
        analysis_result = await self.analyze_stock(stock_symbol)
        
        print("\n2. 进行市场研究...")
        research_result = await self.research_market(stock_symbol)
        
        print("\n3. 生成综合报告...")
        final_report = await self.generate_final_report(
            stock_symbol,
            analysis_result["analysis"],
            research_result["market_research"]
        )
        
        return final_report

async def test_stock_analysis():
    # 创建股票分析系统
    system = StockAnalysisSystem()
    
    try:
        # 分析特斯拉股票
        stock_symbol = "TSLA"
        print(f"\n=== 开始分析 {stock_symbol} ===")
        
        # 运行分析
        report = await system.run_analysis(stock_symbol)
        
        # 保存报告
        report_file = f"stock_analysis_{stock_symbol}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
        with open(report_file, "w", encoding="utf-8") as f:
            f.write(report)
        
        print(f"\n分析报告已生成并保存到 {report_file}")
        print("\n报告预览:")
        print("=" * 50)
        print(report[:1000] + "...\n")  # 只显示前1000个字符
        print("=" * 50)
        
    except Exception as e:
        print(f"\n分析过程中发生错误: {str(e)}")

# 运行测试
if __name__ == "__main__":
    try:
        asyncio.run(test_stock_analysis())
    except KeyboardInterrupt:
        print("\n程序被用户中断")
    except Exception as e:
        print(f"\n程序运行出错: {str(e)}")


=== 开始分析 TSLA ===

开始分析股票 TSLA...

1. 执行量化分析...

2. 进行市场研究...

3. 生成综合报告...

分析报告已生成并保存到 stock_analysis_TSLA_20250421_183542.txt

报告预览:
# 特斯拉（TSLA）投资分析报告

**分析时间**：2025-04-21

---

## 1. 执行摘要

特斯拉（TSLA）在电动车市场的领导地位面临着日益激烈的竞争，尤其是来自BYD、福特和现代等公司的挑战。尽管公司在AI无人驾驶技术方面的投资可能为其未来增长提供动力，但近期股价的显著下跌（自历史高点下跌50%）和品牌形象的受损使得投资者对其未来表现持谨慎态度。市场分析师普遍将TSLA评级为“持有”，并预计其股价有一定的上涨空间。

---

## 2. 公司基本面分析

特斯拉作为全球电动车市场的先锋，致力于推动可持续交通的发展。公司在电动车、能源存储和太阳能产品方面的创新使其在行业中保持领先。然而，随着竞争对手的崛起，特斯拉的市场份额受到威胁，尤其是在加州市场。

---

## 3. 财务分析

目前无法获取TSLA的最新财务数据，包括收入、净利润、毛利率、净利率和每股收益（EPS）。建议投资者关注即将发布的财报，以评估公司的财务健康状况和盈利能力。

---

## 4. 技术面分析

由于缺乏最新的股价数据，无法进行具体的技术面分析。然而，投资者可以通过以下方法进行分析：
- **趋势线**：绘制股价趋势线以判断当前趋势。
- **技术指标**：使用移动平均线（MA）、相对强弱指数（RSI）和MACD等指标进行分析。
- **支撑和阻力位**：识别关键的支撑和阻力水平，以制定交易策略。

---

## 5. 行业与竞争分析

### 行业发展动态
- 电动车市场竞争加剧，特斯拉在加州的市场份额受到现代、起亚和福特等公司的侵蚀。
- BYD推出的新型电池充电技术可能会进一步影响特斯拉的市场地位。

### 竞争对手动态
- BYD在电动车销售上超越特斯拉，并推出了与特斯拉的全自动驾驶功能相媲美的“上帝之眼”驾驶辅助系统。
- 福特、现代和本田等品牌在加州的电动车销售均有显著增长，显示出特斯拉的市场统治地位正在减弱。

---

## 6. 风险因素

- **市场竞争加剧**：

实际案例：构建一个信息助手
下面我们将构建一个简单的信息助手，它可以回答问题并自动搜索网络获取最新信息：

In [3]:
import nest_asyncio
nest_asyncio.apply()

import os
import asyncio
from dotenv import load_dotenv
from llama_index.llms.openai import OpenAI
from llama_index.core.agent.workflow import AgentWorkflow
from llama_index.tools.tavily_research import TavilyToolSpec
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader

# 加载环境变量
load_dotenv()

# 初始化LLM
llm = OpenAI(model="gpt-4o-mini")

# 加载本地知识库
documents = SimpleDirectoryReader(input_dir="./data/llamaindex_data/").load_data()
knowledge_index = VectorStoreIndex.from_documents(documents)
knowledge_engine = knowledge_index.as_query_engine()

# 将查询引擎转换为工具函数
def query_knowledge_base(query: str) -> str:
    """查询本地知识库获取信息"""
    response = knowledge_engine.query(query)
    return str(response)

# 初始化网络搜索工具
tavily_tool_spec = TavilyToolSpec(api_key=os.getenv("TAVILY_API_KEY"))
search_tools = tavily_tool_spec.to_tool_list()

# 组合所有工具
tools = [query_knowledge_base] + search_tools

# 创建代理工作流
workflow = AgentWorkflow.from_tools_or_functions(
    tools,
    llm=llm,
    system_prompt="你是一个智能助手，可以回答用户的问题。对于存在于本地知识库的信息，优先使用query_knowledge_base工具；对于需要最新信息的问题，使用搜索工具查询。"
)

# 运行代理
async def main():
    while True:
        user_query = input("请输入您的问题(输入'退出'结束): ")
        if user_query.lower() == '退出':
            break
            
        response = await workflow.run(user_msg=user_query)
        print(f"回答: {response}")

if __name__ == "__main__":
    asyncio.run(main())

回答: DeepSeek V3 共有 6710 亿个参数，其中每个 token 激活了 370 亿个参数。
回答: 今天英伟达（NVIDIA，股票代码：NVDA）的股价为129.84美元，较前一交易日上涨了0.90%。在过去24小时内，股价波动范围为127.60至130.37美元。当前市值约为3.18万亿美元。

如果你想查看更详细的股市信息，可以访问 [TradingView](https://cn.tradingview.com/symbols/NASDAQ-NVDA/) 或 [Yahoo Finance](https://hk.finance.yahoo.com/quote/NVDA/)。
