This repository has been archived by the owner on Dec 15, 2020. It is now read-only.

Paginator __aexit__ coroutine not correctly setting finish event #39

Open
mklokocka opened this issue Jul 23, 2018 · 19 comments

Comments


mklokocka commented Jul 23, 2018

Hello,

there is a problem with this line:

self._exit_event.set()

The __aexit__ coroutine does not correctly set self._finish_event. Because of the way aiocassandra uses threading, we have to call self._loop.call_soon_threadsafe(self._finish_event.set) to correct this.

This leads to problems when a task working through results of the pagination is suddenly cancelled.
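A minimal standalone sketch (not aiocassandra code, names are illustrative) of why an asyncio.Event set from another thread needs to go through `loop.call_soon_threadsafe`:

```python
import asyncio
import threading

# An asyncio.Event set from a worker thread must go through
# loop.call_soon_threadsafe; a bare event.set() from another thread
# does not reliably wake the waiting coroutine.

async def main():
    loop = asyncio.get_running_loop()
    event = asyncio.Event()

    # Simulate a driver/executor thread setting the event.
    threading.Timer(
        0.1, lambda: loop.call_soon_threadsafe(event.set)
    ).start()

    await asyncio.wait_for(event.wait(), timeout=2)  # returns promptly
    return event.is_set()

was_set = asyncio.run(main())
print("event was set:", was_set)
```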

Would it be possible to fix this, or should I make a custom fork for my use?

@mklokocka
Author

I have added a proposed pull request to fix this issue:

#40

@hellysmile Would you check this?

@hellysmile
Member

Hey, how can you call __aexit__ from another thread?

@hellysmile
Member

Maybe we should convert all the events to threading.Event instances.

@hellysmile
Member

Can you reproduce the error with an example?

@mklokocka
Author

I am not sure, to be honest. Asyncio is hard enough for me as it is; combining it with threading is beyond my understanding.

I am trying to get Cassandra working locally to see if I can find a suitable example. The error happened when I used the Paginator in an aiohttp request handler and cancelled the request.

@mklokocka
Author

I have been trying to put together an example, but so far I cannot reproduce the problem. I think it has to do with the fact that I am working with several hosts and tens of executor threads in Cassandra.

I have two CQL queries that I process asynchronously: one feeds data into an asyncio queue, and the other processes the results from that queue into a second queue, which is then consumed by a third (non-Cassandra) consumer. Maybe the problem is that those queues have fixed sizes, and once they are full, the task running the Paginator must wait until there is free space.

@mklokocka
Author

A small example is here: https://gist.github.com/mklokocka/1c459b2b3eadaa9bc4b229b4bc4470f5

If you cancel the request fast enough, you can sometimes see

ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending coro=<Event.wait() running at /usr/local/Cellar/python/3.7.0/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/locks.py:293> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10f537a38>()]>>

When you are dealing with ~20M rows, this happens so often that it sometimes makes the whole app hang and sends memory usage through the roof too. :(

@hellysmile
Member

If you close the HTTP connection to aiohttp, it automatically sends a cancellation into your request handler. Can you try the same thing without aiohttp? This is a common issue with aiohttp.

@mklokocka
Author

Well, that is the normal behaviour; the task retrieving data from Cassandra should be, and has to be, cancelled. I do not need it bogging down the rest of the application. I cancel those tasks myself anyway, in the finally clause you can see in the snippet.

@hellysmile
Member

I mean maybe it is the issue described in aio-libs/aiohttp#2098 and aio-libs/aiohttp#2374.

@mklokocka
Author

The thing is that I am not writing anything to the DB, just retrieving data with a paginator. This should not lead to the behaviour I am seeing: the tasks processing the data from the DB should be cancelled so that the app does not go through all ~20M rows of data with nothing acting on them.

@hellysmile
Member

hellysmile commented Jul 26, 2018

I am really sorry, but I will try to explain one more time...

It does not matter whether you write or read, or whether it is a HEAD/GET/POST request to aiohttp.

If you are doing something inside an aiohttp request handler and the client disconnects, you will get an asyncio.CancelledError.

Can you please try the same snippet without aiohttp? Right now we are not sure whether this issue is produced by aiohttp or by aiocassandra itself.

@mklokocka
Author

What I am trying to explain is that it should not matter whether aiohttp is involved or not. It is not an aiohttp problem that I have a coroutine running the aiocassandra paginator which I cancel myself; that delivers an asyncio.CancelledError into the coroutine. It is normal, expected behaviour.

The problem is that the Paginator cannot handle this cancellation. I do not know the precise reason why, but I think it is the wrong handling of the exit event set, because changing it as I proposed fully fixes the problem on my side. It is not just the error messages about Event.wait, but the fact that for some reason the Paginator keeps hoarding data and working, and I need it cancelled.

I have tried to find an example without aiohttp, but so far I have not managed to reproduce the same behaviour; I think that is because aiohttp handles requests as they come in, starting such coroutines (like those handling Cassandra in my case) on the fly. But honestly, I do not have the time to spend days on this. I did not write this library and I do not claim to be an expert on asyncio and threading. I would be grateful if you looked at it; otherwise I will just go with my fork using the fix I proposed, which you somehow keep ignoring.
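A small sketch of the cancellation behaviour being discussed (illustrative names, not aiocassandra code): cancelling a task, as aiohttp does on client disconnect or as a handler's finally clause does, raises asyncio.CancelledError inside it.

```python
import asyncio

async def paginate_forever():
    # Stands in for a coroutine iterating an aiocassandra paginator.
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        print("got CancelledError")
        raise  # re-raise so the task is properly marked as cancelled

async def main():
    task = asyncio.create_task(paginate_forever())
    await asyncio.sleep(0.1)
    task.cancel()  # delivers CancelledError into paginate_forever()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()

cancelled = asyncio.run(main())
print("task cancelled:", cancelled)
```

The library must tolerate this: any events it is waiting on internally have to be released when the surrounding task is cancelled.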

@mklokocka
Author

Ah, I see the problem now. There are three events: self._drain_event, self._finish_event and self._exit_event. In the case where the Paginator exits prematurely, __aexit__ is called and it sets self._exit_event, but self._finish_event is never set anywhere, so this clause

await asyncio.wait(
    (
        self._drain_event.wait(),
        self._finish_event.wait(),
    ),
    return_when=asyncio.FIRST_COMPLETED,
    loop=self._loop,
)

never actually finishes. Therefore I think there must be a call to self._finish_event.set() somewhere, either in __aexit__ or in _handle_page. How does that sound?
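The diagnosis above can be sketched in a minimal standalone model (this is NOT the real aiocassandra Paginator; the event names just mirror the ones discussed, and the proposed fix is the one from this thread):

```python
import asyncio

class PaginatorSketch:
    """Minimal model of the three-event scheme discussed above."""

    def __init__(self):
        self._loop = asyncio.get_running_loop()
        self._drain_event = asyncio.Event()
        self._finish_event = asyncio.Event()
        self._exit_event = asyncio.Event()

    async def _wait_for_page(self):
        # The quoted clause: returns as soon as either event is set.
        waiters = (
            asyncio.create_task(self._drain_event.wait()),
            asyncio.create_task(self._finish_event.wait()),
        )
        done, pending = await asyncio.wait(
            waiters, return_when=asyncio.FIRST_COMPLETED, timeout=1,
        )
        for task in pending:
            task.cancel()  # avoid "Task was destroyed but it is pending!"
        return bool(done)

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self._exit_event.set()
        # Proposed fix: also mark the paginator finished, thread-safely,
        # since real pages arrive on driver threads.
        self._loop.call_soon_threadsafe(self._finish_event.set)

async def main():
    async with PaginatorSketch() as paginator:
        pass  # exit immediately, as on cancellation
    # With the fix, the wait completes instead of hanging to the timeout.
    return await paginator._wait_for_page()

unblocked = asyncio.run(main())
print("wait completed:", unblocked)
```

Without the `call_soon_threadsafe(self._finish_event.set)` line, neither event in `_wait_for_page` is ever set after a premature exit, which matches the hang described above.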

@hellysmile
Member

This sounds clearer. I'll try to find a fix / reproduce it over this weekend!

@mklokocka
Author

Sure! I am also having some memory issues that I am not sure are tied to this, so I will look into that as well (could it be that the unfinished await in the paginator keeps data in memory? I don't know).

@mklokocka
Author

Any news @hellysmile ?

@mklokocka
Author

Ping @hellysmile

mklokocka reopened this Aug 7, 2018
@FedirAlifirenko
Contributor

Hi @hellysmile, I found the same behavior, could you take a look?

Steps to reproduce:

1. Start Cassandra with docker-compose up -d cassandra:

   cassandra:
     image: cassandra:3.11.0
     container_name: cassandra
     ports:
       - 9042:9042

2. Wait some time until system.size_estimates is populated with data:

   cqlsh> SELECT count(*) from system.size_estimates ;

    count
   -------
     2313

3. Run the script below:
import asyncio

from aiocassandra import aiosession
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement


async def main():
    cluster = Cluster(
        contact_points=['127.0.0.1'],
        port=9042,
    )
    session = cluster.connect("system")
    aiosession(session)

    stmt = SimpleStatement("SELECT * from system.size_estimates ;", fetch_size=100)

    try:
        for _ in range(3):
            async with session.execute_futures(stmt) as paginator:
                n = 0
                async for row in paginator:
                    if n == 200:
                        break
                    n += 1

    finally:
        cluster.shutdown()


if __name__ == '__main__':
    import logging

    logging.basicConfig(
        level=logging.ERROR,
        format="%(asctime)s - [%(levelname)s] - [%(name)s] - %(filename)s:%(lineno)d - %(message)s",
    )

    asyncio.run(main())

Received output:

2019-06-18 17:51:11,756 - [ERROR] - [asyncio] - base_events.py:1608 - Task was destroyed but it is pending!
task: <Task pending coro=<Event.wait() running at /Users/fedir/.pyenv/versions/3.7.3/lib/python3.7/asyncio/locks.py:293> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10d1abbb8>()]>>
2019-06-18 17:51:11,756 - [ERROR] - [asyncio] - base_events.py:1608 - Task was destroyed but it is pending!
task: <Task pending coro=<Event.wait() running at /Users/fedir/.pyenv/versions/3.7.3/lib/python3.7/asyncio/locks.py:293> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10d1c2c78>()]>>
2019-06-18 17:51:11,756 - [ERROR] - [asyncio] - base_events.py:1608 - Task was destroyed but it is pending!
task: <Task pending coro=<Event.wait() running at /Users/fedir/.pyenv/versions/3.7.3/lib/python3.7/asyncio/locks.py:293> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10d1ddd38>()]>>
...
