Permalink
Browse files

Close the connection on error in callback

Unfortunately PQcancel blocks, so it's not better than PQgetResult.
It has been suggested to use PQreset in non-blocking way but this would give
the Python program the burden of handling a connection done but not configured
in an unexpected place.
  • Loading branch information...
1 parent 2611d62 commit b61a2a34c40e9b6e8774178e553a2e81eb889f06 @dvarrazzo committed Oct 6, 2012
Showing with 69 additions and 61 deletions.
  1. +3 −0 NEWS
  2. +14 −47 psycopg/green.c
  3. +36 −11 sandbox/test_green_error.py
  4. +16 −3 tests/test_green.py
View
3 NEWS
@@ -16,6 +16,9 @@ What's new in psycopg 2.4.6
- Dropped GIL release during string adaptation around a function call
invoking a Python API function, which could cause interpreter crash.
Thanks to Manu Cupcic for the report (ticket #110).
+ - Close a green connection if there is an error in the callback.
+ Maybe a harsh solution but it leaves the program responsive
+ (ticket #113).
- 'register_hstore()', 'register_composite()', 'tpc_recover()' work with
RealDictConnection and Cursor (ticket #114).
- connect() raises an exception instead of swallowing keyword arguments
View
@@ -34,7 +34,7 @@
HIDDEN PyObject *wait_callback = NULL;
static PyObject *have_wait_callback(void);
-static void psyco_panic_cancel(connectionObject *conn);
+static void green_panic(connectionObject *conn);
/* Register a callback function to block waiting for data.
*
@@ -178,7 +178,7 @@ psyco_exec_green(connectionObject *conn, const char *command)
conn->async_status = ASYNC_WRITE;
if (0 != psyco_wait(conn)) {
- psyco_panic_cancel(conn);
+ green_panic(conn);
goto end;
}
@@ -194,52 +194,19 @@ psyco_exec_green(connectionObject *conn, const char *command)
/* There has been a communication error during query execution. It may have
* happened e.g. for a network error or an error in the callback, and we
- * cannot tell the two apart. The strategy here to avoid blocking (issue #113)
- * is to try and cancel the query, waiting for the result in non-blocking way.
- * If again we receive an error, we raise an error and close the connection.
- * Discard the result of the currenly executed query, blocking.
+ * cannot tell the two apart.
+ * Trying to PQcancel or PQgetResult to put the connection back into a working
+ * state doesn't work nice (issue #113): the program blocks and the
+ * interpreter won't even respond to SIGINT. PQreset could work async, but the
+ * python program would have then a connection made but not configured where
+ * it is probably not designed to handled. So for the moment we do the kindest
+ * thing we can: we close the connection. A long-running program should
+ * already have a way to discard broken connections; a short-lived one would
+ * benefit of working ctrl-c.
*/
static void
-psyco_panic_cancel(connectionObject *conn)
+green_panic(connectionObject *conn)
{
- PGresult *res;
- PyObject *etype, *evalue, *etb;
- char errbuf[256];
-
- /* we should have an exception set. */
- PyErr_Fetch(&etype, &evalue, &etb);
- if (NULL == etype) {
- Dprintf("panic_cancel: called without exception set");
- }
-
- /* Try sending the cancel signal */
- Dprintf("panic_cancel: sending cancel request");
- if (PQcancel(conn->cancel, errbuf, sizeof(errbuf)) == 0) {
- Dprintf("panic_cancel: canceling failed: %s", errbuf);
- /* raise a warning: we'll keep the previous error */
- PyErr_WarnEx(NULL, errbuf, 1);
- goto exit;
- }
-
- /* go back in the loop for another attempt at async processing */
- /* TODO: should we start on ASYNC_WRITE instead? */
- if (0 != psyco_wait(conn)) {
- Dprintf("panic_cancel: error after cancel: closing the connection");
- PyErr_WarnEx(NULL, "async cancel failed: closing the connection", 1);
- conn_close_locked(conn);
- goto exit;
- }
-
- /* we must clear the result or we get "another command is already in
- * progress" */
- if (NULL != (res = pq_get_last_result(conn))) {
- PQclear(res);
- }
-
-exit:
- /* restore the exception. If no exception was set at function begin, don't
- * clobber one that may have been set here. */
- if (etype) {
- PyErr_Restore(etype, evalue, etb);
- }
+ Dprintf("green_panic: closing the connection");
+ conn_close_locked(conn);
}
@@ -4,15 +4,20 @@
DSN = 'dbname=test'
-# import eventlet.patcher
-# eventlet.patcher.monkey_patch()
+import eventlet.patcher
+eventlet.patcher.monkey_patch()
import os
import signal
+from time import sleep
+
import psycopg2
from psycopg2 import extensions
from eventlet.hubs import trampoline
+
+# register a test wait callback that fails if SIGHUP is received
+
panic = []
def wait_cb(conn):
@@ -34,23 +39,43 @@ def wait_cb(conn):
extensions.set_wait_callback(wait_cb)
+
+# SIGHUP handler to inject a fail in the callback
+
def handler(signum, frame):
panic.append(True)
signal.signal(signal.SIGHUP, handler)
+
+# Simulate another green thread working
+
+def worker():
+ while 1:
+ print "I'm working"
+ sleep(1)
+
+eventlet.spawn(worker)
+
+
+# You can unplug the network cable etc. here.
+# Kill -HUP will raise an exception in the callback.
+
+print "PID", os.getpid()
conn = psycopg2.connect(DSN)
curs = conn.cursor()
-print "PID", os.getpid()
try:
- curs.execute("select pg_sleep(1000)")
+ for i in range(1000):
+ curs.execute("select %s, pg_sleep(1)", (i,))
+ r = curs.fetchone()
+ print "selected", r
+
except BaseException, e:
print "got exception:", e.__class__.__name__, e
-conn.rollback()
-curs.execute("select 1")
-print curs.fetchone()
-
-
-# You can unplug the network cable etc. here.
-# Kill -HUP will raise an exception in the callback.
+if conn.closed:
+ print "the connection is closed"
+else:
+ conn.rollback()
+ curs.execute("select 1")
+ print curs.fetchone()
View
@@ -79,6 +79,9 @@ def test_flush_on_write(self):
warnings.warn("sending a large query didn't trigger block on write.")
def test_error_in_callback(self):
+ # behaviour changed after issue #113: if there is an error in the
+ # callback for the moment we don't have a way to reset the connection
+ # without blocking (ticket #113) so just close it.
conn = self.conn
curs = conn.cursor()
curs.execute("select 1") # have a BEGIN
@@ -88,11 +91,21 @@ def test_error_in_callback(self):
psycopg2.extensions.set_wait_callback(lambda conn: 1//0)
self.assertRaises(ZeroDivisionError, curs.execute, "select 2")
+ self.assert_(conn.closed)
+
+ def test_dont_freak_out(self):
+ # if there is an error in a green query, don't freak out and close
+ # the connection
+ conn = self.conn
+ curs = conn.cursor()
+ self.assertRaises(psycopg2.ProgrammingError,
+ curs.execute, "select the unselectable")
+
# check that the connection is left in an usable state
- psycopg2.extensions.set_wait_callback(psycopg2.extras.wait_select)
+ self.assert_(not conn.closed)
conn.rollback()
- curs.execute("select 2")
- self.assertEqual(2, curs.fetchone()[0])
+ curs.execute("select 1")
+ self.assertEqual(curs.fetchone()[0], 1)
def test_suite():

0 comments on commit b61a2a3

Please sign in to comment.