# Examples of Structured Data Extraction in LlamaIndex

<a href="https://colab.research.google.com/github/run-llama/llama_index/blob/main/docs/docs/examples/structured_outputs/structured_outputs.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

If you haven't yet read our [structured data extraction tutorial](../../understanding/extraction/index.md), we recommend starting there. This notebook demonstrates some of the techniques introduced in the tutorial.

We start with the simple syntax for calling LLMs directly, then move on to using structured outputs with higher-level modules such as a query engine and an agent.

A lot of the underlying behavior around structured outputs is powered by our Pydantic Program modules. Check out our [in-depth structured outputs guide](https://docs.llamaindex.ai/en/stable/module_guides/querying/structured_outputs/) for more details.

In [None]:
import nest_asyncio

nest_asyncio.apply()

In [None]:
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core import Settings
from dotenv import load_dotenv
import os

# Load environment variables from the .env file
load_dotenv(
    dotenv_path="/Users/wingzheng/Desktop/github/ParseDoc/llama_index/.env",
    override=True,
)

# Read the endpoint and API key from environment variables
api_endpoint = os.getenv("BLENDAPI_API_ENDPOINT")
api_key = os.getenv("BLENDAPI_API_KEY")

print(f"\nDEBUG--BLENDAPI_API_ENDPOINT is: {api_endpoint}")
# Report only whether the key is set; never print the secret itself
print(f"DEBUG--BLENDAPI_API_KEY is set: {bool(api_key)}")

# Initialize the OpenAI client with the custom endpoint and key
llm = OpenAI(
    model="gpt-4.1",  # same model as in the curl command
    api_base=api_endpoint,  # custom endpoint
    api_key=api_key,  # custom key
)

# llm = OpenAI(model="gpt-4o")
embed_model = OpenAIEmbedding(model="text-embedding-3-small")
Settings.llm = llm
Settings.embed_model = embed_model


DEBUG--BLENDAPI_API_ENDPOINT is: https://api.blendapi.com/v1
DEBUG--BLENDAPI_API_KEY is set: True


## 1. Simple Structured Extraction

You can convert any LLM to a "structured LLM" by attaching an output class to it through `as_structured_llm`.

Here we pass a simple `Album` class which contains a list of songs. We can then use the normal LLM endpoints like chat/complete.

**NOTE**: sync, async, streaming, and async streaming calls are all shown below.

In [None]:
from typing import List
from pydantic import BaseModel, Field


class Song(BaseModel):
    """Data model for a song."""

    title: str
    length_seconds: int


class Album(BaseModel):
    """Data model for an album."""

    name: str
    artist: str
    songs: List[Song]
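
Before attaching the class to an LLM, it can help to look at the model on its own. The sketch below redefines the same `Song` and `Album` models, prints the field names from the JSON schema Pydantic derives, and validates a hand-written JSON string. The `raw_json` payload is made up for illustration, and the code assumes Pydantic v2 (`model_json_schema`, `model_validate_json`).

```python
from typing import List

from pydantic import BaseModel, ValidationError


class Song(BaseModel):
    """Data model for a song."""

    title: str
    length_seconds: int


class Album(BaseModel):
    """Data model for an album."""

    name: str
    artist: str
    songs: List[Song]


# The schema lists the fields a structured output must contain.
schema = Album.model_json_schema()
print(sorted(schema["properties"]))

# Hand-written JSON standing in for an LLM reply (illustrative only).
raw_json = (
    '{"name": "Demo", "artist": "Nobody", '
    '"songs": [{"title": "Intro", "length_seconds": 60}]}'
)
album = Album.model_validate_json(raw_json)
print(album.songs[0].title, album.songs[0].length_seconds)

# A payload missing required fields is rejected, not silently accepted.
try:
    Album.model_validate_json('{"name": "Demo"}')
    rejected = False
except ValidationError:
    rejected = True
print(rejected)
```

The same validation step is what turns an LLM's JSON reply into a typed object you can work with in the rest of the notebook.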

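Wrapping user input in a `ChatMessage` object is useful for several reasons.

**Unified interface.** LlamaIndex chat methods such as `sllm.chat` take a list of `ChatMessage` objects. This standard format lets the framework handle different kinds of input (text, images, multi-turn conversations) consistently. For example, `sllm.chat([input_msg])` expects a list of `ChatMessage` objects; passing a bare string raises an error.

**Multi-turn conversations.** `ChatMessage` supports multi-turn scenarios: pass the conversation history as a list `[msg1, msg2, ...]`. For example: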

messages = [
    ChatMessage.from_str("What's The Shining about?", role=MessageRole.USER),
    ChatMessage.from_str("Generate an example album from The Shining", role=MessageRole.USER)
]

output = sllm.chat(messages)

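In the example above, the LLM can use the conversation history to generate a more fitting album.

**Multimodal input.** The `blocks` field of `ChatMessage` can hold text, images, and other content. For example: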

input_msg = ChatMessage(
    role=MessageRole.USER,
    blocks=[
        TextBlock(text="Generate an album inspired by this image"),
        ImageBlock(image=base64_image)
    ]
)

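This design leaves room for inputs richer than plain text.

**Connection to structured output.** In the overall flow, `input_msg` is the bridge from user input to the structured output (an `Album` object). The standard `ChatMessage` format ensures the LLM receives the prompt in the expected shape and can produce output that matches the `Album` structure.

**Summary: what the wrapping does and why it matters**

What it does: `ChatMessage.from_str` wraps the user's input string in a `ChatMessage` object that follows the LlamaIndex message format, with a role (`USER`), content (a `TextBlock`), and an extension field (`additional_kwargs`).

Why it matters:

- It provides a standard input interface that is compatible with the LlamaIndex framework.
- It supports multi-turn conversations and multimodal input, leaving room for more complex scenarios.
- It simplifies debugging (through `__str__` and `content`) and serialization (through `model_dump`).
- It connects user input to structured output (such as `Album`) and drives the whole flow.

**The flow, described in text**

Key points, one per step:

1. Define the data model so that the output is structured; Pydantic provides type safety.
2. `sllm` bridges the LLM and the `Album` structure and is dedicated to producing structured output.
3. `ChatMessage` provides a unified interface, supports multi-turn and multimodal input, and leads to the `assistant:` format.
4. `sllm.chat` combines `sllm` and `input_msg` to produce a `ChatResponse` that contains both the `Album` instance and its JSON.
5. `output.raw` provides the strongly typed `Album` object.
6. Show the chat format (`assistant: {...}`) to verify the JSON output.
7. Show the Pydantic object, which is convenient for further manipulation and validation.

**How the pieces coordinate.** `sllm` ensures the LLM output is parsed into an `Album` instance (stored in `output.raw`). `sllm.chat` wraps the `Album` in a `ChatResponse`, whose `__str__` method serializes it to JSON with an `assistant:` prefix. In short, `sllm` defines the `Album` structure, and the LlamaIndex chat interface determines the final output format.

**Output flow.**

1. The LLM generates raw output (JSON or text).
2. `sllm` parses it into an `Album` instance (`output.raw`).
3. `ChatResponse` serializes the `Album` to JSON and adds the `assistant:` prefix.

**Core mechanism.** `ChatMessage` triggers chat mode and `sllm` ensures structured parsing; together they produce both `assistant: {...}` and the Pydantic object. `sllm.chat` first parses the reply into an `Album`, and `ChatResponse` then converts it to JSON.

**What generation depends on.** You need `sllm` (which defines the `Album` structure) and `input_msg` (the prompt in `ChatMessage` format). Calling `sllm.chat` with them produces a `ChatResponse` that contains the `Album` instance and its JSON.

**Output and logic.** `sllm` defines the `Album` structure, `ChatMessage` triggers chat mode, and `sllm.chat` produces the `ChatResponse`. The printed `assistant: {...}` is the chat format; `output.raw` is the `Album` instance.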
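
The two views of the result described above (a role-prefixed JSON string and a typed object in `.raw`) can be imitated without calling an LLM. The sketch below is only an illustration: `FakeChatResponse` and `fake_structured_chat` are hypothetical names, not LlamaIndex classes, and the JSON reply is hand-written. It assumes Pydantic v2.

```python
from dataclasses import dataclass
from typing import List

from pydantic import BaseModel


class Song(BaseModel):
    title: str
    length_seconds: int


class Album(BaseModel):
    name: str
    artist: str
    songs: List[Song]


@dataclass
class FakeChatResponse:
    """Hypothetical stand-in for a chat response; not the LlamaIndex class."""

    role: str
    content: str  # JSON text form of the result
    raw: Album  # the same data, parsed into a typed object

    def __str__(self) -> str:
        return f"{self.role}: {self.content}"


def fake_structured_chat(llm_json: str) -> FakeChatResponse:
    """Parse the model's JSON into an Album, then wrap both forms."""
    album = Album.model_validate_json(llm_json)
    return FakeChatResponse(
        role="assistant", content=album.model_dump_json(), raw=album
    )


# Hand-written reply standing in for real LLM output.
reply = (
    '{"name": "Demo", "artist": "Nobody", '
    '"songs": [{"title": "Intro", "length_seconds": 60}]}'
)
output = fake_structured_chat(reply)
print(str(output))  # role-prefixed JSON string
print(output.raw.name)  # typed attribute access
```

Keeping both forms is a convenience: the string is easy to log or display, while the typed object is what downstream code should use.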

In [None]:
from llama_index.core.llms import ChatMessage

sllm = llm.as_structured_llm(output_cls=Album)
input_msg = ChatMessage.from_str("Generate an example album from The Shining")

#### Sync

In [None]:
output = sllm.chat([input_msg])
# get actual object
output_obj = output.raw

In [None]:
print(str(output))
print(output_obj)

assistant: {"name":"Reflections in the Dark","artist":"The Shining","songs":[{"title":"Midnight Echoes","length_seconds":245},{"title":"Haunted Corridors","length_seconds":210},{"title":"Room 237","length_seconds":198},{"title":"Redrum","length_seconds":230},{"title":"Overlook Waltz","length_seconds":185},{"title":"Frozen Maze","length_seconds":222},{"title":"Typewriter's Lament","length_seconds":204},{"title":"Labyrinthine Mind","length_seconds":250},{"title":"Here's Johnny!","length_seconds":190},{"title":"Final Escape","length_seconds":215}]}
name='Reflections in the Dark' artist='The Shining' songs=[Song(title='Midnight Echoes', length_seconds=245), Song(title='Haunted Corridors', length_seconds=210), Song(title='Room 237', length_seconds=198), Song(title='Redrum', length_seconds=230), Song(title='Overlook Waltz', length_seconds=185), Song(title='Frozen Maze', length_seconds=222), Song(title="Typewriter's Lament", length_seconds=204), Song(title='Labyrinthine Mind', length_seconds=

#### Async

In [None]:
output = await sllm.achat([input_msg])
# get actual object
output_obj = output.raw
print(str(output))

#### Streaming

In [None]:
from IPython.display import clear_output
from pprint import pprint

stream_output = sllm.stream_chat([input_msg])
for partial_output in stream_output:
    clear_output(wait=True)
    pprint(partial_output.raw.model_dump())

# the last partial output holds the complete object
output_obj = partial_output.raw
print(str(partial_output))

#### Async Streaming

In [None]:
from IPython.display import clear_output, display
from pprint import pprint

stream_output = await sllm.astream_chat([input_msg])
last_output = None  # stays None if the stream yields nothing
async for partial_output in stream_output:
    clear_output(wait=True)
    pprint(partial_output.raw.model_dump())  # Pydantic v2 replacement for .dict()
    last_output = partial_output
if last_output:
    display(last_output.raw.model_dump())  # show the final Album as a dict
else:
    print("No output generated")

{'artist': 'The Shining',
 'name': 'Reflections in the Dark',
 'songs': [{'length_seconds': 245, 'title': 'Midnight Echoes'},
           {'length_seconds': 210, 'title': 'Haunted Hallways'},
           {'length_seconds': 198, 'title': 'Room 237'},
           {'length_seconds': 230, 'title': 'Redrum'},
           {'length_seconds': 185, 'title': 'The Overlook Waltz'},
           {'length_seconds': 222, 'title': 'Frozen Maze'},
           {'length_seconds': 250, 'title': "Jack's Descent"},
           {'length_seconds': 205, 'title': "Typewriter's Lament"},
           {'length_seconds': 238, 'title': 'Labyrinthine Shadows'},
           {'length_seconds': 192, 'title': 'Final Glimpse'}]}


{'name': 'Reflections in the Dark',
 'artist': 'The Shining',
 'songs': [{'title': 'Midnight Echoes', 'length_seconds': 245},
  {'title': 'Haunted Hallways', 'length_seconds': 210},
  {'title': 'Room 237', 'length_seconds': 198},
  {'title': 'Redrum', 'length_seconds': 230},
  {'title': 'The Overlook Waltz', 'length_seconds': 185},
  {'title': 'Frozen Maze', 'length_seconds': 222},
  {'title': "Jack's Descent", 'length_seconds': 250},
  {'title': "Typewriter's Lament", 'length_seconds': 205},
  {'title': 'Labyrinthine Shadows', 'length_seconds': 238},
  {'title': 'Final Glimpse', 'length_seconds': 192}]}
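
The async streaming loop above follows a general pattern: each item from the stream is a more complete snapshot, and the last one is the final result. Here is a minimal sketch of that pattern using only the standard library. `fake_partial_albums` is a hypothetical generator that stands in for the real stream, and the album data is made up.

```python
import asyncio
from typing import AsyncIterator, Dict, List, Optional


async def fake_partial_albums() -> AsyncIterator[Dict]:
    """Hypothetical stream: each item is a more complete snapshot."""
    songs: List[Dict] = []
    for title, length in [("Intro", 60), ("Outro", 90)]:
        songs.append({"title": title, "length_seconds": length})
        await asyncio.sleep(0)  # yield control, as a network read would
        yield {"name": "Demo", "artist": "Nobody", "songs": list(songs)}


async def consume() -> Optional[Dict]:
    """Iterate over the stream and keep only the last snapshot."""
    last = None  # stays None if the stream yields nothing
    async for partial in fake_partial_albums():
        last = partial
    return last


final = asyncio.run(consume())
print(len(final["songs"]))
```

Inside a notebook, where an event loop is already running, you would typically write `final = await consume()` instead of calling `asyncio.run`.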

### 1.b Use the `structured_predict` Function

Instead of explicitly calling `llm.as_structured_llm(...)`, you can use the `structured_predict` function that every LLM class provides. It calls the LLM with a prompt template plus template variables and returns a structured output in one line of code.

In [None]:
# build a chat prompt template and call structured_predict
from llama_index.core.prompts import ChatPromptTemplate
from llama_index.core.llms import ChatMessage
from llama_index.llms.openai import OpenAI

from dotenv import load_dotenv
import os

# Load environment variables from the .env file
load_dotenv(
    dotenv_path="/Users/wingzheng/Desktop/github/ParseDoc/llama_index/.env",
    override=True,
)

# Read the endpoint and API key from environment variables
api_endpoint = os.getenv("BLENDAPI_API_ENDPOINT")
api_key = os.getenv("BLENDAPI_API_KEY")

print(f"\nDEBUG--BLENDAPI_API_ENDPOINT is: {api_endpoint}")
# Report only whether the key is set; never print the secret itself
print(f"DEBUG--BLENDAPI_API_KEY is set: {bool(api_key)}")

# Initialize the OpenAI client with the custom endpoint and key
llm = OpenAI(
    model="gpt-4.1",  # same model as in the curl command
    api_base=api_endpoint,  # custom endpoint
    api_key=api_key,  # custom key
)

chat_prompt_tmpl = ChatPromptTemplate(
    message_templates=[
        ChatMessage.from_str(
            "Generate an example album from {movie_name}", role="user"
        )
    ]
)
album = llm.structured_predict(
    Album, chat_prompt_tmpl, movie_name="Lord of the Rings"
)
album


DEBUG--BLENDAPI_API_ENDPOINT is: https://api.blendapi.com/v1
DEBUG--BLENDAPI_API_KEY is set: True


Album(name='Echoes of Middle-earth', artist='Lord of the Rings', songs=[Song(title='The Shire Awakens', length_seconds=210), Song(title='Journey to Rivendell', length_seconds=245), Song(title='Mines of Moria', length_seconds=230), Song(title="Lothlórien's Light", length_seconds=200), Song(title='The Council of Elrond', length_seconds=215), Song(title='Shadow of Mordor', length_seconds=250), Song(title='Riders of Rohan', length_seconds=225), Song(title="Gollum's Lament", length_seconds=180), Song(title='The White City', length_seconds=240), Song(title='Mount Doom', length_seconds=260)])
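
Conceptually, a call like `structured_predict` combines three steps: fill in the template, call the model, and validate the reply against the output class. The sketch below imitates those steps with a stubbed model so it runs offline. `sketch_structured_predict` and `canned_llm` are hypothetical helpers, not part of LlamaIndex, and the real implementation may work differently (for example through function calling). It assumes Pydantic v2.

```python
from typing import Callable, List, Type, TypeVar

from pydantic import BaseModel

T = TypeVar("T", bound=BaseModel)


class Song(BaseModel):
    title: str
    length_seconds: int


class Album(BaseModel):
    name: str
    artist: str
    songs: List[Song]


def sketch_structured_predict(
    output_cls: Type[T],
    template: str,
    call_llm: Callable[[str], str],
    **template_vars: str,
) -> T:
    """Format the prompt, call the model, validate the reply."""
    prompt = template.format(**template_vars)
    return output_cls.model_validate_json(call_llm(prompt))


def canned_llm(prompt: str) -> str:
    """Hypothetical model stub that returns a fixed JSON reply."""
    assert "Lord of the Rings" in prompt
    return '{"name": "Demo", "artist": "Nobody", "songs": []}'


album = sketch_structured_predict(
    Album,
    "Generate an example album from {movie_name}",
    canned_llm,
    movie_name="Lord of the Rings",
)
print(album.name, len(album.songs))
```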

## 2. Plug into RAG Pipeline

You can also plug this into a RAG pipeline. Below we show structured extraction from Apple's 2021 10-K report.

In [None]:
!mkdir -p data
!wget "https://s2.q4cdn.com/470004039/files/doc_financials/2021/q4/_10-K-2021-(As-Filed).pdf" -O data/apple_2021_10k.pdf

--2025-06-04 17:48:44--  https://s2.q4cdn.com/470004039/files/doc_financials/2021/q4/_10-K-2021-(As-Filed).pdf
Resolving s2.q4cdn.com (s2.q4cdn.com)... 199.254.199.61
Connecting to s2.q4cdn.com (s2.q4cdn.com)|199.254.199.61|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 789896 (771K) [application/pdf]
Saving to: ‘data/apple_2021_10k.pdf’


2025-06-04 17:48:45 (1.16 MB/s) - ‘data/apple_2021_10k.pdf’ saved [789896/789896]



#### Option 1: Use LlamaParse

You will need an account at https://cloud.llamaindex.ai/ and an API key to use LlamaParse, our document parser, which we apply here to the 10-K filing.

In [None]:
from llama_parse import LlamaParse
import os
from dotenv import load_dotenv

# Load environment variables from the .env file
load_dotenv(
    dotenv_path="/Users/wingzheng/Desktop/github/ParseDoc/llama_index/.env",
    override=True,
)

# Read the LlamaParse configuration from environment variables
base_url = os.getenv(
    "LLAMA_CLOUD_US_BASE_URL", "https://api.cloud.llamaindex.ai"
)
api_key = os.getenv("LLAMA_CLOUD_API_KEY")

# Validate the configuration
if not base_url or not base_url.startswith(("http://", "https://")):
    raise ValueError(f"Invalid base_url: {base_url}")
if not api_key:
    raise ValueError("API key is missing")

# Print debug information (the API key itself is deliberately not printed)
print(f"\nDEBUG--LLAMA_CLOUD_US_BASE_URL is: {base_url}")
print(f"DEBUG--LLAMA_CLOUD_API_KEY is set: {bool(api_key)}\n")

# Initialize LlamaParse with the configuration above
parser = LlamaParse(
    api_key=api_key,
    base_url=base_url.rstrip("/"),  # strip any trailing slash
    result_type="text",
    check_interval=1,
    verbose=True,  # enable verbose logging
)
print(f"DEBUG--Final base_url used by LlamaParse: {parser.base_url}\n")
file_path = os.path.abspath("./data/apple_2021_10k.pdf")
if not os.path.exists(file_path):
    raise FileNotFoundError(f"File {file_path} does not exist")

# Parse the PDF
orig_docs = parser.load_data(file_path)

# Display the loaded documents
orig_docs


DEBUG--LLAMA_CLOUD_US_BASE_URL is: https://api.cloud.llamaindex.ai
DEBUG--LLAMA_CLOUD_API_KEY is set: True

DEBUG--Final base_url used by LlamaParse: https://api.cloud.llamaindex.ai

Started parsing the file under job_id 2f80a77c-f6dc-453b-9fb7-e61423a5940a


[Document(id_='b5ac4851-0b05-4c76-b925-8d66d1c55925', embedding=None, metadata={}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, metadata_template='{key}: {value}', metadata_separator='\n', text_resource=MediaResource(embeddings=None, data=None, text='                                                                         UNITED STATES\n                                                               SECURITIES AND EXCHANGE COMMISSION\n                                                                     Washington, D.C. 20549\n                                                                           FORM 10-K\n(Mark One)\n     ☒   ANNUAL REPORT PURSUANT TO SECTION 13 OR 15(d) OF THE SECURITIES EXCHANGE ACT OF 1934\n                                                          For the fiscal year ended September 25, 2021\n                                                                               or\n    ☐   TRANSITION REPORT PURSUANT TO SECTION 13 OR 

In [None]:
from copy import deepcopy
from llama_index.core.schema import TextNode


def get_page_nodes(docs, separator="\n---\n"):
    """Split each document into page-level nodes on the given separator."""
    nodes = []
    for doc in docs:
        doc_chunks = doc.text.split(separator)
        for doc_chunk in doc_chunks:
            node = TextNode(
                text=doc_chunk,
                metadata=deepcopy(doc.metadata),
            )
            nodes.append(node)

    return nodes


docs = get_page_nodes(orig_docs)
print(docs[0].get_content())
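
`get_page_nodes` relies on the parsed text containing a `\n---\n` separator between pages. The sketch below shows the same split on a plain string, using only the standard library. It also records a 1-based page number for each chunk; that extra field is our own assumption for illustration and is not something `get_page_nodes` does. `split_pages` and the sample text are hypothetical.

```python
from typing import Dict, List


def split_pages(text: str, separator: str = "\n---\n") -> List[Dict]:
    """Split text on the separator and number the chunks from 1."""
    return [
        {"text": chunk, "page_number": number}
        for number, chunk in enumerate(text.split(separator), start=1)
    ]


# Made-up text with two page separators.
sample = "Cover page\n---\nNet sales table\n---\nRisk factors"
pages = split_pages(sample)
for page in pages:
    print(page["page_number"], page["text"])
```

If you want the LLM to cite pages later, one option is to store such a number in each node's metadata, though whether it matches the PDF's printed page numbers depends on the parser.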

#### Option 2: Use SimpleDirectoryReader

You can also choose to use the free PDF parser bundled into our `SimpleDirectoryReader`.

In [None]:
# # OPTION 2: Use SimpleDirectoryReader
# from llama_index.core import SimpleDirectoryReader

# # the file was downloaded into ./data above
# reader = SimpleDirectoryReader(input_files=["./data/apple_2021_10k.pdf"])
# docs = reader.load_data()

#### Build RAG Pipeline, Define Structured Output Schema

We build a RAG pipeline with our trusty VectorStoreIndex and reranker module. We then define the output as a Pydantic model. This allows us to create a structured LLM with the output class attached.

In [None]:
from llama_index.core import VectorStoreIndex

# skip chunking since we're doing page-level chunking
index = VectorStoreIndex(docs)

In [None]:
from llama_index.postprocessor.flag_embedding_reranker import (
    FlagEmbeddingReranker,
)

reranker = FlagEmbeddingReranker(
    top_n=5,
    model="BAAI/bge-reranker-large",
)
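
A reranker takes the candidates returned by retrieval, rescores each one against the query, and keeps the best `top_n`. The toy sketch below illustrates that idea with a word-overlap score. This is a deliberately simple stand-in: `FlagEmbeddingReranker` uses a learned model rather than word overlap, and `overlap_score`, `rerank`, and the candidate passages are hypothetical.

```python
from typing import List, Tuple


def overlap_score(query: str, passage: str) -> int:
    """Toy relevance score: number of distinct query words in the passage."""
    passage_words = set(passage.lower().split())
    query_words = set(query.lower().split())
    return sum(1 for word in query_words if word in passage_words)


def rerank(
    query: str, passages: List[str], top_n: int
) -> List[Tuple[int, str]]:
    """Score every candidate against the query and keep the best top_n."""
    scored = [(overlap_score(query, p), p) for p in passages]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return scored[:top_n]


# Made-up passages standing in for retrieved pages.
candidates = [
    "Shareholder letter and corporate overview",
    "Net sales by product category for 2021",
    "Net sales by reportable segment",
]
top = rerank("net sales product category", candidates, top_n=2)
for score, passage in top:
    print(score, passage)
```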

In [None]:
from pydantic import BaseModel, Field
from typing import List


class Output(BaseModel):
    """Output containing the response, page numbers, and confidence."""

    response: str = Field(..., description="The answer to the question.")
    page_numbers: List[int] = Field(
        ...,
        description="The page numbers of the sources used to answer this question. Do not include a page number if the context is irrelevant.",
    )
    confidence: float = Field(
        ...,
        description="Confidence value between 0-1 of the correctness of the result.",
    )
    confidence_explanation: str = Field(
        ..., description="Explanation for the confidence score"
    )


sllm = llm.as_structured_llm(output_cls=Output)
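
The `description` strings on each field are carried into the JSON schema Pydantic generates, which is typically how such guidance becomes visible to the model. The sketch below inspects that schema and also shows one way to enforce the 0-1 range on `confidence` at validation time. `BoundedOutput` is a hypothetical variant of `Output` written for this illustration; the `ge`/`le` constraints are our addition, and the code assumes Pydantic v2.

```python
from typing import List

from pydantic import BaseModel, Field, ValidationError


class BoundedOutput(BaseModel):
    """Hypothetical variant of Output with a range-checked confidence."""

    response: str = Field(..., description="The answer to the question.")
    page_numbers: List[int] = Field(..., description="Pages used as sources.")
    confidence: float = Field(
        ..., ge=0.0, le=1.0, description="Confidence value between 0-1."
    )


# Field descriptions appear in the generated JSON schema.
props = BoundedOutput.model_json_schema()["properties"]
print(props["confidence"]["description"])

# Out-of-range values fail validation instead of passing through.
try:
    BoundedOutput(response="n/a", page_numbers=[], confidence=1.5)
    accepted = True
except ValidationError:
    accepted = False
print(accepted)
```

Without such constraints, the range stated in the description is only a request to the model, not a guarantee.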

#### Run Queries

In [None]:
query_engine = index.as_query_engine(
    similarity_top_k=5,
    node_postprocessors=[reranker],
    llm=sllm,
    response_mode="tree_summarize",  # you can also select other modes like `compact`, `refine`
)

In [None]:
response = query_engine.query("Net sales for each product category in 2021")
print(str(response))

In [None]:
response.response.model_dump()