# 如何运行自定义函数
:::info 前提条件
本指南假定您已熟悉以下概念：- [LangChain 表达式语言 (LCEL)](/docs/concepts/lcel)- [链式运行](/docs/how_to/sequence/)
:::
你可以将任意函数用作 [Runnables](https://python.langchain.com/api_reference/core/runnables/langchain_core.runnables.base.Runnable.html#langchain_core.runnables.base.Runnable)。这在需要格式化或实现其他 LangChain 组件未提供的功能时非常有用，而作为 Runnables 使用的自定义函数被称为 [`RunnableLambdas`](https://python.langchain.com/api_reference/core/runnables/langchain_core.runnables.base.RunnableLambda.html)。
请注意，这些函数的所有输入都必须是一个单一参数。如果你的函数接受多个参数，你应该编写一个包装器，该包装器接受一个字典输入并将其解包为多个参数。
本指南将涵盖：
- 如何通过`RunnableLambda`构造函数及便捷的`@chain`装饰器显式地从自定义函数创建可运行对象- 在链式调用中将自定义函数强制转换为可运行对象- 如何在自定义函数中接收和使用运行元数据- 如何通过让自定义函数返回生成器来实现流式传输
## 使用构造函数
下面，我们明确使用 `RunnableLambda` 构造函数来封装我们的自定义逻辑：

In [None]:
%pip install -qU langchain langchain_openai

import os
from getpass import getpass

if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass()

In [2]:
from operator import itemgetter

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI


def length_function(text):
    return len(text)


def _multiple_length_function(text1, text2):
    return len(text1) * len(text2)


def multiple_length_function(_dict):
    return _multiple_length_function(_dict["text1"], _dict["text2"])


model = ChatOpenAI()

prompt = ChatPromptTemplate.from_template("what is {a} + {b}")

chain = (
    {
        "a": itemgetter("foo") | RunnableLambda(length_function),
        "b": {"text1": itemgetter("foo"), "text2": itemgetter("bar")}
        | RunnableLambda(multiple_length_function),
    }
    | prompt
    | model
)

chain.invoke({"foo": "bar", "bar": "gah"})

AIMessage(content='3 + 9 equals 12.', response_metadata={'token_usage': {'completion_tokens': 8, 'prompt_tokens': 14, 'total_tokens': 22}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': 'fp_c2295e73ad', 'finish_reason': 'stop', 'logprobs': None}, id='run-73728de3-e483-49e3-ad54-51bd9570e71a-0')

## 便捷的 `@chain` 装饰器
你也可以通过添加 `@chain` 装饰器将任意函数转换为链式结构。这在功能上等同于将函数包裹在如上所示的 `RunnableLambda` 构造函数中。示例如下：

In [3]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import chain

prompt1 = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
prompt2 = ChatPromptTemplate.from_template("What is the subject of this joke: {joke}")


@chain
def custom_chain(text):
    prompt_val1 = prompt1.invoke({"topic": text})
    output1 = ChatOpenAI().invoke(prompt_val1)
    parsed_output1 = StrOutputParser().invoke(output1)
    chain2 = prompt2 | ChatOpenAI() | StrOutputParser()
    return chain2.invoke({"joke": parsed_output1})


custom_chain.invoke("bears")

'The subject of the joke is the bear and his girlfriend.'

在上面的代码中，`@chain`装饰器被用于将`custom_chain`转换为可运行对象，我们通过`.invoke()`方法来调用它。
如果您正在使用 [LangSmith](https://docs.smith.langchain.com/) 的追踪功能，您应该能在其中看到一个名为 `custom_chain` 的追踪记录，其中嵌套了对 OpenAI 的调用。
## 链式操作中的自动强制类型转换
在使用管道操作符（`|`）的链式调用中自定义函数时，可以省略 `RunnableLambda` 或 `@chain` 构造器，直接依赖类型强制转换。以下是一个简单示例：该函数接收模型输出并返回其前五个字母。

In [4]:
prompt = ChatPromptTemplate.from_template("tell me a story about {topic}")

model = ChatOpenAI()

chain_with_coerced_function = prompt | model | (lambda x: x.content[:5])

chain_with_coerced_function.invoke({"topic": "bears"})

'Once '

需要注意的是，我们无需将自定义函数 `(lambda x: x.content[:5])` 封装在 `RunnableLambda` 构造函数中，因为管道操作符左侧的 `model` 本身已是一个 Runnable 对象。该自定义函数会被**强制转换**为可运行对象。更多信息请参阅[此章节](/docs/how_to/sequence/#coercion)。
## 传递运行元数据
可运行的 lambda 函数可以选择性地接受一个 [RunnableConfig](https://python.langchain.com/api_reference/core/runnables/langchain_core.runnables.config.RunnableConfig.html#langchain_core.runnables.config.RunnableConfig) 参数，用于将回调函数、标签以及其他配置信息传递给嵌套的运行。

In [5]:
import json

from langchain_core.runnables import RunnableConfig


def parse_or_fix(text: str, config: RunnableConfig):
    fixing_chain = (
        ChatPromptTemplate.from_template(
            "Fix the following text:\n\n```text\n{input}\n```\nError: {error}"
            " Don't narrate, just respond with the fixed data."
        )
        | model
        | StrOutputParser()
    )
    for _ in range(3):
        try:
            return json.loads(text)
        except Exception as e:
            text = fixing_chain.invoke({"input": text, "error": e}, config)
    return "Failed to parse"


from langchain_community.callbacks import get_openai_callback

with get_openai_callback() as cb:
    output = RunnableLambda(parse_or_fix).invoke(
        "{foo: bar}", {"tags": ["my-tag"], "callbacks": [cb]}
    )
    print(output)
    print(cb)

{'foo': 'bar'}
Tokens Used: 62
	Prompt Tokens: 56
	Completion Tokens: 6
Successful Requests: 1
Total Cost (USD): $9.6e-05


In [6]:
from langchain_community.callbacks import get_openai_callback

with get_openai_callback() as cb:
    output = RunnableLambda(parse_or_fix).invoke(
        "{foo: bar}", {"tags": ["my-tag"], "callbacks": [cb]}
    )
    print(output)
    print(cb)

{'foo': 'bar'}
Tokens Used: 62
	Prompt Tokens: 56
	Completion Tokens: 6
Successful Requests: 1
Total Cost (USD): $9.6e-05


## 流式传输
:::note[RunnableLambda](https://python.langchain.com/api_reference/core/runnables/langchain_core.runnables.base.RunnableLambda.html) 最适合不需要支持流式处理的代码。如果需要支持流式处理（即能够操作输入块并生成输出块），请改用 [RunnableGenerator](https://python.langchain.com/api_reference/core/runnables/langchain_core.runnables.base.RunnableGenerator.html)，如下例所示。:::
你可以将生成器函数（即使用 `yield` 关键字、行为类似迭代器的函数）以链式方式调用。
这些生成器的签名应为 `Iterator[Input] -> Iterator[Output]`。对于异步生成器则为：`AsyncIterator[Input] -> AsyncIterator[Output]`。
以下情况适用：- 实现自定义输出解析器- 在保留流式处理能力的同时，修改前一步骤的输出
以下是一个针对逗号分隔列表的自定义输出解析器示例。首先，我们创建一个生成此类文本列表的链：

In [7]:
from typing import Iterator, List

prompt = ChatPromptTemplate.from_template(
    "Write a comma-separated list of 5 animals similar to: {animal}. Do not include numbers"
)

str_chain = prompt | model | StrOutputParser()

for chunk in str_chain.stream({"animal": "bear"}):
    print(chunk, end="", flush=True)

lion, tiger, wolf, gorilla, panda

接下来，我们定义一个自定义函数，该函数将聚合当前流式传输的输出，并在模型生成列表中的下一个逗号时将其输出：

In [8]:
# This is a custom parser that splits an iterator of llm tokens
# into a list of strings separated by commas
def split_into_list(input: Iterator[str]) -> Iterator[List[str]]:
    # hold partial input until we get a comma
    buffer = ""
    for chunk in input:
        # add current chunk to buffer
        buffer += chunk
        # while there are commas in the buffer
        while "," in buffer:
            # split buffer on comma
            comma_index = buffer.index(",")
            # yield everything before the comma
            yield [buffer[:comma_index].strip()]
            # save the rest for the next iteration
            buffer = buffer[comma_index + 1 :]
    # yield the last chunk
    yield [buffer.strip()]


list_chain = str_chain | split_into_list

for chunk in list_chain.stream({"animal": "bear"}):
    print(chunk, flush=True)

['lion']
['tiger']
['wolf']
['gorilla']
['raccoon']


调用它会返回完整的值数组：

In [9]:
list_chain.invoke({"animal": "bear"})

['lion', 'tiger', 'wolf', 'gorilla', 'raccoon']

## 异步版本
如果你在一个`async`环境中工作，以下是上述示例的`async`版本：

In [10]:
from typing import AsyncIterator


async def asplit_into_list(
    input: AsyncIterator[str],
) -> AsyncIterator[List[str]]:  # async def
    buffer = ""
    async for (
        chunk
    ) in input:  # `input` is a `async_generator` object, so use `async for`
        buffer += chunk
        while "," in buffer:
            comma_index = buffer.index(",")
            yield [buffer[:comma_index].strip()]
            buffer = buffer[comma_index + 1 :]
    yield [buffer.strip()]


list_chain = str_chain | asplit_into_list

async for chunk in list_chain.astream({"animal": "bear"}):
    print(chunk, flush=True)

['lion']
['tiger']
['wolf']
['gorilla']
['panda']


In [11]:
await list_chain.ainvoke({"animal": "bear"})

['lion', 'tiger', 'wolf', 'gorilla', 'panda']

## 后续步骤
现在你已经学会了在链中使用自定义逻辑的几种不同方法，以及如何实现流式处理。
要了解更多信息，请参阅本节中关于可运行项的其他操作指南。