In [2]:
from langchain_core.runnables.base import Runnable

https://github.com/langchain-ai/langchain/blob/d77c7c4236df8e56fbe3acc8e0a71b57b48f1678/libs/core/langchain_core/runnables/base.py#L1335



Runnable(Generic[Input, Output], ABC):
    """A unit of work that can be invoked, batched, streamed, transformed and composed."""
- invoke/ainvoke: Transforms a single input into an output.
- batch/abatch: Efficiently transforms multiple inputs into outputs.
- stream/astream: Streams output from a single input as it’s produced.
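- async: each of the methods above has an `a`-prefixed asynchronous counterpart (ainvoke, abatch, astream).
- bind: attaches fixed keyword arguments to a Runnable and returns a new Runnable.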
- RunnableSequence invokes a series of runnables sequentially, with one Runnable’s output serving as the next’s input. Construct using the | operator or by passing a list of runnables to RunnableSequence.
- RunnableParallel invokes runnables concurrently, providing the same input to each. Construct it using a dict literal within a sequence or by passing a dict to RunnableParallel.
    

    

# Subclass Runnable, then invoke or batch the runnable

In [3]:
class AddTwoNum(Runnable):    
    """Runnable that adds two numbers passed as separate positional arguments."""
    # NOTE(review): invoke takes two positional inputs here. Calling it
    # directly works, but the later `serial_ops.invoke(1, 1)` cell shows it
    # failing once the runnable is piped into a sequence.
    def invoke(self, x:int, y:int)->int:
        """Return the sum of ``x`` and ``y``."""
        return x + y
add_two_num = AddTwoNum()
print('add_two_num:', add_two_num.invoke(2, 2))

class MultiplyNum(Runnable):
    """Runnable that multiplies its single input by 10."""
    def invoke(self, x):
        """Return ``x * 10``."""
        return x * 10
multiply_num = MultiplyNum()
print('multiply_num:', multiply_num.invoke(2))


add_two_num: 4
multiply_num: 20


In [20]:
# MultiplyNum only defines invoke; batch is inherited from Runnable and
# returns one result per input.
multiply_num.batch([1, 2, 3])

[10, 20, 30]

In [21]:
# Async counterpart of batch, also inherited from Runnable; top-level await
# is available inside a notebook cell.
await multiply_num.abatch([1, 2, 3])

[10, 20, 30]

# Multiple arguments in the function

In [4]:
# Runnable.pipe(*others) returns a RunnableSequence, e.g.:
#   runnable_1 = RunnableLambda(add_one)
#   runnable_2 = RunnableLambda(mul_two)
#   sequence = runnable_1 | runnable_2
#   # Or equivalently:
#   # sequence = RunnableSequence(first=runnable_1, last=runnable_2)
#   sequence.invoke(1)

# The serial op fails due to a mismatch in the invoke signatures: a sequence
# is invoked with ONE input, so the second positional argument is presumably
# taken as `config` (which would explain the `.items()` error printed below).
serial_ops = add_two_num.pipe(multiply_num)
try:
    serial_ops.invoke(1, 1)
except Exception as e:
    # The failure is the point of this cell: show it instead of crashing.
    print(e)

'int' object has no attribute 'items'


In [5]:
### Fails again: both signatures now accept config and **kwargs, but the first
### runnable still takes two positional inputs while the second takes one.
class AddTwoNum(Runnable):    
    """Runnable that adds two numbers passed as separate positional arguments."""
    def invoke(self, x:int, y:int, config=None, **kwargs)->int:
        """Return the sum of ``x`` and ``y``; ``config`` and ``kwargs`` are unused."""
        return x + y
add_two_num = AddTwoNum()
print('add_two_num:', add_two_num.invoke(2, 2))

class MultiplyNum(Runnable):
    """Runnable that multiplies its single input by 10."""
    def invoke(self, x, config=None, **kwargs):
        """Return ``x * 10``; ``config`` and ``kwargs`` are unused."""
        return x * 10
multiply_num = MultiplyNum()
print('multiply_num:', multiply_num.invoke(2))
serial_ops = add_two_num.pipe(multiply_num) 
try:
    # Still two positional arguments, so the sequence raises the same error as before.
    serial_ops.invoke(1, 1)
except Exception as e:
    print(e)

add_two_num: 4
multiply_num: 20
'int' object has no attribute 'items'


In [6]:
from langchain_core.globals import set_debug
set_debug(True)
from langchain_core.tracers import ConsoleCallbackHandler

### Now it works: both runnables take exactly one positional input
class AddTwoNum(Runnable):    
    """Runnable that adds the two items of a single 2-item input."""
    def invoke(self, inputs, config=None, **kwargs):
        """Unpack ``inputs`` into two values and return their sum."""
        x, y = inputs
        return x + y
add_two_num = AddTwoNum()
input_data = (2, 2)
print('add_two_num:', add_two_num.invoke(input_data))
class MultiplyNum(Runnable):
    """Runnable that multiplies its single input by 10."""
    def invoke(self, x, config=None, **kwargs):
        """Return ``x * 10``."""
        return x * 10
multiply_num = MultiplyNum()
print('multiply_num:', multiply_num.invoke(2))
serial_ops = add_two_num.pipe(multiply_num) 
# NOTE: the label below says 'multiply_num' but the value printed is the
# output of the whole sequence (add, then multiply).
print('multiply_num:', serial_ops.invoke(
                                        input_data,
                                        # The console callback prints the chain start/end trace for this call.
                                        config={'callbacks': [ConsoleCallbackHandler()]}) )

add_two_num: 4
multiply_num: 20
[32;1m[1;3m[chain/start][0m [1m[chain:RunnableSequence] Entering Chain run with input:
[0m{
  "input": [
    2,
    2
  ]
}
[36;1m[1;3m[chain/end][0m [1m[chain:RunnableSequence] [0ms] Exiting Chain run with output:
[0m{
  "output": 40
}
multiply_num: 40


In [7]:
set_debug(True)
try:
    print('multiply_num:', await serial_ops.ainvoke(
                                            input_data,
                                            # config={'callbacks': [ConsoleCallbackHandler()]}
    ) 
         )
except:
    pass

[32;1m[1;3m[chain/start][0m [1m[chain:RunnableSequence] Entering Chain run with input:
[0m{
  "input": [
    2,
    2
  ]
}
[36;1m[1;3m[chain/end][0m [1m[chain:RunnableSequence] [2ms] Exiting Chain run with output:
[0m{
  "output": 40
}
multiply_num: 40


In [8]:
class AddTwoNum(Runnable):    
    def invoke(self, inputs, config=None, **kwargs):
        x, y = inputs
        return x + y
        
    async def ainvoke(self, inputs, config=None, **kwargs):
        x, y = inputs
        return x + y
add_two_num = AddTwoNum()
input_data = (2, 2)
print('add_two_num:', await add_two_num.ainvoke(input_data))
class MultiplyNum(Runnable):
    def invoke(self, x, config=None, **kwargs):
        return x * 10

    async def ainvoke(self, x, config=None, **kwargs):
        return x * 10
        
multiply_num = MultiplyNum()
print('multiply_num:', await multiply_num.ainvoke(2))
serial_ops = add_two_num.pipe(multiply_num) 
print('multiply_num:', await serial_ops.ainvoke(
                                        input_data,
                                        # config={'callbacks': [ConsoleCallbackHandler()]}
) )

add_two_num: 4
multiply_num: 20
[32;1m[1;3m[chain/start][0m [1m[chain:RunnableSequence] Entering Chain run with input:
[0m{
  "input": [
    2,
    2
  ]
}
[36;1m[1;3m[chain/end][0m [1m[chain:RunnableSequence] [1ms] Exiting Chain run with output:
[0m{
  "output": 40
}
multiply_num: 40


# How about parallel

In [25]:
from langchain_core.runnables import RunnableParallel
# RunnableParallel gives the same input to every branch and returns a dict
# keyed by branch name.
parallel_ops = RunnableParallel({
    "add_two_num": add_two_num,
    "multiply_num": multiply_num  ### Receives the whole input_data tuple: (2, 2) * 10 => a tuple of twenty 2s
})

In [26]:
# Both branches receive input_data unchanged, so multiply_num repeats the
# tuple instead of multiplying a number.
parallel_ops.invoke(input_data)

[32;1m[1;3m[chain/start][0m [1m[chain:RunnableParallel<add_two_num,multiply_num>] Entering Chain run with input:
[0m{
  "input": [
    2,
    2
  ]
}
[36;1m[1;3m[chain/end][0m [1m[chain:RunnableParallel<add_two_num,multiply_num>] [1ms] Exiting Chain run with output:
[0m{
  "add_two_num": 4,
  "multiply_num": [
    2,
    2,
    2,
    2,
    2,
    2,
    2,
    2,
    2,
    2,
    2,
    2,
    2,
    2,
    2,
    2,
    2,
    2,
    2,
    2
  ]
}


{'add_two_num': 4,
 'multiply_num': (2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2)}

# parallel with Bind

In [31]:
class MultiplyNum(Runnable):
    """Runnable that multiplies its input by a configurable multiplier."""
    def invoke(self, x, config=None, **kwargs):
        """Return ``x`` times the ``multiplier`` keyword argument (10 when absent)."""
        # Arguments attached with .bind(multiplier=...) arrive here in kwargs.
        y = kwargs.get('multiplier', 10)
        return x * y
        
multiply_num = MultiplyNum()
# One instance, two bound variants: each branch multiplies the same input
# by a different factor.
parallel_ops = RunnableParallel({
    "multiply_by_10": multiply_num.bind(multiplier=10),
    "multiply_by_100": multiply_num.bind(multiplier=100)  
})
parallel_ops.invoke(39)

[32;1m[1;3m[chain/start][0m [1m[chain:RunnableParallel<multiply_by_10,multiply_by_100>] Entering Chain run with input:
[0m{
  "input": 39
}
[36;1m[1;3m[chain/end][0m [1m[chain:RunnableParallel<multiply_by_10,multiply_by_100>] [2ms] Exiting Chain run with output:
[0m{
  "multiply_by_10": 390,
  "multiply_by_100": 3900
}


{'multiply_by_10': 390, 'multiply_by_100': 3900}

# Chain serial and parallel

In [32]:
# Feed the sum into the parallel step: add_two_num turns (2, 2) into 4, and
# both bound multipliers receive that 4.
# serial_n_parallel_chain =  add_two_num | multiply_num | parallel_ops
serial_n_parallel_chain =  add_two_num | parallel_ops
serial_n_parallel_chain.invoke(input_data)

[32;1m[1;3m[chain/start][0m [1m[chain:RunnableSequence] Entering Chain run with input:
[0m{
  "input": [
    2,
    2
  ]
}
[32;1m[1;3m[chain/start][0m [1m[chain:RunnableSequence > chain:RunnableParallel<multiply_by_10,multiply_by_100>] Entering Chain run with input:
[0m{
  "input": 4
}
[36;1m[1;3m[chain/end][0m [1m[chain:RunnableSequence > chain:RunnableParallel<multiply_by_10,multiply_by_100>] [1ms] Exiting Chain run with output:
[0m{
  "multiply_by_10": 40,
  "multiply_by_100": 400
}
[36;1m[1;3m[chain/end][0m [1m[chain:RunnableSequence] [3ms] Exiting Chain run with output:
[0m{
  "multiply_by_10": 40,
  "multiply_by_100": 400
}


{'multiply_by_10': 40, 'multiply_by_100': 400}

# Runnable Lambda 

In [47]:
from langchain_core.runnables import RunnableLambda
# RunnableLambda wraps a plain callable as a Runnable; this one duplicates
# its input into a 2-tuple so it can feed AddTwoNum.
transform_to_tuple = RunnableLambda(lambda x: (x, x))
print('tranform_to_tuple:', transform_to_tuple.invoke(2))

[32;1m[1;3m[chain/start][0m [1m[chain:RunnableLambda] Entering Chain run with input:
[0m{
  "input": 2
}
[36;1m[1;3m[chain/end][0m [1m[chain:RunnableLambda] [0ms] Exiting Chain run with output:
[0m{
  "output": [
    2,
    2
  ]
}
tranform_to_tuple: (2, 2)


# Inline transformation through RunnableLambda  before RunnableParallel

In [36]:
from langchain_core.runnables import RunnableParallel

# Every branch receives the parallel step's input as-is.
parallel_ops = RunnableParallel({
    "add_two_num": add_two_num,  ### Needs a 2-item input to unpack
    "multiply_num": multiply_num  ### A tuple input would be repeated 10 times; an int input is multiplied by 10
})

In [37]:
# add_two_num reduces input_data to a single int (4), so the add_two_num
# branch inside parallel_ops cannot unpack it into two values.
serial_n_parallel_chain =  add_two_num | parallel_ops
try:
    serial_n_parallel_chain.invoke(input_data)
except Exception as e:
    # The failure is the point of this cell: print it rather than hide it.
    print(e)

[32;1m[1;3m[chain/start][0m [1m[chain:RunnableSequence] Entering Chain run with input:
[0m{
  "input": [
    2,
    2
  ]
}
[32;1m[1;3m[chain/start][0m [1m[chain:RunnableSequence > chain:RunnableParallel<add_two_num,multiply_num>] Entering Chain run with input:
[0m{
  "input": 4
}
[31;1m[1;3m[chain/error][0m [1m[chain:RunnableSequence > chain:RunnableParallel<add_two_num,multiply_num>] [3ms] Chain run errored with error:
[0m"TypeError('cannot unpack non-iterable int object')Traceback (most recent call last):\n\n\n  File \"/mnt/d/myDev/llmapps/venv/lib/python3.9/site-packages/langchain_core/runnables/base.py\", line 3562, in invoke\n    output = {key: future.result() for key, future in zip(steps, futures)}\n\n\n  File \"/mnt/d/myDev/llmapps/venv/lib/python3.9/site-packages/langchain_core/runnables/base.py\", line 3562, in <dictcomp>\n    output = {key: future.result() for key, future in zip(steps, futures)}\n\n\n  File \"/home/bhujay/anaconda3/lib/python3.9/concurrent/fut

In [45]:
# Adapter chain: duplicate a single number into a tuple, then add the pair
# (2 -> (2, 2) -> 4).
transform_n_add = transform_to_tuple | add_two_num
print('transform_n_add:', transform_n_add.invoke(2))

[32;1m[1;3m[chain/start][0m [1m[chain:RunnableLambda] Entering Chain run with input:
[0m{
  "input": 2
}
[36;1m[1;3m[chain/end][0m [1m[chain:RunnableLambda] [0ms] Exiting Chain run with output:
[0m{
  "output": [
    2,
    2
  ]
}
tranform_to_tuple: (2, 2)
[32;1m[1;3m[chain/start][0m [1m[chain:RunnableSequence] Entering Chain run with input:
[0m{
  "input": 2
}
[32;1m[1;3m[chain/start][0m [1m[chain:RunnableSequence > chain:RunnableLambda] Entering Chain run with input:
[0m{
  "input": 2
}
[36;1m[1;3m[chain/end][0m [1m[chain:RunnableSequence > chain:RunnableLambda] [0ms] Exiting Chain run with output:
[0m{
  "output": [
    2,
    2
  ]
}
[36;1m[1;3m[chain/end][0m [1m[chain:RunnableSequence] [1ms] Exiting Chain run with output:
[0m{
  "output": 4
}
transform_n_add: 4


In [49]:
set_debug(False)
# Both branches receive 4, the output of the leading add_two_num step.
transformed_parallel_ops = RunnableParallel({
    "add_two_num": transform_n_add,  ### 4 -> (4, 4) -> 8
    "multiply_num": multiply_num  ### 4 * 10 => 40 (an int now, so no tuple repetition)
})
serial_transoform_parallel_chain =  add_two_num | transformed_parallel_ops
serial_transoform_parallel_chain.invoke(input_data)

{'add_two_num': 8, 'multiply_num': 40}

# RunnablePassthrough 

In [62]:
# A plain function can be piped into a Runnable: the `|` operator coerces the
# function into a Runnable, and RunnablePassthrough then forwards that
# result unchanged.
# The import and helper are defined here so this cell runs on a fresh kernel.
from langchain_core.runnables import RunnablePassthrough

def add_2x(x):
    """Return ``x`` plus twice ``x``."""
    return x + 2 * x

chain_rp = add_2x | RunnablePassthrough()
chain_rp.invoke(10)

30

In [63]:
# Runnable to passthrough inputs unchanged or with additional keys.
# This Runnable behaves almost like the identity function, except that it can be configured to add additional keys to the output, if the input is a dict.
from langchain_core.runnables import RunnablePassthrough
def add_2x(x):
    """Return ``x`` plus twice ``x``."""
    doubled = 2 * x
    return x + doubled
# The result stays 10: the passthrough returns its input unchanged.
# NOTE(review): add_2x is presumably only called as a side-effect hook here
# and its return value discarded; confirm against the RunnablePassthrough docs.
rp = RunnablePassthrough(add_2x)
rp.invoke(10)

10

In [78]:
### Assign an additional key when the input to RunnablePassthrough is a dict
# The dict on the left of `|` acts as a parallel step, so the input 10 becomes
# {'add_2x': 30}; assign() then adds 'add_100', computed from that dict.
mydict = {"add_2x": add_2x }
chain_rp = mydict| RunnablePassthrough().assign(add_100=lambda dict_inpt: dict_inpt['add_2x'] + 100)
chain_rp.invoke(10)

{'add_2x': 30, 'add_100': 130}

In [82]:
### Keep a step's input visible next to its results
# NOTE: the passthrough branches echo the input of the parallel step (4, the
# output of add_two_num), not the original (2, 2) tuple.
transformed_parallel_ops = RunnableParallel({
    "input_data": RunnablePassthrough(),
    "add_two_num": transform_n_add,
    "input_to_multiply": RunnablePassthrough(), ### echoes the value that multiply_num receives
    "multiply_num": multiply_num,  
    
})
serial_transoform_parallel_chain =  add_two_num | transformed_parallel_ops
serial_transoform_parallel_chain.invoke(input_data)

{'input_data': 4, 'add_two_num': 8, 'input_to_multiply': 4, 'multiply_num': 40}

# More on Runnable Class and Bind 

In [9]:
import langchain_core

In [10]:
from langchain_core.runnables.base import Runnable
from typing import Any, Dict, Optional, Type

class TextTransformationRunnable(Runnable[str, str]):
    """
    A Runnable that transforms input text by appending a specified suffix.

    Args:
        suffix: Text appended to every input.
    """
    def __init__(self, suffix: str):
        super().__init__()
        self.suffix = suffix

    def invoke(self, input: str, config: Optional[Dict[str, Any]] = None, **kwargs: Any) -> str:
        """
        Appends the suffix to the input text.

        Args:
            input: Text to transform.
            config: Optional run configuration; not used by this runnable.
            **kwargs: Extra invocation arguments (for example ones attached
                with ``bind``). Accepted so such calls do not raise
                ``TypeError``; they are ignored.

        Returns:
            ``input`` followed by the configured suffix.
        """
        return input + self.suffix

    def batch(self, inputs: list[str], config: Optional[Dict[str, Any]] = None) -> list[str]:
        """
        Processes a batch of inputs by appending the suffix to each input text.

        Args:
            inputs: Texts to transform.
            config: Optional run configuration, passed to every ``invoke`` call.

        Returns:
            The transformed texts, in the same order as ``inputs``.
        """
        return [self.invoke(input_text, config) for input_text in inputs]

# Usage example: one instance, configured once with the suffix to append
suffix_adder = TextTransformationRunnable(" world!")

# Single invocation
result = suffix_adder.invoke("Hello")
print(result)  # Outputs: Hello world!

# Batch invocation: uses the class's own batch override, one result per input
batch_result = suffix_adder.batch(["Hello", "Goodbye"])
print(batch_result)  # Outputs: ['Hello world!', 'Goodbye world!']


Hello world!
['Hello world!', 'Goodbye world!']


In [83]:
from langchain_core.runnables import RunnableLambda
import random

def add_one(x:int)->int:
    """Return ``x`` incremented by one."""
    incremented = x + 1
    return incremented

def buggy_double(y:int)->int:
    """Double ``y``, but fail on purpose about 70% of the time.

    A random draw at or below 0.3 succeeds and returns ``y * 2``; any other
    draw prints a message and raises ``ValueError``. Used to demonstrate retries.
    """
    draw = random.random()
    if draw <= 0.3:
        return y * 2
    print('this code failed due to a bug')
    raise ValueError('Buggy code triggerred')

runnable_add_one = RunnableLambda(add_one)
print('runnable_add_one:', runnable_add_one.invoke(2))
# Wrap the flaky function so it is retried up to 10 times, with jitter on the
# retry wait disabled.
runnable_buggy_double = RunnableLambda(buggy_double).with_retry(
                            stop_after_attempt=10,
                            wait_exponential_jitter=False)
try:
    print('runnable_buggy_double:', runnable_buggy_double.invoke(3))
except Exception as e:
    # Every attempt failed: report the final error instead of hiding it.
    print('runnable_buggy_double failed after retries:', e)

runnable_add_one: 3
runnable_buggy_double: 6


In [84]:
# Each "this code failed due to a bug" line in the output is one failed
# attempt that the retry wrapper recovered from.
sequence = runnable_add_one.pipe(runnable_buggy_double) ## equivalent to: runnable_add_one | runnable_buggy_double
sequence.invoke(1)

this code failed due to a bug


4

In [85]:
set_debug(False)
# Same sequence, built with the | operator instead of .pipe().
sequence = runnable_add_one | (runnable_buggy_double) ## equivalent to: runnable_add_one.pipe(runnable_buggy_double)
sequence.invoke(1)

this code failed due to a bug
this code failed due to a bug
this code failed due to a bug
this code failed due to a bug


4

In [86]:
# Async invocation of the same sequence; failed attempts are retried here too,
# as the repeated failure messages in the output show.
await sequence.ainvoke(1)

this code failed due to a bug
this code failed due to a bug
this code failed due to a bug


4

In [87]:
set_debug(False)
# Async batch over two inputs; the failure messages from the two runs can
# interleave in the output.
await sequence.abatch((1, 2))

this code failed due to a bugthis code failed due to a bug

this code failed due to a bug
this code failed due to a bug
this code failed due to a bug


[4, 6]

In [88]:
set_debug(False)
# A dict literal piped after a Runnable acts as a parallel step: the
# incremented value (1 + 1 = 2) is given to both branches.
parallel_ops =  RunnableLambda(lambda x: x + 1) | {
    'mul_2': RunnableLambda(lambda x: x * 2),
    'mul_5': RunnableLambda(lambda x: x * 5)
}
parallel_ops.invoke(1)

{'mul_2': 4, 'mul_5': 10}

 The bind() method in LangChain serves a few key purposes:
• It allows you to bind arguments to a Runnable, returning a new Runnable. This is useful when a Runnable in a chain requires an argument that is not in the output of the previous Runnable or included in the user input.
• It can be used to bind model-specific tools or default invocation arguments to a Runnable. This allows you to configure the Runnable with certain parameters that will be used each time it is invoked, without having to pass those parameters in manually. [2,3]
• When working with chat models, the bind_tools() method can be used to handle converting custom tool schemas (like the ones used by OpenAI) and binding them to the model, so the model can call those tools as part of its response. [4,5]
In summary, the bind() method in LangChain provides a way to configure a Runnable with additional parameters or functionality, without having to pass those in manually each time the Runnable is used. This helps make Runnables more reusable and composable within larger chains or workflows.

In [17]:
from typing import Any, Dict, Optional, Type
from langchain_core.runnables import Runnable

class TextProcessorRunnable(Runnable[str, str]):
    """Runnable that returns its input text, optionally upper-cased."""

    def __init__(self, to_uppercase: bool = False) -> None:
        """Remember the default upper-casing choice used when a call gives none."""
        self.to_uppercase = to_uppercase

    def invoke(self, input: str, config: Optional[Dict[str, Any]] = None, **kwargs: Any) -> str:
        """Return ``input`` upper-cased when requested, otherwise unchanged.

        A ``to_uppercase`` keyword argument (passed directly or attached with
        ``bind``) takes precedence over the instance-level default.
        """
        uppercase_requested = kwargs.get('to_uppercase', self.to_uppercase)
        return input.upper() if uppercase_requested else input

    @property
    def InputType(self) -> Type[str]:
        """Input type of this runnable: ``str``."""
        return str

    @property
    def OutputType(self) -> Type[str]:
        """Output type of this runnable: ``str``."""
        return str


In [18]:
# Create an instance of TextProcessorRunnable without binding
# (to_uppercase defaults to False)
processor = TextProcessorRunnable()

# Invoke the processor without binding any arguments
output = processor.invoke("Hello World")
print(output)  # Output: "Hello World"

# Now, use the bind method to bind the 'to_uppercase' argument;
# bind returns a new runnable and the bound value reaches invoke via **kwargs
bound_processor = processor.bind(to_uppercase=True)

# Invoke the bound processor
bound_output = bound_processor.invoke("Hello World")
print(bound_output)  # Output: "HELLO WORLD"


Hello World
HELLO WORLD


In [19]:
# `...` (Ellipsis) is being used as a placeholder. If tools is meant to hold a
# configuration or a set of functionalities that are yet to be implemented or
# configured, using ... is a way to indicate that it's intentionally left
# incomplete for the moment.
tools = ...
tools

Ellipsis

#  STREAM 

- TODO: streaming was left out of the previous exercise and still needs to be completed.
- Goal: the output streamed from a Runnable should be consumable by a jQuery front end.