# LangChain の記法解説 (LCEL)


In [None]:
from dotenv import load_dotenv

load_dotenv(dotenv_path="../.env", override=True)

## Runnable と RunnableSequence


In [None]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "ユーザーが入力した料理のレシピを考えてください。"),
        ("human", "{dish}"),
    ]
)

model = ChatOpenAI(model="gpt-4o-mini", temperature=0)

output_parser = StrOutputParser()

In [None]:
prompt_value = prompt.invoke({"dish": "カレー"})
ai_message = model.invoke(prompt_value)
output = output_parser.invoke(ai_message)

print(output)

In [None]:
chain = prompt | model | output_parser

In [None]:
output = chain.invoke({"dish": "カレー"})
print(output)

### Runnable の実行方法―invoke・stream・batch


In [None]:
chain = prompt | model | output_parser

for chunk in chain.stream({"dish": "カレー"}):
    print(chunk, end="", flush=True)

In [None]:
chain = prompt | model | output_parser

outputs = chain.batch([{"dish": "カレー"}, {"dish": "うどん"}])
print(outputs)

### LCEL の「|」で様々な Runnable を連鎖させる


In [None]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o-mini", temperature=0)

output_parser = StrOutputParser()

In [None]:
cot_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "ユーザーの質問にステップバイステップで回答してください。"),
        ("human", "{question}"),
    ]
)

cot_chain = cot_prompt | model | output_parser

In [None]:
summarize_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "ステップバイステップで考えた回答から結論だけ抽出してください。"),
        ("human", "{text}"),
    ]
)

summarize_chain = summarize_prompt | model | output_parser

In [None]:
cot_summarize_chain = cot_chain | summarize_chain
ai_message = cot_summarize_chain.invoke({"question": "10 + 2 * 3"})
print(ai_message)

## RunnableLambda―任意の関数を Runnable にする


In [None]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant."),
        ("human", "{input}"),
    ]
)

model = ChatOpenAI(model="gpt-4o-mini", temperature=0)

output_parser = StrOutputParser()

In [None]:
from langchain_core.runnables import RunnableLambda


def upper(text: str) -> str:
    return text.upper()


chain = prompt | model | output_parser | RunnableLambda(upper)

ai_message = chain.invoke({"input": "Hello!"})
print(ai_message)

### chain デコレーターを使った RunnableLamda の実装


In [None]:
from langchain_core.runnables import chain


@chain
def upper(text: str) -> str:
    return text.upper()


chain = prompt | model | output_parser | upper

ai_message = chain.invoke({"input": "Hello!"})
print(ai_message)

### RunnableLambda への自動変換


In [None]:
def upper(text: str) -> str:
    return text.upper()


chain = prompt | model | output_parser | upper

In [None]:
ai_message = chain.invoke({"input": "Hello!"})
print(ai_message)

### Runnable の入力の型と出力の型に注意


In [None]:
def upper(text: str) -> str:
    return text.upper()


chain = prompt | model | upper

# 以下のコードを実行するとエラーになります
# output = chain.invoke({"input": "Hello!"})

In [None]:
chain = prompt | model | StrOutputParser() | upper

In [None]:
ai_message = chain.invoke({"input": "Hello!"})

## RunnableParallel―複数の Runnable を並列で処理する


In [None]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
output_parser = StrOutputParser()

In [None]:
optimistic_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "あなたは楽観主義者です。ユーザーの入力に対して楽観的な意見をください。",
        ),
        ("human", "{topic}"),
    ]
)
optimistic_chain = optimistic_prompt | model | output_parser

In [None]:
pessimistic_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "あなたは悲観主義者です。ユーザーの入力に対して悲観的な意見をください。",
        ),
        ("human", "{topic}"),
    ]
)
pessimistic_chain = pessimistic_prompt | model | output_parser

In [None]:
import pprint
from langchain_core.runnables import RunnableParallel

parallel_chain = RunnableParallel(
    {
        "optimistic_opinion": optimistic_chain,
        "pessimistic_opinion": pessimistic_chain,
    }
)

ai_message = parallel_chain.invoke({"topic": "生成AIの進化について"})
pprint.pprint(ai_message)

### RunnableParallel の出力を Runnable の入力に連結する


In [None]:
synthesize_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "あなたは客観的AIです。2つの意見をまとめてください。"),
        (
            "human",
            "楽観的意見: {optimistic_opinion}\n悲観的意見: {pessimistic_opinion}",
        ),
    ]
)

In [None]:
synthesize_chain = (
    RunnableParallel(
        {
            "optimistic_opinion": optimistic_chain,
            "pessimistic_opinion": pessimistic_chain,
        }
    )
    | synthesize_prompt
    | model
    | output_parser
)

ai_message = synthesize_chain.invoke({"topic": "生成AIの進化について"})
print(ai_message)

### RunnableParallel への自動変換


In [None]:
synthesize_chain = (
    {
        "optimistic_opinion": optimistic_chain,
        "pessimistic_opinion": pessimistic_chain,
    }
    | synthesize_prompt
    | model
    | output_parser
)

In [None]:
ai_message = synthesize_chain.invoke({"topic": "生成AIの進化について"})
print(ai_message)

### RunnableLambda との組み合わせ―itemgetter を使う例


In [None]:
from operator import itemgetter

topic_getter = itemgetter("topic")
topic = topic_getter({"topic": "生成AIの進化について"})
print(topic)

In [None]:
from operator import itemgetter

synthesize_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "あなたは客観的AIです。{topic}について2つの意見をまとめてください。",
        ),
        (
            "human",
            "楽観的意見: {optimistic_opinion}\n悲観的意見: {pessimistic_opinion}",
        ),
    ]
)

synthesize_chain = (
    {
        "optimistic_opinion": optimistic_chain,
        "pessimistic_opinion": pessimistic_chain,
        "topic": itemgetter("topic"),
    }
    | synthesize_prompt
    | model
    | output_parser
)

output = synthesize_chain.invoke({"topic": "生成AIの進化について"})
print(output)

## RunnablePassthrough―入力をそのまま出力する


In [None]:
from langchain_chroma import Chroma
from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain_openai import OpenAIEmbeddings
from langchain_text_splitters import CharacterTextSplitter

loader = DirectoryLoader(
    path="../tmp/langchain",
    glob="**/*.mdx",
    loader_cls=TextLoader,
)
raw_docs = loader.load()

text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
docs = text_splitter.split_documents(raw_docs)

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
db = Chroma.from_documents(docs, embeddings)

retriever = db.as_retriever()

In [None]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_template('''\
以下の文脈だけを踏まえて質問に回答してください。

文脈: """
{context}
"""

質問: {question}
''')

model = ChatOpenAI(model_name="gpt-4o-mini", temperature=0)

In [None]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | model
    | StrOutputParser()
)

output = chain.invoke("LangGraphとは")
print(output)

### assign―RunnableParallel に値を追加する


In [None]:
import pprint

chain = {
    "question": RunnablePassthrough(),
    "context": retriever,
} | RunnablePassthrough.assign(answer=prompt | model | StrOutputParser())

output = chain.invoke("LangGraphとは")
pprint.pprint(output)

In [None]:
from langchain_core.runnables import RunnableParallel

chain = RunnableParallel(
    {
        "question": RunnablePassthrough(),
        "context": retriever,
    }
).assign(answer=prompt | model | StrOutputParser())

In [None]:
import pprint

output = chain.invoke("LangGraphとは")
pprint.pprint(output)

### （補足）astream_events


In [None]:
chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | model
    | StrOutputParser()
)

async for event in chain.astream_events("LangGraphとは", version="v2"):
    print(event, flush=True)

In [None]:
async for event in chain.astream_events("LangGraphとは", version="v2"):
    event_kind = event["event"]

    if event_kind == "on_retriever_end":
        print("=== 検索結果 ===")
        documents = event["data"]["output"]
        for document in documents:
            print(document)

    elif event_kind == "on_parser_start":
        print("=== 最終出力 ===")

    elif event_kind == "on_parser_stream":
        chunk = event["data"]["chunk"]
        print(chunk, end="", flush=True)