agent_executor.astream_events does not work with ChatGroq #24699

Open
5 tasks done
PruthvirajChavan98 opened this issue Jul 26, 2024 · 1 comment
Labels
🤖:bug Related to a bug, vulnerability, unexpected error with an existing feature investigate

Comments

@PruthvirajChavan98

Checked other resources

  • I added a very descriptive title to this issue.
  • I searched the LangChain documentation with the integrated search.
  • I used the GitHub search to find a similar question and didn't find it.
  • I am sure that this is a bug in LangChain rather than my code.
  • The bug is not resolved by updating to the latest stable version of LangChain (or the specific integration package).

Example Code

from langchain_groq import ChatGroq
from langchain_community.tools.ddg_search import DuckDuckGoSearchRun
from langchain.prompts import ChatPromptTemplate
from langchain.agents import create_tool_calling_agent
from langchain.agents import AgentExecutor

llm = ChatGroq(temperature=0, model_name="llama-3.1-70b-versatile", api_key="", streaming=True)
ddg_search = DuckDuckGoSearchRun()
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful Search Assistant"),
        ("human", "{input}"),
        ("placeholder", "{agent_scratchpad}"),
    ]
)

tools = [ddg_search]
search_agent = create_tool_calling_agent(llm, tools, prompt)

search_agent_executor = AgentExecutor(
    agent=search_agent, tools=tools, verbose=False, handle_parsing_errors=True
)

async for event in search_agent_executor.astream_events(
    {"input": "who is narendra modi"}, version="v1"
):
    kind = event["event"]

    if kind == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            print(content, end="", flush=True)
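
(The top-level async for above runs as-is in a notebook, which is where the traceback below comes from. For reference, in a plain script the same loop would need an async entry point; a minimal wrapper sketch:)

import asyncio

async def main() -> None:
    # Same consumption loop, wrapped so it can run outside a notebook.
    async for event in search_agent_executor.astream_events(
        {"input": "who is narendra modi"}, version="v1"
    ):
        if event["event"] == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                print(content, end="", flush=True)

asyncio.run(main())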

Error Message and Stack Trace (if applicable)


AttributeError Traceback (most recent call last)
Cell In[37], line 1
----> 1 async for event in search_agent_executor.astream_events(
2 {"input": "who is narendra modi"}, version="v1"
3 ):
4 kind = event["event"]
6 if kind == "on_chat_model_stream":

File d:\Learning\Groq-Tool-Calling.venv\Lib\site-packages\langchain_core\runnables\base.py:1246, in Runnable.astream_events(self, input, config, version, include_names, include_types, include_tags, exclude_names, exclude_types, exclude_tags, **kwargs)
1241 raise NotImplementedError(
1242 'Only versions "v1" and "v2" of the schema is currently supported.'
1243 )
1245 async with aclosing(event_stream):
-> 1246 async for event in event_stream:
1247 yield event

File d:\Learning\Groq-Tool-Calling.venv\Lib\site-packages\langchain_core\tracers\event_stream.py:778, in _astream_events_implementation_v1(runnable, input, config, include_names, include_types, include_tags, exclude_names, exclude_types, exclude_tags, **kwargs)
774 root_name = config.get("run_name", runnable.get_name())
776 # Ignoring mypy complaint about too many different union combinations
777 # This arises because many of the argument types are unions
--> 778 async for log in _astream_log_implementation( # type: ignore[misc]
779 runnable,
780 input,
781 config=config,
782 stream=stream,
783 diff=True,
784 with_streamed_output_list=True,
785 **kwargs,
786 ):
787 run_log = run_log + log
789 if not encountered_start_event:
790 # Yield the start event for the root runnable.

File d:\Learning\Groq-Tool-Calling.venv\Lib\site-packages\langchain_core\tracers\log_stream.py:670, in _astream_log_implementation(runnable, input, config, stream, diff, with_streamed_output_list, **kwargs)
667 finally:
668 # Wait for the runnable to finish, if not cancelled (eg. by break)
669 try:
--> 670 await task
671 except asyncio.CancelledError:
672 pass

File d:\Learning\Groq-Tool-Calling.venv\Lib\site-packages\langchain_core\tracers\log_stream.py:624, in _astream_log_implementation.<locals>.consume_astream()
621 prev_final_output: Optional[Output] = None
622 final_output: Optional[Output] = None
--> 624 async for chunk in runnable.astream(input, config, **kwargs):
625 prev_final_output = final_output
626 if final_output is None:

File d:\Learning\Groq-Tool-Calling.venv\Lib\site-packages\langchain\agents\agent.py:1793, in AgentExecutor.astream(self, input, config, **kwargs)
1781 config = ensure_config(config)
1782 iterator = AgentExecutorIterator(
1783 self,
1784 input,
(...)
1791 **kwargs,
1792 )
-> 1793 async for step in iterator:
1794 yield step

File d:\Learning\Groq-Tool-Calling.venv\Lib\site-packages\langchain\agents\agent_iterator.py:266, in AgentExecutorIterator.__aiter__(self)
260 while self.agent_executor._should_continue(
261 self.iterations, self.time_elapsed
262 ):
263 # take the next step: this plans next action, executes it,
264 # yielding action and observation as they are generated
265 next_step_seq: NextStepOutput = []
--> 266 async for chunk in self.agent_executor._aiter_next_step(
267 self.name_to_tool_map,
268 self.color_mapping,
269 self.inputs,
270 self.intermediate_steps,
271 run_manager,
272 ):
273 next_step_seq.append(chunk)
274 # if we're yielding actions, yield them as they come
275 # do not yield AgentFinish, which will be handled below

File d:\Learning\Groq-Tool-Calling.venv\Lib\site-packages\langchain\agents\agent.py:1483, in AgentExecutor._aiter_next_step(self, name_to_tool_map, color_mapping, inputs, intermediate_steps, run_manager)
1480 intermediate_steps = self._prepare_intermediate_steps(intermediate_steps)
1482 # Call the LLM to see what to do.
-> 1483 output = await self.agent.aplan(
1484 intermediate_steps,
1485 callbacks=run_manager.get_child() if run_manager else None,
1486 **inputs,
1487 )
1488 except OutputParserException as e:
1489 if isinstance(self.handle_parsing_errors, bool):

File d:\Learning\Groq-Tool-Calling.venv\Lib\site-packages\langchain\agents\agent.py:619, in RunnableMultiActionAgent.aplan(self, intermediate_steps, callbacks, **kwargs)
611 final_output: Any = None
612 if self.stream_runnable:
613 # Use streaming to make sure that the underlying LLM is invoked in a
614 # streaming
(...)
617 # Because the response from the plan is not a generator, we need to
618 # accumulate the output into final output and return that.
--> 619 async for chunk in self.runnable.astream(
620 inputs, config={"callbacks": callbacks}
621 ):
622 if final_output is None:
623 final_output = chunk

File d:\Learning\Groq-Tool-Calling.venv\Lib\site-packages\langchain_core\runnables\base.py:3278, in RunnableSequence.astream(self, input, config, **kwargs)
3275 async def input_aiter() -> AsyncIterator[Input]:
3276 yield input
-> 3278 async for chunk in self.atransform(input_aiter(), config, **kwargs):
3279 yield chunk

File d:\Learning\Groq-Tool-Calling.venv\Lib\site-packages\langchain_core\runnables\base.py:3261, in RunnableSequence.atransform(self, input, config, **kwargs)
3255 async def atransform(
3256 self,
3257 input: AsyncIterator[Input],
3258 config: Optional[RunnableConfig] = None,
3259 **kwargs: Optional[Any],
3260 ) -> AsyncIterator[Output]:
-> 3261 async for chunk in self._atransform_stream_with_config(
3262 input,
3263 self._atransform,
3264 patch_config(config, run_name=(config or {}).get("run_name") or self.name),
3265 **kwargs,
3266 ):
3267 yield chunk

File d:\Learning\Groq-Tool-Calling.venv\Lib\site-packages\langchain_core\runnables\base.py:2160, in Runnable._atransform_stream_with_config(self, input, transformer, config, run_type, **kwargs)
2158 while True:
2159 if accepts_context(asyncio.create_task):
-> 2160 chunk: Output = await asyncio.create_task( # type: ignore[call-arg]
2161 py_anext(iterator), # type: ignore[arg-type]
2162 context=context,
2163 )
2164 else:
2165 chunk = cast(Output, await py_anext(iterator))

File d:\Learning\Groq-Tool-Calling.venv\Lib\site-packages\langchain_core\tracers\log_stream.py:258, in LogStreamCallbackHandler.tap_output_aiter(self, run_id, output)
246 async def tap_output_aiter(
247 self, run_id: UUID, output: AsyncIterator[T]
248 ) -> AsyncIterator[T]:
249 """Tap an output async iterator to stream its values to the log.
250
251 Args:
(...)
256 T: The output value.
257 """
--> 258 async for chunk in output:
259 # root run is handled in .astream_log()
260 if run_id != self.root_id:
261 # if we can't find the run silently ignore
262 # eg. because this run wasn't included in the log
263 if key := self._key_map_by_run_id.get(run_id):

File d:\Learning\Groq-Tool-Calling.venv\Lib\site-packages\langchain_core\runnables\base.py:3231, in RunnableSequence._atransform(self, input, run_manager, config, **kwargs)
3229 else:
3230 final_pipeline = step.atransform(final_pipeline, config)
-> 3231 async for output in final_pipeline:
3232 yield output

File d:\Learning\Groq-Tool-Calling.venv\Lib\site-packages\langchain_core\runnables\base.py:1313, in Runnable.atransform(self, input, config, **kwargs)
1310 final: Input
1311 got_first_val = False
-> 1313 async for ichunk in input:
1314 # The default implementation of transform is to buffer input and
1315 # then call stream.
1316 # It'll attempt to gather all input into a single chunk using
1317 # the + operator.
1318 # If the input is not addable, then we'll assume that we can
1319 # only operate on the last chunk,
1320 # and we'll iterate until we get to the last chunk.
1321 if not got_first_val:
1322 final = ichunk

File d:\Learning\Groq-Tool-Calling.venv\Lib\site-packages\langchain_core\runnables\base.py:5276, in RunnableBindingBase.atransform(self, input, config, **kwargs)
5270 async def atransform(
5271 self,
5272 input: AsyncIterator[Input],
5273 config: Optional[RunnableConfig] = None,
5274 **kwargs: Any,
5275 ) -> AsyncIterator[Output]:
-> 5276 async for item in self.bound.atransform(
5277 input,
5278 self._merge_configs(config),
5279 **{**self.kwargs, **kwargs},
5280 ):
5281 yield item

File d:\Learning\Groq-Tool-Calling.venv\Lib\site-packages\langchain_core\runnables\base.py:1331, in Runnable.atransform(self, input, config, **kwargs)
1328 final = ichunk
1330 if got_first_val:
-> 1331 async for output in self.astream(final, config, **kwargs):
1332 yield output

File d:\Learning\Groq-Tool-Calling.venv\Lib\site-packages\langchain_core\language_models\chat_models.py:439, in BaseChatModel.astream(self, input, config, stop, **kwargs)
434 except BaseException as e:
435 await run_manager.on_llm_error(
436 e,
437 response=LLMResult(generations=[[generation]] if generation else []),
438 )
--> 439 raise e
440 else:
441 await run_manager.on_llm_end(
442 LLMResult(generations=[[generation]]),
443 )

File d:\Learning\Groq-Tool-Calling.venv\Lib\site-packages\langchain_core\language_models\chat_models.py:417, in BaseChatModel.astream(self, input, config, stop, **kwargs)
415 generation: Optional[ChatGenerationChunk] = None
416 try:
--> 417 async for chunk in self._astream(
418 messages,
419 stop=stop,
420 **kwargs,
421 ):
422 if chunk.message.id is None:
423 chunk.message.id = f"run-{run_manager.run_id}"

File d:\Learning\Groq-Tool-Calling.venv\Lib\site-packages\langchain_groq\chat_models.py:582, in ChatGroq._astream(self, messages, stop, run_manager, **kwargs)
578 if "tools" in kwargs:
579 response = await self.async_client.create(
580 messages=message_dicts, **{**params, **kwargs}
581 )
--> 582 chat_result = self._create_chat_result(response)
583 generation = chat_result.generations[0]
584 message = cast(AIMessage, generation.message)

File d:\Learning\Groq-Tool-Calling.venv\Lib\site-packages\langchain_groq\chat_models.py:665, in ChatGroq._create_chat_result(self, response)
663 generations = []
664 if not isinstance(response, dict):
--> 665 response = response.dict()
666 token_usage = response.get("usage", {})
667 for res in response["choices"]:

AttributeError: 'AsyncStream' object has no attribute 'dict'

Description

langchain Version: 0.2.11
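
The traceback points at the tool-calling fallback in ChatGroq._astream: with streaming=True, the request params apparently still carry stream=True, so self.async_client.create(...) returns an AsyncStream rather than a chat completion object, and _create_chat_result then crashes calling .dict() on it. A minimal sketch of a guard that would avoid the crash (a hypothetical fragment of that method, assuming params includes stream=True when streaming=True is set; not the actual fix):

# Sketch of the fallback branch shown in the traceback (not runnable standalone).
if "tools" in kwargs:
    create_params = {**params, **kwargs}
    # Assumption: streaming=True injects stream=True here. Force a blocking
    # completion; otherwise the client hands back an AsyncStream, which
    # _create_chat_result cannot convert via .dict().
    create_params.pop("stream", None)
    response = await self.async_client.create(
        messages=message_dicts, **create_params
    )
    chat_result = self._create_chat_result(response)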

System Info

System Information

OS: Windows
OS Version: 10.0.19045
Python Version: 3.11.4 (tags/v3.11.4:d2340ef, Jun 7 2023, 05:45:37) [MSC v.1934 64 bit (AMD64)]

Package Information

langchain_core: 0.2.23
langchain: 0.2.11
langchain_community: 0.2.10
langsmith: 0.1.93
langchain_cohere: 0.1.9
langchain_experimental: 0.0.63
langchain_groq: 0.1.6
langchain_openai: 0.1.17
langchain_text_splitters: 0.2.2

Packages not installed (Not Necessarily a Problem)

The following packages were not found:

langgraph
langserve

@langcarl langcarl bot added the investigate label Jul 26, 2024
@dosubot dosubot bot added Ɑ: agent Related to agents module 🤖:bug Related to a bug, vulnerability, unexpected error with an existing feature labels Jul 26, 2024
@eyurtsev eyurtsev removed the Ɑ: agent Related to agents module label Jul 26, 2024
@ccurme
Collaborator

ccurme commented Jul 26, 2024

Hello, thank you for reporting this.

I was able to reproduce the issue. I believe it is resolved by omitting streaming=True when initializing the chat model.

This remains a bug but hopefully that will fix your use-case. Let me know if you continue to experience issues.

Minimal example:

from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import tool
from langchain_groq import ChatGroq

@tool
def magic_function(input: int) -> int:
    """Applies a magic function to an input."""
    return input + 2

tools = [magic_function]
# create_tool_calling_agent binds the tools itself, so the model is passed unbound
llm = ChatGroq(model="llama-3.1-70b-versatile")

prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful Search Assistant"),
        ("human", "{input}"),
        ("placeholder", "{agent_scratchpad}"),
    ]
)

agent = create_tool_calling_agent(llm, tools, prompt)

agent_executor = AgentExecutor(
    agent=agent, tools=tools, verbose=False,
)

events = []
async for event in agent_executor.astream_events(
    {"input": "what is the value of magic_function(3)?"},
    version="v1",
):
    events.append(event)
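
The token-printing loop from the original report can then be run over the collected events, e.g.:

# Print streamed tokens from the collected events.
for event in events:
    if event["event"] == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            print(content, end="", flush=True)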
