Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Bug fix for large queries #29

Closed
wants to merge 3 commits into from

2 participants

@stevearc

Commit comments say it all. There's two commits here, one to fix the broken behavior on large queries, and one to handle database connections going down temporarily. If you don't like the implementation feel free to decline the pull request and make the changes however you see fit, but I thought I'd pass on this fix in the hope of preventing other people from running into the same issue.

stevearc added some commits
@stevearc stevearc Catch exceptions from cursor polling
This allows momoko to recover connections to a database if the db goes
down.
e4bbd17
@stevearc stevearc Hackish bug fix for hanging connections on large queries
If the query is large, psycopg2 sets async status to WRITE and requires
additional calls to poll() to flush the query to postgresql.  Momoko was
telling tornado to only listen for READ, so poll() was never called and
the query would hang
fbc8b89
@FSX
Owner
FSX commented

I have something similar implemented in the rewrite in connection.py#L248 and for reconnectiong at connection.py#L171.

What happens when the connection is closed, is the callback still executed?

@stevearc

Cool! Wish I'd seen that earlier. In my hackish solution for the connection issue, the callbacks are not called. So if we get a tornado request while the db is down it just hangs and timeouts. Which is...acceptable for now, but I'm looking forward to the rewrite!

Looking at the rewrite, I'm not certain that the issue with the large query hang has been fixed. The original problem was that _io_callback was never being called by tornado. I would attempt an execute, which would call AsyncConnection.cursor() and that would do cursor.execute() as well as call ioloop.update_handler(fd, IOLoop.READ). And in the case of a large query, psycopg2 sets its internal async_status as WRITE to indicate that it requires additional calls to poll() to flush the query to the database. So after the call to cursor(), _io_callback is never called, which means cursor.poll() is never called, and the query is never flushed to the database.

With the code in the rewrite, it seems like inside of _do_op you're calling execute and then calling set_callbacks, which sets the update_handler to IOLoop.READ just like before.

Again, my knowledge of how epoll works is pretty shallow, so I could be totally wrong about how this is working. Note that if you want to test this you will need queries in excess of 60000 characters AND you will need the database to be on a different machine. I was never able to reproduce it locally.

@FSX
Owner
FSX commented

60000 characters? Another machine is not a problem for me, but I can't imagina a query of 60000 characters. What kind of queries are you running?

@stevearc

Yeah...we're uh...we're inserting a lot of json into the db. I have a simple repro of the bug. Create a basic table in Postgresql with a text field.

CREATE TABLE test_table (my_text TEXT);

Then

from tornado.ioloop import IOLoop
from tornado import gen
import momoko

QUERY_SIZE = 60000

@gen.engine
def do_query():
    rwdb = momoko.AsyncClient({
        "host": "my-host",
        "port": 5432,
        "user": "my-user",
        "password": "my-pass",
        "database": "my-db",
        "min_conn": 1,
        "max_conn": 1,
        "cleanup_timeout": 10
    })
    rwdb.execute("INSERT INTO test_table (my_text) VALUES (%s)", ("A" * QUERY_SIZE,),\
                callback=(yield gen.Callback('test_insert')))
    cur = yield gen.Wait('test_insert')
    print cur.statusmessage
    IOLoop.instance().stop()

def main():
    ioloop = IOLoop.instance()
    ioloop.add_callback(do_query)
    ioloop.start()

if __name__ == "__main__":
    main()

That will hang forever. You can tweak the QUERY_SIZE to see that it functions normally at smaller values.

@FSX
Owner

Thanks! I'll test it with the current Momoko and with the rewrite.

I'll leave the pull request open until I've confirmed it.

@stevearc stevearc Fixed a bug where chains could exceed maximum recursion limit
Bug was introduced in my attempted bug fix for large queries.
383e3d8
@FSX FSX referenced this pull request from a commit
@FSX Fixed a bug for very large queries. Found by @stevearc.
When a query exceeds 60000 characters, it keeps hanging and will not be executed.

See pull request #29 for more details.
ee354c0
@FSX
Owner

Sorry for the delay. I fixed it in ee354c0, but only in the rewrite. I', focusing on that, because I need to use it in an internship project in some weeks.

I did not use a database on an other machine, but it kept hanging like you described when the query exceeded 60000 characters. Can you confirm it works? If you want to try out the rewrite of course. :)

@stevearc

Tested the rewrite, and it looks like it's working!

@stevearc stevearc closed this
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Sep 7, 2012
  1. @stevearc

    Catch exceptions from cursor polling

    stevearc authored
    This allows momoko to recover connections to a database if the db goes
    down.
  2. @stevearc

    Hackish bug fix for hanging connections on large queries

    stevearc authored
    If the query is large, psycopg2 sets async status to WRITE and requires
    additional calls to poll() to flush the query to postgresql.  Momoko was
    telling tornado to only listen for READ, so poll() was never called and
    the query would hang
Commits on Sep 10, 2012
  1. @stevearc

    Fixed a bug where chains could exceed maximum recursion limit

    stevearc authored
    Bug was introduced in my attempted bug fix for large queries.
This page is out of date. Refresh to see the latest.
Showing with 8 additions and 4 deletions.
  1. +8 −4 momoko/pools.py
View
12 momoko/pools.py
@@ -318,15 +318,19 @@ def cursor(self, function, function_args, callback, cursor_kwargs={}):
getattr(cursor, function)(*function_args)
self._callbacks = [partial(callback, cursor)]
- # Connection state should be 1 (write)
- self._ioloop.update_handler(self._fileno, IOLoop.READ)
+ # Fire callback immediately To correctly set the update handler
+ self._io_callback(None, None)
def _io_callback(self, fd, events):
- state = self._conn.poll()
+ try:
+ state = self._conn.poll()
+ except psycopg2.OperationalError:
+ self.close()
+ return
if state == psycopg2.extensions.POLL_OK:
for callback in self._callbacks:
- callback()
+ self._ioloop.add_callback(callback)
elif state == psycopg2.extensions.POLL_READ:
self._ioloop.update_handler(self._fileno, IOLoop.READ)
elif state == psycopg2.extensions.POLL_WRITE:
Something went wrong with that request. Please try again.