Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RuntimeError: Timeout context manager should be used inside a task #38

Closed
felixgao opened this issue Jun 9, 2020 · 1 comment
Closed

Comments

@felixgao
Copy link

felixgao commented Jun 9, 2020

installed pypeln==0.3.3 running on Python3.7 MacOS 10.15.5

...
  File "/Users/ggao/miniconda3/lib/python3.7/site-packages/pypeln/task/api.py", line 552, in each
    for _ in stage:
  File "/Users/ggao/miniconda3/lib/python3.7/site-packages/pypeln/task/stage.py", line 234, in to_iterable
    raise error
RuntimeError: 

Original Traceback (most recent call last):
  File "/Users/ggao/miniconda3/lib/python3.7/site-packages/pypeln/task/stage.py", line 80, in run
    **{key: value for key, value in kwargs.items() if key in self.f_args}
  File "/Users/ggao/miniconda3/lib/python3.7/site-packages/pypeln/task/stage.py", line 53, in process
    await tasks.put(task)
  File "/Users/ggao/miniconda3/lib/python3.7/site-packages/pypeln/task/utils.py", line 58, in join
    await asyncio.gather(*self.tasks)
  File "/Users/ggao/miniconda3/lib/python3.7/asyncio/tasks.py", line 442, in wait_for
    return fut.result()
  File "/Users/ggao/miniconda3/lib/python3.7/site-packages/pypeln/task/api.py", line 471, in apply
    y = await y
  File "/Users/ggao/github/intuit/finpal-qb/lambda-notifications/eventbus.py", line 105, in send
    self.URL, data=data, raise_for_status=True
  File "/Users/ggao/miniconda3/lib/python3.7/site-packages/aiohttp/client.py", line 1012, in __aenter__
    self._resp = await self._coro
  File "/Users/ggao/miniconda3/lib/python3.7/site-packages/aiohttp/client.py", line 426, in _request
    with timer:
  File "/Users/ggao/miniconda3/lib/python3.7/site-packages/aiohttp/helpers.py", line 579, in __enter__
    raise RuntimeError('Timeout context manager should be used '

I have my code as

self.session = aiohttp.ClientSession(
            headers=strategy.header, connector=aiohttp.TCPConnector(limit=None)
        )

def event_gen(self) -> Iterator[str]:
       for i, row in enumerate(self.flo):
            data = make_event(row, self.timestamp, self.uuid)
            yield data

async def send(
        self, data: str, session: aiohttp.ClientSession
    ) -> str:
        try:
            async with session.post(
                self.URL, data=data, raise_for_status=True
            ) as response:
                return await response.text()
        except aiohttp.ClientConnectionError or aiohttp.ServerConnectionError as e:
            self.logger.error(f"Unable to sent {data} due to exception: {e}")
            raise

  def run(self):
        data = event_gen()
        pl.task.each(
            send,
            data,
            workers=self.batch_size,
            on_start=lambda: dict(session=self.session),
            on_done=lambda session: session.close(),
            timeout=5,
            run=True,
        )
@cgarciae
Copy link
Owner

Hey! I've faced this in the passed, the reason is that self.session is not created in an async context. The fix looks like this:

  def run(self):
        data = event_gen()
        pl.task.each(
            send,
            data,
            workers=self.batch_size,
            on_start=lambda: dict(
                session=aiohttp.ClientSession(...)
            ),
            on_done=lambda session: session.close(),
            timeout=5,
            run=True,
        )

Now on_start which is executed within a task creates the session. Either that or make run an async function.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants