Skip to content

Commit

Permalink
core[patch]: Correctly order parent ids in astream events (from root …
Browse files Browse the repository at this point in the history
…to immediate parent), add defensive check for cycles (langchain-ai#22637)

This PR makes two changes:

1. Fixes the order of parent IDs to be from root to immediate parent
2. Adds a simple defensive check for cycles
  • Loading branch information
eyurtsev authored and JonZeolla committed Jun 11, 2024
1 parent 9e37487 commit f729f65
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 7 deletions.
17 changes: 12 additions & 5 deletions libs/core/langchain_core/tracers/event_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,20 @@ def __init__(
def _get_parent_ids(self, run_id: UUID) -> List[str]:
"""Get the parent IDs of a run (non-recursively) cast to strings."""
parent_ids = []
parent_id = self.parent_map[run_id]

while parent_id is not None:
parent_ids.append(str(parent_id))
parent_id = self.parent_map[parent_id]
while parent_id := self.parent_map.get(run_id):
str_parent_id = str(parent_id)
if str_parent_id in parent_ids:
raise AssertionError(
f"Parent ID {parent_id} is already in the parent_ids list. "
f"This should never happen."
)
parent_ids.append(str_parent_id)
run_id = parent_id

return parent_ids
# Return the parent IDs in reverse order, so that the first
# parent ID is the root and the last ID is the immediate parent.
return parent_ids[::-1]

def _send(self, event: StreamEvent, event_type: str) -> None:
"""Send an event to the stream."""
Expand Down
40 changes: 38 additions & 2 deletions libs/core/tests/unit_tests/runnables/test_runnable_events_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -2033,8 +2033,8 @@ async def parent(x: str, config: RunnableConfig) -> str:
"metadata": {},
"name": "grandchild",
"parent_ids": [
"00000000-0000-0000-0000-000000000008",
"00000000-0000-0000-0000-000000000007",
"00000000-0000-0000-0000-000000000008",
],
"run_id": "00000000-0000-0000-0000-000000000009",
"tags": [],
Expand All @@ -2045,8 +2045,8 @@ async def parent(x: str, config: RunnableConfig) -> str:
"metadata": {},
"name": "grandchild",
"parent_ids": [
"00000000-0000-0000-0000-000000000008",
"00000000-0000-0000-0000-000000000007",
"00000000-0000-0000-0000-000000000008",
],
"run_id": "00000000-0000-0000-0000-000000000009",
"tags": [],
Expand Down Expand Up @@ -2081,6 +2081,42 @@ async def parent(x: str, config: RunnableConfig) -> str:
]


async def test_bad_parent_ids() -> None:
"""Test handling of situation where a run id is duplicated in the run tree."""

# Type ignores in the code below need to be investigated.
# Looks like a typing issue when using RunnableLambda as a decorator
# with async functions.
@RunnableLambda # type: ignore
async def child(x: str) -> str:
return x

@RunnableLambda # type: ignore
async def parent(x: str, config: RunnableConfig) -> str:
config["run_id"] = uuid.UUID(int=7)
return await child.ainvoke(x, config) # type: ignore

bond = uuid.UUID(int=7)
events = await _collect_events(
parent.astream_events("hello", {"run_id": bond}, version="v2"),
with_nulled_ids=False,
)
# Includes only a partial list of events since the run ID gets duplicated
# between parent and child run ID and the callback handler throws an exception.
# The exception does not get bubbled up to the user.
assert events == [
{
"data": {"input": "hello"},
"event": "on_chain_start",
"metadata": {},
"name": "parent",
"parent_ids": [],
"run_id": "00000000-0000-0000-0000-000000000007",
"tags": [],
}
]


async def test_runnable_generator() -> None:
"""Test async events from sync lambda."""

Expand Down

0 comments on commit f729f65

Please sign in to comment.