Skip to content
This repository has been archived by the owner on Sep 24, 2022. It is now read-only.

The future returned by Connection.execute will never resolve #140

Open
masknu opened this issue Apr 20, 2016 · 1 comment
Open

The future returned by Connection.execute will never resolve #140

masknu opened this issue Apr 20, 2016 · 1 comment

Comments

@masknu
Copy link

masknu commented Apr 20, 2016

Close pool while invoking Connection.execute, the future returned by Connection.execute will never resolve.

Anyone can reproduce this issue by doing this:

conn = yield self.db.getconn(False)
yield conn.execute("BEGIN;")
cursor = yield conn.execute(" \
                    DECLARE serv_cur CURSOR FOR \
                    SELECT * FROM some_table;")
cursor_future = conn.execute("FETCH 2000 FROM serv_cur;")
#at some where of our program, 
#we close the connection through Pool.close() or just conn.close() like this:
conn.close()
#then our program will stuck at following line forever
cursor = yield cursor_future
result = cursor.fetchall()
yield conn.execute("CLOSE serv_cur;")
yield conn.execute("COMMIT;")

The possible solution

In Connection.execute(),Connection.callproc() and Connection.connect()

def execute(self,
                operation,
                parameters=(),
                cursor_factory=None):
    #......other code
    #before return future, save the last one
    assert self.current_future is None, "can not concurrently execute on same connection"
    self.current_future = future
    return future

In Connection._io_callback()

def _io_callback(self, future, result, fd=None, events=None):
    try:
        state = self.connection.poll()
    except (psycopg2.Warning, psycopg2.Error) as error:
        self.ioloop.remove_handler(self.fileno)
        future.set_exc_info(sys.exc_info())
        #clear last future
        self.current_future = None
    else:
        try:
            if state == POLL_OK:
                self.ioloop.remove_handler(self.fileno)
                future.set_result(result)
                #clear last future
                self.current_future = None
            elif state == POLL_READ:
                self.ioloop.update_handler(self.fileno, IOLoop.READ)
            elif state == POLL_WRITE:
                self.ioloop.update_handler(self.fileno, IOLoop.WRITE)
            else:
                future.set_exception(psycopg2.OperationalError("poll() returned %s" % state))
                #clear last future
                self.current_future = None
        except IOError:
            # Can happen when there are quite a lof of outstanding
            # requests. See https://github.com/FSX/momoko/issues/127
            self.ioloop.remove_handler(self.fileno)
            future.set_exception(psycopg2.OperationalError("IOError on socker"))
            #clear last future
            self.current_future = None

In Connection.close()

def close(self):
    if not self.closed:
        self.connection.close()
        if self.current_future:
            self.current_future.set_exception(psycopg2.OperationalError("database connection disconnected"))
            self.current_future = None
@haizaar
Copy link
Collaborator

haizaar commented Jul 18, 2016

Unittest reproduction: patch.txt

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants