Make engine.acquire() reentrant #128

Closed · mpaolini opened this issue Jul 18, 2016 · 50 comments

@mpaolini (Contributor)

Currently, acquiring a connection from the pool in a nested manner is deadlock-prone. See #113, #126 and #127.

Sometimes (e.g. when implementing middlewares, form validators, or any other feature plugged into an existing framework) it is just difficult to pass the connection instance down the call stack.

I propose a new feature where engine.acquire() returns the same already-acquired connection if called again inside an engine.acquire() context by the same asyncio task.

This would mean the following test would pass:

async def test_nested_acquire(event_loop):
    async with engine.acquire() as conn1:
        async with engine.acquire() as conn2:
            assert conn1 == conn2
@serg666 commented Jul 18, 2016

@asvetlov What do you think about this new feature?

@jettify (Member) commented Jul 18, 2016

How would you implement this feature without introducing some sort of global registry bound to the current task?

@popravich (Member)

Hi, this idea can be more harmful than helpful; consider the following use case:

async def in_real(event_loop):
    async with engine.acquire() as conn1:
        async with conn1.begin() as tr:
            await conn1.execute("INSERT....")
            await test_nested_acquire(conn1)

async def test_nested_acquire(outer_conn):
    async with engine.acquire() as conn2:
        assert outer_conn is conn2
        await conn2.execute("SELECT 1/0;")

The outer transaction will be corrupted.

Implementing this (re-entrant acquire) would require complex code checking many possible states of connections, tasks, etc., and it would be hard to test (instrumenting all possible usages and task states).

@mpaolini (Contributor, Author)

@popravich the transaction will be errored and no further SQL statements will be allowed until the begin block exits. I think this is just the way it should be. Maybe it is better if we add an engine.acquire(reentrant=True) flag to make this behaviour opt-in.

@mpaolini (Contributor, Author)

@jettify I would keep this registry somewhere near the Pool._used set, just as we have today. So basically one can have a global engine instance and call engine.acquire(reentrant=True).
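
A minimal sketch of what such a task-keyed registry might look like. The ReentrantEngine wrapper and its _per_task dict are hypothetical illustrations, not aiopg API; it assumes engine.acquire() and engine.release() as aiopg exposes them, and uses the modern asyncio.current_task():

import asyncio

class ReentrantEngine:
    # Hypothetical wrapper sketching the proposal: nested acquire()
    # calls from the same task get the same connection back.
    def __init__(self, engine):
        self._engine = engine
        self._per_task = {}  # task -> [connection, depth]

    def acquire(self):
        return _ReentrantAcquire(self)

class _ReentrantAcquire:
    def __init__(self, owner):
        self._owner = owner

    async def __aenter__(self):
        task = asyncio.current_task()
        entry = self._owner._per_task.get(task)
        if entry is None:
            # Outermost acquire for this task: really take a connection.
            conn = await self._owner._engine.acquire()
            entry = self._owner._per_task[task] = [conn, 0]
        entry[1] += 1
        return entry[0]

    async def __aexit__(self, exc_type, exc, tb):
        task = asyncio.current_task()
        entry = self._owner._per_task[task]
        entry[1] -= 1
        if entry[1] == 0:
            # Innermost exit: give the connection back to the pool.
            del self._owner._per_task[task]
            self._owner._engine.release(entry[0])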

@popravich (Member)

What I'm trying to say is that it's an anti-pattern -- when you acquire a connection you expect
that it is not used by some other code, and you can even call conn.execute('rollback') before any operation. Such a use case would bring even more inconsistency into a program.

@mpaolini (Contributor, Author)

@popravich the issue is: accessing the current connection for the current asyncio task inside a function, without passing it as an argument.

I think it is quite a common problem. See for instance the aiohttp chat demo views, where each db.py function acquires a connection from the pool.

If in the future you need to add another query somewhere in your views, you have the following options:

  1. you keep acquiring and releasing db connections multiple times during request handling (suboptimal performance-wise; frameworks like django usually hide the db connections and the connection pool)
  2. you acquire a connection in the view and pass it down to your db functions.

If you go for 2. you must be very very brave: if some of the code you call inside your acquire block does another acquire, then you have a lurking deadlock that might block the entire application forever until killed.

So, either we detect reentrant acquire() calls and raise an exception right away, or we handle them somehow.
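
For reference, a minimal sketch of that lurking deadlock, assuming an aiopg.sa engine and a reachable database (connection parameters are illustrative); with maxsize=1 the nested acquire() waits forever for a connection the same task is already holding:

import asyncio

from aiopg.sa import create_engine

async def main():
    # maxsize=1 makes the deadlock deterministic.
    engine = await create_engine(database='test', maxsize=1)
    async with engine.acquire() as conn:       # takes the only connection
        await conn.execute("SELECT 1")
        async with engine.acquire() as conn2:  # blocks forever: pool is empty
            await conn2.execute("SELECT 1")

asyncio.get_event_loop().run_until_complete(main())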

@serg666 commented Jul 18, 2016

In any case, if I have acquired a connection from the pool for serving a request, I have to be sure that I am using the same connection for all calls to the database in the context of processing the current request. It may be implemented, as @mpaolini said, by storing the current connection in a task-local variable, but it seems to me this is quite a common problem.

@mpaolini (Contributor, Author)

I am going to check what django and pymongo have in place. Will be back to you soon.

@serg666 commented Jul 18, 2016

@mpaolini wrote:

"If you go for 2. you must be very very brave: if some of the code you call inside your acquire block does another acquire, then you have a lurking deadlock that might block the entire application forever until killed."

That is a very, very good point.

We can very easily block the entire application forever: once we get something like I showed in #126, the entire application is blocked until killed.

@mpaolini (Contributor, Author)

OK, sharing the first finding, for pymongo.

The low-level MongoClient._get_socket is not reentrant and is deadlock-prone, just like aiopg. Note that this is a private API and the context manager is always consumed completely inside public API methods, so there is no potential deadlock in the public API.

In other words, there is no public way of acquiring a "socket" from the pool and using it. Every time you call a client.db.something() a socket is acquired and released back to the pool right away.

If we were to copy from them, aiopg should not expose its engine.acquire() as a public API.

The test script follows:

import time
import threading
import logging

from pymongo import MongoClient

SLEEP_TIME = 0.1

logger = logging.getLogger(__name__)


def do_query(client, sleep):
    client.test.stuff.find_one({'test': 1})
    time.sleep(sleep)
    do_query_inner(client, sleep)


def do_query_inner(client, sleep):
    client.test.stuff_2.find_one({'test': 1})
    time.sleep(sleep)


def get_socket_nested(client, sleep):
    with client._socket_for_reads(None):
        logger.debug('acquired outer for task {}'.format(threading.current_thread().name))
        time.sleep(sleep)
        with client._socket_for_reads(None):
            logger.debug('acquired inner for task {}'.format(threading.current_thread().name))
            time.sleep(sleep)


def main(concurrency, iterations, pool_size, sleep, nested):
    client = MongoClient(maxPoolSize=pool_size)
    for i in range(iterations):
        threads = []
        for it in range(concurrency):
            thread = threading.Thread(
                target=get_socket_nested if nested else do_query,
                args=[client, sleep],
                name='{}-{}'.format(i, it),
                daemon=True
            )
            thread.start()
            threads.append(thread)
        for thread in threads:
            thread.join()


if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--concurrency', type=int, default=1)
    parser.add_argument('-i', '--iterations', type=int, default=10)
    parser.add_argument('-n', '--nested', action='store_true')
    parser.add_argument('-p', '--pool-size', type=int, default=10)
    parser.add_argument('-s', '--sleep', type=float, default=0.1)
    args = parser.parse_args()
    logging.basicConfig(level=logging.DEBUG, format='%(funcName)s %(lineno)d: %(message)s')
    main(args.concurrency, args.iterations, args.pool_size, args.sleep, args.nested)

@popravich (Member)

you keep acquiring and releasing db connections multiple times during request handling (suboptimal performance-wise; frameworks like django usually hide the db connections and the connection pool)

This is only possible if you're using global variables (engine); otherwise you need to pass either the engine (obtained from the request object) or the request object into your functions from the view handler.
If we're talking about global vars -- that's the point where I quit -- I believe global vars must be hidden deep inside the library and not exposed to the user.

@serg666 commented Jul 18, 2016

If we were to copy from them, aiopg should not expose its engine.acquire() as a public api.

But it does.

@mpaolini (Contributor, Author)

django situation:

django uses a global var where it keeps all open connections as thread locals. Cursors are reentrant (I think) and all share the same connection. The connection pool implementation is very basic (basically a single connection per thread) and is completely hidden from the public API.

As with pymongo, there is no potential for pool deadlock using the public API.
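
For illustration, the thread-local pattern described above boils down to something like this (a simplified sketch, not django's actual code; it assumes psycopg2 and a dsn string):

import threading

import psycopg2

_local = threading.local()

def get_connection(dsn):
    # Each thread lazily opens and caches its own connection, so all
    # cursors within one thread share that single connection.
    conn = getattr(_local, 'conn', None)
    if conn is None:
        conn = psycopg2.connect(dsn)
        _local.conn = conn
    return conn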

@mpaolini (Contributor, Author)

@popravich I never suggested we maintain a global var in aiopg. My proposal is letting the user keep a global engine instance and then giving the user a safe API for accessing the library. This API should not be deadlock-prone (as it currently is).

@popravich (Member)

Yep, that is the source of the problem -- "letting the user keep a global var engine" )

@jettify (Member) commented Jul 18, 2016

Agree with @popravich. On the other hand, aiopg is also used with the Tornado framework, so binding a connection to the current asyncio task will not always work.


@mpaolini (Contributor, Author)

Ok, so you suggest acquiring the connection once and passing it all the way down the call stack, making sure no code down the call stack ever acquires a new connection from the same pool.

But that brings us to the very subject of this issue: in case your code is deep within another framework, like in this example, and you can't control the arguments to a function, you (as aiopg) are forcing the user to implement a global variable holding the connection, right?

I'm fine with that solution (a wontfix), but then let's at least issue a warning when a user acquires a connection in a nested manner, or at least write it in the docs.

@serg666 commented Jul 18, 2016

On the other hand, aiopg is also used with the Tornado framework, so binding a connection to the current asyncio task will not always work.

So, should I use the approach described by @mpaolini?

import asyncio

CONNECTIONS = {}

def get_connection(loop):
    return CONNECTIONS[asyncio.Task.current_task(loop=loop)]

but in this case we can still get a deadlock if inner code makes an engine.acquire() call.

@popravich (Member)

Ok, so you suggest acquiring the connection once and passing it all the way down the call stack, making sure no code down the call stack ever acquires a new connection from the same pool.

The simple answer is yes.

But that brings us to the very subject of this issue: in case your code is deep within another framework, like in this example, and you can't control the arguments to a function...

In the case of Tornado, RequestHandler provides an initialize() hook, so the engine can be attached to the request handler; example from the Tornado docs:

class ProfileHandler(RequestHandler):
    def initialize(self, database):
        self.database = database

    def get(self, username):
        ...

app = Application([
    (r'/user/(.*)', ProfileHandler, dict(database=database)),
    ])

replace dict(database=database) with dict(database=aiopg_engine) and it should work
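
Concretely, the handler would then acquire and release the connection within a single method. A sketch, assuming an aiopg engine is passed as database and Tornado is running on the asyncio event loop (the query is illustrative):

from tornado.web import RequestHandler

class ProfileHandler(RequestHandler):
    def initialize(self, database):
        self.database = database  # an aiopg engine

    async def get(self, username):
        # Acquire and release within one method; nothing nested.
        async with self.database.acquire() as conn:
            result = await conn.execute("SELECT 1")
            row = await result.fetchone()
        self.write(str(row[0]))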

@jettify (Member) commented Jul 18, 2016

@serg666

CONNECTIONS[asyncio.Task.current_task(loop=loop)]

this line does not work, since Tornado does not know anything about asyncio.Task

@serg666 commented Jul 18, 2016

replace dict(database=database) with dict(database=aiopg_engine) and it should work

And how should I access the connection in a framework like FormEncode, as in this example from #127?

@mpaolini (Contributor, Author)

this line do not work, since tornado do not know anything about asyncio.Task

@jettify definitely, this was just an example, and by no means a full implementation

@mpaolini (Contributor, Author)

replace dict(database=database) with dict(database=aiopg_engine) and it should work

it works as long as you are able to pass down the connection object

@serg666 commented Jul 18, 2016

So, the only way is to acquire a new connection inside the FormEncode validator, which leads to deadlock.

@serg666 commented Jul 18, 2016

Or I can set the pool size as large as possible :-)

@mpaolini (Contributor, Author)

My reasoning goes like this:

  1. if you acquire a connection in a nested manner, you can deadlock very badly
  2. there are situations where you can't control the args to your function and thus can't pass the connection down the call stack; in these cases the user is tempted to just acquire a new connection from the same engine, and this is very very dangerous

If we all agree on 1. and 2. let's move forward to the...

Proposed solutions:

  1. we make .acquire() reentrant

  2. we try to hide the .acquire() from the public API

  3. we issue warnings when we see .acquire() used in a reentrant manner

  4. we write in the docs that nested .acquire() calls lead to deadlock, and you had better

    a. create the connection once and pass it down the call stack
    b. use a global registry of connections

@popravich (Member)

replace dict(database=database) with dict(database=aiopg_engine) and it should work

And how should I access the connection in a framework like FormEncode, as in this example from #127?

You'll need a bit of work:

  • create a custom Schema -- add the engine in __init__
  • override add_field, passing the engine to fields of your custom types -- field.engine = self.engine
  • in these fields use self.engine

@mpaolini (Contributor, Author)

in these fields use self.engine

... and still you will be suffering deadlocks when doing self.engine.acquire()

@popravich (Member)

ok, not engine but connection

@serg666 commented Jul 18, 2016

But if some inner code (not my code) calls acquire, it leads to deadlock anyway.

@popravich (Member)

Proposed solutions:

  1. we make .acquire() reentrant

  2. we try to hide the .acquire() from the public API

  3. we issue warnings when we see .acquire() used in a reentrant manner

  4. we write in the docs that nested .acquire() calls lead to deadlock, and you had better

    a. create the connection once and pass it down the call stack
    b. use a global registry of connections

I'm +1 for 4a and 3 and -1 for 2

I'm not against 1, but the implementation can be very complex and it will need a lot of tests.

@mpaolini (Contributor, Author)

I have always seen the engine passed down the call stack (at least in the aiohttp demos). I think this is because some of the constructors are singletons (the App, the SiteHandler) and are only ever instantiated once in the app lifetime.

If we now need to pass the connection itself, we cannot have singletons anymore. Are you sure, @popravich, this is the right way to go?

@serg666 commented Jul 18, 2016

ok, not engine but connection

It is very hard to pass the connection to all the places I need. In my example I need the connection inside

class converter(int):

@jettify (Member) commented Jul 18, 2016

@mpaolini you can use middleware for that:

from aiohttp import web

async def middleware_factory(app, handler):
    async def middleware_handler(request):
        engine = app['engine']
        async with engine.acquire() as conn:
            request['connection'] = conn
            resp = await handler(request)
        return resp
    return middleware_handler

async def my_handler(request):
    conn = request['connection']
    return web.Response(body=b'text')
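
The factory would then be registered when building the app; a usage sketch, assuming the engine has been created elsewhere (e.g. with aiopg.sa.create_engine):

app = web.Application(middlewares=[middleware_factory])
app['engine'] = engine  # the shared aiopg engine
app.router.add_route('GET', '/', my_handler)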

@serg666 commented Jul 18, 2016

There is no request object in a FormEncode validator

@popravich (Member)

Why not change the converter?

class TransactionID(FancyValidator):
    class converter(int):
        async def transaction(self, conn):
            res = await conn.execute(m.transactions.select().
                                     where(m.transactions.c.id == self))
            trans = await res.fetchone()
            if trans is None:
                raise exceptions.InvalidTransaction(self)
            return trans


class ReqHandler(tornado.web.RequestHandler):
    def initialize(self, engine):
        self.engine = engine

    async def post(self):
        async with self.engine.acquire() as conn:
            transaction_id = self.BODY.get('transaction')
            transaction = await transaction_id.transaction(conn)

@serg666 commented Jul 18, 2016

Guys, it seems to me you are trying to solve my particular problem, but as @mpaolini said, I think this is quite a common problem.

@asvetlov (Member)

@mpaolini @serg666 sorry guys, I support @popravich and @jettify.
I'll try to write up an explanation of my opinion later, maybe tomorrow.
Thanks to EuroPython I have very limited free time these days.

@mpaolini (Contributor, Author)

@mpaolini you can use middleware for that:

@jettify again, you are proposing to pass the connection down the call stack. I know it is right, but it is useless in cases where you can't control the args of the function.

@asvetlov (Member) commented Jul 20, 2016

Proposed solutions:

  1. we make .acquire() reentrant

Not an option. A connection is a stateful object. You cannot open a transaction in one task, change the state, and re-enter a clean state on the same connection from another task.

It works for stateless connections like mongodb (or redis without MULTI statements), but RDBMS connections have a very rich state.

  2. we try to hide the .acquire() from the public API

A good option, but it should be solved by high-level libraries like an ORM built on top of aiopg (not aiopg.sa).
It's out of scope for the aiopg library itself.

  3. we issue warnings when we see .acquire() used in a reentrant manner

You don't know the context. The code may rely on the current task, but calling asyncio.gather breaks that assumption:

async def f(pool):
    async with pool.acquire() as conn:
        await conn.execute(...)

await asyncio.gather(f(pool), f(pool))

is totally correct. There is no way to check whether an acquire leads to a deadlock or not.

  4. we write in the docs that nested .acquire() calls lead to deadlock, and you had better

This is the best we can do. Any volunteers?

4.a) create the connection once and pass it down the call stack

I support this way. If some library doesn't support passing a connection explicitly, the library cannot be used with aiopg, sorry. But I believe there is a solution for any concrete case.

4.b) use a global registry of connections

No, not an option. Let's never use global objects at all. It's the easiest way to make a mess and break the Universe.

@mpaolini (Contributor, Author)

OK, let's go for 4 then: document the proper use of the aiopg connection pool.

From the aiohttp demos we see there is a singleton Engine instance, instantiated once and passed around to the database functions, which acquire a connection from it when needed.

This demo app does not trigger the deadlock, so copying from it we could say:

  • keep an Engine singleton instance accessible from all parts of the app; the Engine should normally be created once when the app starts and live until the app exits
  • make sure engine.acquire() is only called in a single (or just a few) first-level db functions; these first-level functions have access to the engine singleton
  • all other functions of the application that need to access the db should accept a connection instance passed from the first-level db functions, and should not access the engine singleton directly
  • make sure you never call a first-level db function within the engine.acquire() block of another first-level db function (see the sketch below)
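
A minimal sketch of that layering, assuming an aiopg.sa engine (the function and table names are illustrative only):

# First-level db function: the only place that touches the engine.
async def signup_user(engine):
    async with engine.acquire() as conn:
        await insert_user(conn)  # lower-level functions receive the connection
        await log_signup(conn)

# Lower-level db functions: accept a connection, never acquire one.
async def insert_user(conn):
    await conn.execute("INSERT INTO users (name) VALUES ('alice')")

async def log_signup(conn):
    await conn.execute("INSERT INTO audit_log (event) VALUES ('signup')")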

@mpaolini (Contributor, Author)

4.b) use a global registry of connections

No, not an option. Let's never use global objects at all. It's the easiest way to make a mess and break the Universe.

@asvetlov I don't entirely agree with this point.

Using global objects is not only useful but mandatory for using aiopg correctly.

The connection pool itself is only useful when shared by the whole running app, and is thus used as a global kind of instance.

The deadlock issue we are discussing happens exactly for this reason: we want a connection pool shared by all asyncio tasks in the running app.

Having this singleton Engine instance as a global variable (like django and pymongo suggest) or as an attribute of the SiteHandler view-manager singleton as in the aiohttp demos makes very little difference.

@mpaolini (Contributor, Author)

Yep, that is the source of the problem -- "letting the user keep a global var engine" )

@popravich in order to correctly use the engine's connection pool you have to have a singleton Engine instance accessible from all the tasks of your app. That's the way it is supposed to be, otherwise the pool is meaningless. I think we should document this too.

And yes, I agree that's the source of the problem, because having serialized access to shared resources is difficult and might lead to deadlocks.

I just want to point out that the connection pool is a shared resource by design, and you have to keep it shared for it to be useful.

@mpaolini (Contributor, Author)

Ok, I have bothered you guys enough. Let me know if the main points I proposed are agreed upon, and I will try to cook up a patch for the docs in the next few days to even things out ;)

@asvetlov (Member)

@mpaolini I'll rewrite the aiohttp demo to reflect your notes and provide better storage for the db pool (app['db'] actually).

@popravich (Member)

Let me emphasize -- "shared resource" != "global variable".
There are many ways to design an application with shared resources without making them global vars.
I just don't want aiopg to hint that users should use global variables.

@mpaolini (Contributor, Author)

Should I close this one and create a new issue, "Document how to correctly use the connection pool"?

@asvetlov (Member)

Yes, please do.
I'm working on aio-libs/aiohttp#981 but have no idea when I'll find time for updating the aiopg docs.

@mpaolini would you make a PR for the aiopg doc update?

@mpaolini (Contributor, Author)

Closing this in favour of issue #129.
