Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New executor throws RuntimeError: ... got Future <..> attached to a differen t loop #681

Closed
joy13975 opened this issue Feb 29, 2024 · 6 comments · Fixed by #1093
Closed
Labels
bug Something isn't working

Comments

@joy13975
Copy link
Contributor

joy13975 commented Feb 29, 2024

Describe the bug
Calling TestsetGenerator.generate_with_langchain_docs() throws the following error, then subsequent attempts to embed (using the same Langchain cache) would get stuck and never proceed past 0%. Suspecting cache db(sqlite3) corruption but then integration check reported no problem..

This requires deeper investigation but I'm raising it before I forget/lose the details.

Ragas version: 0.1.3
Python version: Python 3.9.13

Code to Reproduce

lc_docs = ...
testset = generator.generate_with_langchain_docs(
        documents=lc_docs,
        test_size=20,
        distributions={simple: 0.5, reasoning: 0.25, conditional: 0.25},
    )

Error trace

Exception in thread Thread-7299:                                                                          
Traceback (most recent call last):                                                                        
  File "/home/ec2-user/.pyenv/versions/3.9.13/lib/python3.9/threading.py", line 980, in _bootstrap_inner  
    self.run()
  File "/home/ec2-user/code/ragas/src/ragas/executor.py", line 93, in run                                 
    results = self.loop.run_until_complete(self._aresults())                                              
  File "/home/ec2-user/.pyenv/versions/3.9.13/lib/python3.9/asyncio/base_events.py", line 647, in run_unti
l_complete                                                                                                
    return future.result()                                                                                
  File "/home/ec2-user/code/ragas/src/ragas/executor.py", line 81, in _aresults                           
    raise e                                                                                               
  File "/home/ec2-user/code/ragas/src/ragas/executor.py", line 76, in _aresults                           
    r = await future                                                                                      
  File "/home/ec2-user/.pyenv/versions/3.9.13/lib/python3.9/asyncio/tasks.py", line 611, in _wait_for_one 
    return f.result()  # May raise f.exception().                                                         
  File "/home/ec2-user/code/ragas/src/ragas/executor.py", line 36, in sema_coro                           
    return await coro                                                                                     
  File "/home/ec2-user/code/ragas/src/ragas/executor.py", line 109, in wrapped_callable_async             
    return counter, await callable(*args, **kwargs)                                                       
  File "/home/ec2-user/code/ragas/src/ragas/testset/evolutions.py", line 145, in evolve                   
    ) = await self._aevolve(current_tries, current_nodes)                                                 
  File "/home/ec2-user/code/ragas/src/ragas/testset/evolutions.py", line 573, in _aevolve                 
    result = await self._acomplex_evolution(                                                              
  File "/home/ec2-user/code/ragas/src/ragas/testset/evolutions.py", line 375, in _acomplex_evolution      
    simple_question, current_nodes, _ = await self.se._aevolve(                                           
  File "/home/ec2-user/code/ragas/src/ragas/testset/evolutions.py", line 288, in _aevolve                 
    passed = await self.node_filter.filter(merged_node)                                                   
  File "/home/ec2-user/code/ragas/src/ragas/testset/filters.py", line 54, in filter                       
    results = await self.llm.generate(prompt=prompt)                                                      
  File "/home/ec2-user/code/ragas/src/ragas/llms/base.py", line 92, in generate                           
    return await agenerate_text_with_retry(                                                               
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/tenacity/_asyncio.py", line 88, in 
async_wrapped                                                                                             
    return await fn(*args, **kwargs)                                                                      
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/tenacity/_asyncio.py", line 47, in 
__call__                                                                                                  
    do = self.iter(retry_state=retry_state)
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/tenacity/__init__.py", line 325, in
 iter                                                                                                     
    raise retry_exc.reraise()                                                                             
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/tenacity/__init__.py", line 158, in
 reraise                                                                                                  
    raise self.last_attempt.result()                                                                      
  File "/home/ec2-user/.pyenv/versions/3.9.13/lib/python3.9/concurrent/futures/_base.py", line 439, in res
ult                                                                                                       
    return self.__get_result()                                                                            
  File "/home/ec2-user/.pyenv/versions/3.9.13/lib/python3.9/concurrent/futures/_base.py", line 391, in __g
et_result                                                                                                 
    raise self._exception                                                                                 
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/tenacity/_asyncio.py", line 50, in 
__call__                                                                                                  
    result = await fn(*args, **kwargs)                                                                    
  File "/home/ec2-user/code/ragas/src/ragas/llms/base.py", line 176, in agenerate_text                    
    result = await self.langchain_llm.agenerate_prompt(                                                   
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/langchain_core/language_models/chat
_models.py", line 554, in agenerate_prompt                                                                
    return await self.agenerate(                                                                          
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/langchain_core/language_models/chat
_models.py", line 514, in agenerate                                                                       
    raise exceptions[0]                                                                                   
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/langchain_core/language_models/chat
_models.py", line 630, in _agenerate_with_cache                                                           
    result = await self._agenerate(                                                                       
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/langchain_google_genai/chat_models.
py", line 574, in _agenerate                                                                              
    response: genai.types.GenerateContentResponse = await _achat_with_retry(                              
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/langchain_google_genai/chat_models.
py", line 185, in _achat_with_retry                                                                       
    return await _achat_with_retry(**kwargs)                                                              
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/tenacity/_asyncio.py", line 88, in 
async_wrapped                                                                                             
    return await fn(*args, **kwargs)
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/tenacity/_asyncio.py", line 47, in 
__call__                                                                                                  
    do = self.iter(retry_state=retry_state)                                                               
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/tenacity/__init__.py", line 314, in
 iter                                                                                                     
    return fut.result()                                                                                   
  File "/home/ec2-user/.pyenv/versions/3.9.13/lib/python3.9/concurrent/futures/_base.py", line 439, in res
ult                                                                                                       
    return self.__get_result()                                                                            
  File "/home/ec2-user/.pyenv/versions/3.9.13/lib/python3.9/concurrent/futures/_base.py", line 391, in __g
et_result                                                                                                 
    raise self._exception                                                                                 
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/tenacity/_asyncio.py", line 50, in 
__call__                                                                                                  
    result = await fn(*args, **kwargs)                                                                    
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/langchain_google_genai/chat_models.
py", line 183, in _achat_with_retry                                                                       
    raise e                                                                                               
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/langchain_google_genai/chat_models.
py", line 176, in _achat_with_retry                                                                       
    return await generation_method(**kwargs)                                                              
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/google/generativeai/generative_mode
ls.py", line 410, in send_message_async                                                                   
    response = await self.model.generate_content_async(                                                   
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/google/generativeai/generative_mode
ls.py", line 275, in generate_content_async                                                               
    response = await self._async_client.generate_content(request)
    return await retry_target(                                                                            
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/google/api_core/retry/retry_unary_a
sync.py", line 160, in retry_target                                                                       
    _retry_error_helper(                                                                                  
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/google/api_core/retry/retry_base.py
", line 212, in _retry_error_helper                                                                       
    raise final_exc from source_exc                                                                       
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/google/api_core/retry/retry_unary_a
sync.py", line 155, in retry_target                                                                       
    return await target()
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/google/api_core/grpc_helpers_async.
py", line 85, in __await__                                                                                
    response = yield from self._call.__await__()                                                          
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/grpc/aio/_call.py", line 299, in __
await__                                                                                                   
    response = yield from self._call_response                                                             
RuntimeError: Task <Task pending name='Task-19654' coro=<BaseChatModel._agenerate_with_cache() running at 
/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/langchain_core/language_models/chat_models.
py:630> cb=[_gather.<locals>._done_callback() at /home/ec2-user/.pyenv/versions/3.9.13/lib/python3.9/async
io/tasks.py:767]> got Future <Task pending name='Task-19655' coro=<UnaryUnaryCall._invoke() running at /ho
me/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/grpc/aio/_call.py:568>> attached to a differen
t loop
Traceback (most recent call last):
  File "/home/ec2-user/code/parakeet/parakeet/components/ragas_gen_testset.py", line 84, in <module>
    testset = generator.generate_with_langchain_docs( 
  File "/home/ec2-user/code/ragas/src/ragas/testset/generator.py", line 223, in generate_with_langchain_do
cs                                                                                                        
    return self.generate(                            
  File "/home/ec2-user/code/ragas/src/ragas/testset/generator.py", line 316, in generate
    raise ExceptionInRunner()
ragas.exceptions.ExceptionInRunner: The runner thread which was running the jobs raised an exeception. Rea
d the traceback above to debug it. You can also pass `raise_exceptions=False` incase you want to show only
 a warning message instead.

Expected behavior
Don't throw error and don't get stuck.

Additional context
After getting this, my langchain's sqlite3 cache always gets corrupted i.e. next cached embedding access never completes. However the db integration check reports ok.

@joy13975 joy13975 added the bug Something isn't working label Feb 29, 2024
@rbellamy
Copy link

rbellamy commented Mar 1, 2024

I can consistently reproduce with a call to evaluate from a Jupyter notebook, with or without is_async=True.

Exception in thread Thread-83:
Traceback (most recent call last):
  File "/Users/rbellamy/.pyenv/versions/3.11.6/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
    self.run()
  File "/Users/rbellamy/test/.venv/lib/python3.11/site-packages/ragas/executor.py", line 93, in run
    results = self.loop.run_until_complete(self._aresults())
...
<SNIP/>
...
RuntimeError: Task <Task pending name='Task-178' coro=<as_completed.<locals>.sema_coro() 
  running at /Users/rbellamy/test/.venv/lib/python3.11/site-packages/ragas/executor.py:36> 
  cb=[as_completed.<locals>._on_completion() at /Users/rbellamy/.pyenv/versions/3.11.6/lib/python3.11/asyncio/tasks.py:602]> 
  got Future <Task pending name='Task-227' 
  coro=<UnaryUnaryCall._invoke() running at /Users/rbellamy/test/.venv/lib/python3.11/site-packages/grpc/aio/_call.py:568>> attached to a different loop

Calling code:

def get_eval_chain_results(question, answer, ground_truth_answer, llm, embeddings, splitter):
    # testset = get_testset_data(answer["source_documents"], llm, embeddings, splitter)
    features = Features(
        {
            "question": Value(dtype="string", id=None),
            "answer": Value(dtype="string", id=None),
            "contexts": Sequence(feature=Value(dtype="string", id=None)),
        }
    )
    mapping = {
        "question": [question],
        "answer": [answer["answer"]],
        "contexts": [[source.page_content for source in answer["source_documents"]]],
    }
    metrics = [
        answer_relevancy,
        context_relevancy,
        faithfulness,
    ]

    if ground_truth_answer is not None:
        features["ground_truths"] = Sequence(feature=Value(dtype="string", id=None))
        mapping["ground_truths"] = [[ground_truth_answer]]
        metrics.extend([answer_similarity, context_precision, context_recall])

    try:
        results = evaluate(
            dataset=Dataset.from_dict(
                mapping=mapping,
                features=features,
            ),
            metrics=metrics,
            llm=LangchainLLMWrapper(llm),
            embeddings=embeddings,
            is_async=False,
        )
    except Exception as e:
        results = {
            "answer_relevancy": float("nan"),
            "context_relevancy": float("nan"),
            "faithfulness": float("nan"),
            "answer_similarity": float("nan"),
            "context_precision": float("nan"),
            "context_recall": float("nan"),
        }
        print(f"Exception occurred while evaluating question: {question}")
        print(f"answer: {answer['answer']}")
        print(f"Error: {e}")

    return results

RC1 didn't have this issue.

@joy13975
Copy link
Contributor Author

joy13975 commented Mar 1, 2024

#689 seems to fix this issue since I can no longer reproduce the error, but I don't know from a theory & technical point of view it is really the correct fix or not. Haven't had time to verfiy exactly

@joy13975
Copy link
Contributor Author

joy13975 commented Mar 1, 2024

@rbellamy could that have anything to do with jupyter/jupyter_console#241?

@rbellamy
Copy link

rbellamy commented Mar 1, 2024

I do not. I think it's more likely something like the issue outlined in using-autoawait-in-a-notebook-ipykernel, and ipython/ipython#11338.

I've followed those directions and things are working with:

import nest_asyncio
nest_asyncio.apply()

@dosubot dosubot bot added the stale Issue has not had recent activity or appears to be solved. Stale issues will be automatically closed label May 19, 2024
@rbellamy
Copy link

The nest_asyncio fix no longer works for me. The symptoms in Jupyter are that the first evaluation will complete, but all subsequent calls to evaluate will fail with some kind of deadlock or race condition in the event loop.

I've tried a bunch of different methods of initiating the loop in my copy of executor.py without appreciable effect. I wish I had more time to look into this - I really want to be able to use this tool during evaluations. It's hard when I can't reliably run it in Jupyter.

@dosubot dosubot bot removed the stale Issue has not had recent activity or appears to be solved. Stale issues will be automatically closed label May 27, 2024
jjmachan pushed a commit that referenced this issue Jul 2, 2024
…loop (#979)

In issue #963 I commented about the RuntimeError when trying to use
TestsetGenerator.

In issue #681 you discussed that using get_event_loop() was the optimal
way to solve the issue but it had a Deprecation warning.

<img width="705" alt="image"
src="https://github.com/explodinggradients/ragas/assets/76526314/94e5cd25-7e4f-4989-8b72-93cb74274a8f">

In newer python versions get_event_loop() raises a RuntimeError when no
event loop is active. Thus we can avoid creating unnecessary new loops
that seem to be the problem that was mentioned in both issues.


I have modified the code the least possible to make it clear what I
intended to, although in this setup runner could be avoided as was
proposed in #689

Setup: 
Ragas version: 0.1.7
Python version: Python 3.10.13
@jjmachan
Copy link
Member

hey @joy13975 I took inspiration from your code and removed the runner as you suggested in the above PR. closing this one for now. Thanks a lot for your input ❤️ .

if you get some time to look at that PR I would really appreciate it too!

jjmachan added a commit that referenced this issue Jul 12, 2024
ragas `0.1.10` broke how the executor was functioning and there were a
few hard to reproduce bugs overall. This fixes all those.

- uses `nest_asyncio` in jupyter notebooks 
- added tests in jupyter notebook with `nbmake`
- removed the Runner. this means all the `Tasks` will be run in the same
event loop as main. for jupyter notebooks we will use `nest_async` lib

takes inspiration from #689 
fixes: #681
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants