# Runnable を理解したい

In [1]:
from langchain_core.runnables import (
    Runnable,
    RunnableAssign,
    RunnableBinding,
    RunnableBranch,
    RunnableConfig,
    RunnableGenerator,
    RunnableLambda,
    RunnableParallel,
    RunnablePassthrough,
    RunnablePick,
    RunnableWithFallbacks,
)
from typing import Any, Iterator
import asyncio


### 入力をそのまま後続へ渡す（`RunnablePassthrough`）

In [2]:
chain = RunnablePassthrough()

chain.invoke("foobar")

'foobar'

### `RunnableLambda`

In [3]:
chain = RunnableLambda(lambda x: x * 2)

chain.invoke(1)

2

`RunnableConfig`を受け取る。

In [4]:
# Receive the RunnableConfig as the second argument of the wrapped function.
# The parameter must be named `config`.
chain = RunnableLambda(lambda _, config: config)

chain.invoke(None)

{'tags': [],
 'metadata': {},
 'callbacks': <langchain_core.callbacks.manager.CallbackManager at 0x113846190>,
 'recursion_limit': 25}

`assign`を使って、出力のdictに`config`から取り出した値を新しいキーとして足す。

In [5]:
# Wrap the input in a dict under "x", then use assign to add a "y" key whose
# value is read from config["configurable"]["y"].
chain = (
    RunnableLambda(lambda x: { "x": x })
        .assign(y=RunnableLambda(lambda _, config: config["configurable"]["y"]))
)

# The second argument to invoke is the config that carries the "configurable" values.
chain.invoke(1, {
    "configurable": {
        "y": 2
    }
})

{'x': 1, 'y': 2}

関数が`Runnable`を返しても良いみたい。返された`Runnable`は同じ入力で実行され、その結果が出力になる。

In [6]:
chain = RunnableLambda(lambda _: RunnablePassthrough())

chain.invoke("hello")

'hello'

### 複数のRunnableを並行に処理してdictを出力する（`RunnableParallel`）

In [7]:
def twice(a: int) -> int:
    """Return ``a`` multiplied by two."""
    doubled = a * 2
    return doubled

# Each keyword argument becomes one step; every step receives the same input
# and the results are collected into a dict keyed by the argument names.
chain = RunnableParallel(
    origin=RunnablePassthrough(),
    increment=lambda x: x+1,
    twice=twice,
)

(
    chain.invoke(10),
    type(chain.steps["increment"]), # a lambda expression becomes a RunnableLambda
    type(chain.steps["twice"]), # a plain function also becomes a RunnableLambda
)

({'origin': 10, 'increment': 11, 'twice': 20},
 langchain_core.runnables.base.RunnableLambda,
 langchain_core.runnables.base.RunnableLambda)

### シーケンス（`RunnableSequence`）

In [8]:
# A chain such as a | b | c becomes a RunnableSequence.
# In a chain such as a | { "b": b, "c": c }, the dict part becomes a RunnableParallel.
chain = (
    RunnableLambda(lambda a: f"{a}bar")
    | {
        "foo": RunnablePassthrough(),
        "bar": RunnablePassthrough(),
    }
    | RunnablePassthrough()
)

# Inspect the result, the type of the whole chain, and the type of its first step.
(
    chain.invoke("foo"),
    type(chain),
    type(chain.steps[0]),
)

({'foo': 'foobar', 'bar': 'foobar'},
 langchain_core.runnables.base.RunnableSequence,
 langchain_core.runnables.base.RunnableLambda)

### 条件分岐

`RunnableBranch`を使うのかと思いきや、`RunnableLambda`で分岐ロジックを書くことが推奨されているみたい。

- https://python.langchain.com/docs/expression_language/how_to/routing

> 1. Conditionally return runnables from a RunnableLambda (recommended)
> 2. Using a RunnableBranch.

In [9]:
def branch(a: Any) -> Any:
    """Route on the runtime type of ``a``.

    Strings are upper-cased, integers are doubled, and every other value is
    returned unchanged. ``bool`` is a subclass of ``int``, so ``True`` and
    ``False`` take the integer path as well.
    """
    if isinstance(a, str):
        result = a.upper()
    elif isinstance(a, int):
        result = a * 2
    else:
        result = a
    return result

chain = RunnableLambda(branch)

(
    chain.invoke("foo"),
    chain.invoke(123),
    chain.invoke(["bar"]),
)

('FOO', 246, ['bar'])

In [10]:
# Each (condition, runnable) pair is a branch; the final positional argument
# is the default used when no condition matches.
chain = RunnableBranch(
    (lambda a: isinstance(a, str), lambda a: a.upper()),
    (lambda a: isinstance(a, int), lambda a: a * 2),
    RunnablePassthrough())

# Exercise the str branch, the int branch, and the default branch.
(
    chain.invoke("foo"),
    chain.invoke(123),
    chain.invoke(["bar"]),
)

('FOO', 246, ['bar'])

### ジェネレーターから`Runnable`を構築（`RunnableGenerator`）

In [11]:
def generator(input: Any) -> Iterator[str]:
    """Yield four fixed strings in order; ``input`` is accepted but not used."""
    yield from ("foo", "bar", "baz", "qux")

chain = RunnableGenerator(generator)

chain.invoke(None)

'foobarbazqux'

`RunnableConfig`も受け取れるっぽい。

In [12]:
def generator(input: Any, config: RunnableConfig):
    """Yield the ``"configurable"`` entry of ``config`` as a single chunk.

    ``input`` is accepted but not used. A ``KeyError`` is raised on first
    iteration when ``config`` has no ``"configurable"`` key.
    """
    configurable = config["configurable"]
    yield configurable

chain = RunnableGenerator(generator)

chain.invoke(None, {
    "configurable": {
        "message": "foobar"
    }
})

{'message': 'foobar'}

### 辞書から特定のキーを取得する（`RunnablePick`）

In [13]:
chain = RunnablePick("foo")

chain.invoke({
    "foo": "hello",
    "bar": "world",
    "baz": "!!"
})

'hello'

In [14]:
chain = RunnablePick(["foo", "baz"])

chain.invoke({
    "foo": "hello",
    "bar": "world",
    "baz": "!!"
})

{'foo': 'hello', 'baz': '!!'}

`RunnablePick`を明示的に使うよりも`pick`メソッドを使用するのかも。

In [15]:
chain = RunnablePassthrough().pick("bar")

chain.invoke({
    "foo": "hello",
    "bar": "world",
    "baz": "!!"
})

'world'

### 処理が失敗したときにフォールバックする（`RunnableWithFallbacks`）

In [16]:
# Primary step: raises ZeroDivisionError for 0 and TypeError for a str input.
runnable = RunnableLambda(lambda a: 100 / a)
# Fallback step: receives the same input as the failed primary step.
fallback = RunnableLambda(lambda a: f"Fallback: {a}")

chain = RunnableWithFallbacks(runnable=runnable, fallbacks=[fallback])

# The last two inputs make the primary step fail, so the fallback produces their results.
chain.batch([1, 2, 100, 0, "hello"])

[100.0, 50.0, 1.0, 'Fallback: 0', 'Fallback: hello']

`RunnableWithFallbacks`を明示的に使うよりも`with_fallbacks`メソッドを使用するのかも。

In [17]:
chain = RunnableLambda(lambda a: 100 / a).with_fallbacks([
    RunnableLambda(lambda a: f"Fallback: {a}")
])

chain.batch([1, 2, 100, 0, "hello"])

[100.0, 50.0, 1.0, 'Fallback: 0', 'Fallback: hello']

### `Runnable`のサブクラスを一覧する

In [18]:
def get_all_subclasses(cls):
    """Return every direct and indirect subclass of ``cls`` exactly once.

    Classes are listed in depth-first order: each direct subclass is followed
    by its own descendants before the next direct subclass appears. A class
    reachable through more than one parent (multiple inheritance) is reported
    only at its first occurrence instead of once per inheritance path.

    Args:
        cls: The class whose subclass tree is walked.

    Returns:
        A list of subclass objects; empty when ``cls`` has no subclasses.
    """
    found = []
    seen = set()

    def _walk(parent):
        for subclass in parent.__subclasses__():
            if subclass in seen:
                # Already reported via another parent; its subtree was walked then.
                continue
            seen.add(subclass)
            found.append(subclass)
            _walk(subclass)

    _walk(cls)
    return found

# List the Runnable subclasses that exist in this process; classes from
# modules that have not been imported yet will not appear.
all_subclasses = get_all_subclasses(Runnable)
for subclass in all_subclasses:
    print(subclass)


<class 'langchain_core.runnables.base.RunnableSerializable'>
<class 'langchain_core.runnables.base.RunnableSequence'>
<class 'langchain_core.runnables.base.RunnableParallel'>
<class 'langchain_core.runnables.base.RunnableEachBase'>
<class 'langchain_core.runnables.base.RunnableEach'>
<class 'langchain_core.runnables.base.RunnableBindingBase'>
<class 'langchain_core.runnables.base.RunnableBinding'>
<class 'langchain_core.runnables.branch.RunnableBranch'>
<class 'langchain_core.runnables.fallbacks.RunnableWithFallbacks'>
<class 'langchain_core.runnables.passthrough.RunnablePassthrough'>
<class 'langchain_core.runnables.passthrough.RunnableAssign'>
<class 'langchain_core.runnables.passthrough.RunnablePick'>
<class 'langchain_core.runnables.router.RouterRunnable'>
<class 'langchain_core.beta.runnables.context.ContextGet'>
<class 'langchain_core.beta.runnables.context.ContextSet'>
<class 'langchain_core.runnables.base.RunnableGenerator'>
<class 'langchain_core.runnables.base.RunnableLambda'