Skip to content

Commit

Permalink
core[minor]: Add an async root listener and with_alisteners method (l…
Browse files Browse the repository at this point in the history
…angchain-ai#22151)

- [x] **Adding AsyncRootListener**: "langchain_core: Adding
AsyncRootListener"

- **Description:** Adding an AsyncBaseTracer, AsyncRootListener and
`with_alistener` function. This is to enable binding async root listener
to runnables. This currently only supported for sync listeners.
- **Issue:** None
- **Dependencies:** None

- [x] **Add tests and docs**: Added units tests and example snippet code
within the function description of `with_alistener`


- [x] **Lint and test**: Run make format_diff, make lint_diff and make
test
  • Loading branch information
nicolasnk authored and JonZeolla committed Jun 11, 2024
1 parent 6bc0ed7 commit 4593498
Show file tree
Hide file tree
Showing 6 changed files with 1,768 additions and 320 deletions.
108 changes: 108 additions & 0 deletions libs/core/langchain_core/runnables/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
RunLog,
RunLogPatch,
)
from langchain_core.tracers.root_listeners import AsyncListener
from langchain_core.tracers.schemas import Run


Expand Down Expand Up @@ -1327,6 +1328,86 @@ def fn_end(run_obj: Run):
],
)

def with_alisteners(
self,
*,
on_start: Optional[AsyncListener] = None,
on_end: Optional[AsyncListener] = None,
on_error: Optional[AsyncListener] = None,
) -> Runnable[Input, Output]:
"""
Bind asynchronous lifecycle listeners to a Runnable, returning a new Runnable.
on_start: Asynchronously called before the runnable starts running.
on_end: Asynchronously called after the runnable finishes running.
on_error: Asynchronously called if the runnable throws an error.
The Run object contains information about the run, including its id,
type, input, output, error, start_time, end_time, and any tags or metadata
added to the run.
Example:
.. code-block:: python
from langchain_core.runnables import RunnableLambda
import time
async def test_runnable(time_to_sleep : int):
print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
await asyncio.sleep(time_to_sleep)
print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")
async def fn_start(run_obj : Runnable):
print(f"on start callback starts at {format_t(time.time())}
await asyncio.sleep(3)
print(f"on start callback ends at {format_t(time.time())}")
async def fn_end(run_obj : Runnable):
print(f"on end callback starts at {format_t(time.time())}
await asyncio.sleep(2)
print(f"on end callback ends at {format_t(time.time())}")
runnable = RunnableLambda(test_runnable).with_alisteners(
on_start=fn_start,
on_end=fn_end
)
async def concurrent_runs():
await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))
asyncio.run(concurrent_runs())
Result:
on start callback starts at 2024-05-16T14:20:29.637053+00:00
on start callback starts at 2024-05-16T14:20:29.637150+00:00
on start callback ends at 2024-05-16T14:20:32.638305+00:00
on start callback ends at 2024-05-16T14:20:32.638383+00:00
Runnable[3s]: starts at 2024-05-16T14:20:32.638849+00:00
Runnable[5s]: starts at 2024-05-16T14:20:32.638999+00:00
Runnable[3s]: ends at 2024-05-16T14:20:35.640016+00:00
on end callback starts at 2024-05-16T14:20:35.640534+00:00
Runnable[5s]: ends at 2024-05-16T14:20:37.640169+00:00
on end callback starts at 2024-05-16T14:20:37.640574+00:00
on end callback ends at 2024-05-16T14:20:37.640654+00:00
on end callback ends at 2024-05-16T14:20:39.641751+00:00
"""
from langchain_core.tracers.root_listeners import AsyncRootListenersTracer

return RunnableBinding(
bound=self,
config_factories=[
lambda config: {
"callbacks": [
AsyncRootListenersTracer(
config=config,
on_start=on_start,
on_end=on_end,
on_error=on_error,
)
],
}
],
)

def with_types(
self,
*,
Expand Down Expand Up @@ -4294,6 +4375,33 @@ def with_listeners(
)
)

def with_alisteners(
self,
*,
on_start: Optional[AsyncListener] = None,
on_end: Optional[AsyncListener] = None,
on_error: Optional[AsyncListener] = None,
) -> RunnableEach[Input, Output]:
"""
Bind async lifecycle listeners to a Runnable, returning a new Runnable.
on_start: Called asynchronously before the runnable starts running,
with the Run object.
on_end: Called asynchronously after the runnable finishes running,
with the Run object.
on_error: Called asynchronously if the runnable throws an error,
with the Run object.
The Run object contains information about the run, including its id,
type, input, output, error, start_time, end_time, and any tags or metadata
added to the run.
"""
return RunnableEach(
bound=self.bound.with_alisteners(
on_start=on_start, on_end=on_end, on_error=on_error
)
)


class RunnableBindingBase(RunnableSerializable[Input, Output]):
"""Runnable that delegates calls to another Runnable with a set of kwargs.
Expand Down
Loading

0 comments on commit 4593498

Please sign in to comment.