In [None]:
from langchain_core.runnables import RunnablePassthrough, RunnableLambda

In [None]:
# Baseline pipeline: three passthrough steps piped together, so the value
# given to invoke() is expected to come back as-is.
chain = (
    RunnablePassthrough()
    | RunnablePassthrough()
    | RunnablePassthrough()
)
chain.invoke("hello")

In [None]:
def input_to_upper(input: str):
    """Return *input* converted to upper case."""
    return input.upper()

In [None]:
# Same pipeline shape, but the middle step upper-cases the string by wrapping
# input_to_upper in a RunnableLambda.
chain = (
    RunnablePassthrough()
    | RunnableLambda(input_to_upper)
    | RunnablePassthrough()
)
chain.invoke("hello")

In [None]:
from typing import Callable, TypeVar, Optional, Union, Any
from typing import Any, Optional
from langchain_core.runnables import Runnable

Input = TypeVar("Input")
Output = TypeVar("Output")

class ConditionalRunnableLambda(Runnable[Input, Input]):
    """Runnable that applies ``func`` to its input only when a condition holds.

    When the condition is falsy the input is returned unchanged, so the step
    acts like a passthrough; when it is truthy the result of ``func`` is
    returned instead. Every call reports its progress on stdout.
    """

    def __init__(
        self,
        condition: Union[Callable[[Input], bool], bool],
        func: Callable[[Input], Output],
        name: Optional[str] = None,
    ):
        """Store the condition, the function and the display name.

        Args:
            condition: Either a constant used for every call, or a callable
                that is evaluated against each input.
            func: Function applied to the input when the condition is met.
            name: Optional name; defaults to ``"ConditionalRunnableLambda"``.
        """
        self.condition = condition
        self.func = func
        self.name = name or "ConditionalRunnableLambda"

    def invoke(
        self, input_value: Input, config: Optional[Any] = None, **kwargs: Any
    ) -> Union[Input, Output]:
        """Return ``func(input_value)`` if the condition holds, else the input.

        Args:
            input_value: Value flowing through the chain.
            config: Accepted for compatibility with ``Runnable.invoke``; unused.
            **kwargs: Accepted for compatibility with ``Runnable.invoke``; unused.

        Returns:
            The result of ``func`` when the condition is truthy, otherwise
            ``input_value`` unchanged.
        """
        print(f"ConditionalRunnableLambda.invoke called with input_value: {input_value}")

        if callable(self.condition):
            condition_met = self.condition(input_value)
        else:
            condition_met = self.condition

        print(f"Condition evaluated to: {condition_met}")

        if not condition_met:
            print("Condition not met. Returning input_value unchanged.")
            return input_value

        result = self.func(input_value)
        print(f"Condition met. Function result: {result}")
        return result

    async def ainvoke(
        self, input_value: Input, config: Optional[Any] = None, **kwargs: Any
    ) -> Union[Input, Output]:
        """Async counterpart of :meth:`invoke`.

        ``condition`` and ``func`` are called directly; if either returns an
        awaitable (for example because it is an ``async def``), it is awaited.
        Without that, a coroutine condition would always be truthy and a
        coroutine result would be handed to the next step un-awaited.

        Args:
            input_value: Value flowing through the chain.
            config: Accepted for compatibility with ``Runnable.ainvoke``; unused.
            **kwargs: Accepted for compatibility with ``Runnable.ainvoke``; unused.

        Returns:
            The (awaited) result of ``func`` when the condition is truthy,
            otherwise ``input_value`` unchanged.
        """
        # Local import keeps the class self-contained within its notebook cell.
        import inspect

        print(f"ConditionalRunnableLambda.ainvoke called with input_value: {input_value}")

        if callable(self.condition):
            condition_met = self.condition(input_value)
            if inspect.isawaitable(condition_met):
                condition_met = await condition_met
        else:
            condition_met = self.condition

        print(f"Condition evaluated to: {condition_met}")

        if not condition_met:
            print("Condition not met. Returning input_value unchanged.")
            return input_value

        result = self.func(input_value)
        if inspect.isawaitable(result):
            result = await result
        print(f"Condition met. Function result: {result}")
        return result

In [None]:
# Constant False condition: input_to_upper is skipped and the conditional
# step returns its input unchanged.
chain = (
    RunnablePassthrough()
    | ConditionalRunnableLambda(False, input_to_upper)
    | RunnablePassthrough()
)
chain.invoke("hello")

In [None]:
# Constant True condition: the conditional step applies input_to_upper to
# whatever it receives.
chain = (
    RunnablePassthrough()
    | ConditionalRunnableLambda(True, input_to_upper)
    | RunnablePassthrough()
)
chain.invoke("hello")

In [None]:
from typing import Any, Optional
from langchain_core.runnables import Runnable
import time
from langchain_core.runnables.utils import (
    Input,
    Output
)

class RetryRunnable(Runnable[Input, Output]):
    """Wrap a runnable and invoke it again when it raises.

    ``retries`` is the maximum number of attempts in total (not the number of
    additional attempts). Once the budget is used up, the last exception is
    re-raised to the caller.
    """

    def __init__(self, runnable: Runnable[Input, Output], retries: int, delay: float = 0.0):
        """Store the wrapped runnable and the retry policy.

        Args:
            runnable: The runnable whose ``invoke`` is retried.
            retries: Maximum number of attempts; values below 1 still result
                in a single attempt.
            delay: Seconds to sleep between a failed attempt and the next one;
                no sleep happens when this is not positive.
        """
        self.runnable = runnable
        self.retries = retries
        self.delay = delay

    def invoke(
        self, input_value: Input, config: Optional[Any] = None, **kwargs: Any
    ) -> Output:
        """Invoke the wrapped runnable, retrying on any ``Exception``.

        Args:
            input_value: Value passed to the wrapped runnable.
            config: Forwarded to the wrapped runnable's ``invoke``.
            **kwargs: Forwarded to the wrapped runnable's ``invoke``.

        Returns:
            The result of the first attempt that does not raise.

        Raises:
            Exception: Whatever the final attempt raised.
        """
        attempt = 0
        # Always try at least once: the previous ``while attempts < retries``
        # loop fell through and returned None when retries was 0 or negative.
        while True:
            attempt += 1
            try:
                return self.runnable.invoke(input_value, config, **kwargs)
            except Exception as exc:
                # Attempts are reported 1-based so the first failure is "1".
                print(f"Attempt {attempt} failed: {exc}")
                if attempt >= self.retries:
                    # Bare raise keeps the original traceback intact.
                    raise
                if self.delay > 0:
                    time.sleep(self.delay)

In [None]:
import random

def sometimes_fails(input_value: str) -> str:
    """Upper-case *input_value*, or raise ``ValueError`` when a random draw is below 0.5."""
    roll = random.random()
    if roll >= 0.5:
        return input_value.upper()
    raise ValueError("Random failure occurred")

# Wrap the randomly failing function so that it is attempted up to 5 times,
# with delay=1 (passed to time.sleep by RetryRunnable) between failures.
retry_runnable = RetryRunnable(RunnableLambda(sometimes_fails), retries=5, delay=1)
try:
    result = retry_runnable.invoke("hello")
    print(f"Result: {result}")
except Exception as e:
    # Reached when RetryRunnable gives up and re-raises the last error.
    print(f"Failed after retries: {e}")

In [None]:
class FallbackRunnable(Runnable[Input, Output]):
    """Invoke ``primary`` and, if it raises, invoke ``fallback`` with the same input."""

    def __init__(
        self,
        primary: Runnable[Input, Output],
        fallback: Runnable[Input, Output],
        exceptions_to_handle: tuple = (Exception,),
    ):
        """Store both runnables and the exceptions that trigger the fallback.

        Args:
            primary: Runnable tried first.
            fallback: Runnable used when ``primary`` raises a handled exception.
            exceptions_to_handle: Exception class, or iterable of exception
                classes, that cause the fallback to run. Anything else raised
                by ``primary`` propagates. Defaults to ``(Exception,)``.
        """
        self.primary = primary
        self.fallback = fallback
        # Normalise to a tuple so the value is always valid in an except clause.
        if isinstance(exceptions_to_handle, type):
            self.exceptions_to_handle = (exceptions_to_handle,)
        else:
            self.exceptions_to_handle = tuple(exceptions_to_handle)

    def invoke(
        self, input_value: Input, config: Optional[Any] = None, **kwargs: Any
    ) -> Output:
        """Return the primary result, or the fallback result on a handled error.

        Args:
            input_value: Value passed to whichever runnable is invoked.
            config: Forwarded to the invoked runnable.
            **kwargs: Forwarded to the invoked runnable.

        Returns:
            The result of ``primary.invoke`` or, if that raised one of
            ``exceptions_to_handle``, the result of ``fallback.invoke``.
        """
        try:
            return self.primary.invoke(input_value, config, **kwargs)
        except self.exceptions_to_handle:
            # The primary error is deliberately discarded; errors raised by
            # the fallback itself propagate to the caller.
            return self.fallback.invoke(input_value, config, **kwargs)

# The primary step always fails (its condition is True and its function
# divides by zero), so the call below is answered by the fallback runnable.
fallback_runnable = FallbackRunnable(
    primary=ConditionalRunnableLambda(condition=True, func=lambda x: 1 / 0),
    fallback=RunnableLambda(lambda x: "fallback"),
)
fallback_runnable.invoke("hello")

In [15]:
from typing import Any, Optional, Dict
from langchain_core.runnables import Runnable

class CacheRunnable(Runnable):
    """Memoise the results of a wrapped runnable, keyed by the input value.

    The cache lives on the instance, is unbounded, and is keyed by the input
    only: ``config`` and extra keyword arguments do not take part in the key.
    """

    # Sentinel distinguishing "not cached" from a cached value of None.
    _MISSING = object()

    def __init__(self, runnable: Runnable):
        """Wrap *runnable* and start with an empty cache."""
        self.runnable = runnable
        self.cache: Dict[Any, Any] = {}

    def invoke(self, input_value: Any, config: Optional[Any] = None, **kwargs: Any) -> Any:
        """Return the cached result for *input_value*, computing it on a miss.

        Args:
            input_value: Value passed to the wrapped runnable and used as the
                cache key.
            config: Forwarded to the wrapped runnable on a miss.
            **kwargs: Forwarded to the wrapped runnable on a miss.

        Returns:
            The cached or freshly computed result. Inputs that cannot be
            hashed (such as dicts or lists) are computed on every call and
            never stored.
        """
        try:
            hash(input_value)
        except TypeError:
            # Unhashable inputs cannot be dict keys; skip the cache rather
            # than fail on the lookup.
            print(f"Input is not hashable, bypassing cache: {input_value}")
            return self.runnable.invoke(input_value, config, **kwargs)

        cached = self.cache.get(input_value, self._MISSING)
        if cached is not self._MISSING:
            print(f"Cache hit for input: {input_value}")
            return cached

        print(f"Cache miss for input: {input_value}. Computing result...")
        result = self.runnable.invoke(input_value, config, **kwargs)
        self.cache[input_value] = result
        return result


In [21]:
import time

def time_consuming_computation(input_value: str, delay: float = 2.0) -> str:
    """Simulate a slow computation that upper-cases *input_value*.

    Args:
        input_value: String to convert.
        delay: Seconds to sleep before producing the result. Defaults to the
            2 seconds used by the demo; a non-positive value skips the sleep.

    Returns:
        ``input_value`` in upper case.
    """
    print(f"Starting time-consuming computation for input: {input_value}")
    if delay > 0:
        time.sleep(delay)
    result = input_value.upper()
    print(f"Finished time-consuming computation for input: {input_value}")
    return result

In [29]:
# Turn the slow function into a runnable, then wrap it so that repeated
# inputs are answered from CacheRunnable's per-instance cache.
time_consuming_runnable = RunnableLambda(time_consuming_computation)
cache_runnable = CacheRunnable(runnable=time_consuming_runnable)

In [32]:
# Put the caching wrapper in the middle of a pipeline; the first call with a
# given input has to run the slow computation.
chain = (
    RunnablePassthrough()
    | cache_runnable
    | RunnablePassthrough()
)

chain.invoke("hello")

Cache miss for input: hello. Computing result...
Starting time-consuming computation for input: hello
Finished time-consuming computation for input: hello


'HELLO'

In [33]:
# Second call with the same input: answered from cache_runnable's cache (the
# recorded output below shows "Cache hit"), so the slow function is not re-run.
chain.invoke("hello")

Cache hit for input: hello


'HELLO'