Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Add locking

  • Loading branch information...
commit 596f14b47c1b6edeeb2c0100eb778dabe8353ad4 1 parent b4d1e0d
@mvantellingen mvantellingen authored
Showing with 90 additions and 73 deletions.
  1. +49 −38 psycopg2ct/_impl/connection.py
  2. +41 −35 psycopg2ct/_impl/cursor.py
View
87 psycopg2ct/_impl/connection.py
@@ -1,3 +1,4 @@
+import threading
import weakref
from functools import wraps
@@ -95,6 +96,7 @@ def __init__(self, dsn, async=False):
self._autocommit = False
self._pgconn = None
self._equote = False
+ self._lock = threading.RLock()
self.notices = []
# The number of commits/rollbacks done so far
@@ -165,24 +167,24 @@ def commit(self):
@check_closed
@check_async
def reset(self):
- self._execute_command(
- "ABORT; RESET ALL; SET SESSION AUTHORIZATION DEFAULT;")
- self.status = consts.STATUS_READY
- self._mark += 1
- self._autocommit = False
- self._tpc_xid = None
+ with self._lock:
+ self._execute_command(
+ "ABORT; RESET ALL; SET SESSION AUTHORIZATION DEFAULT;")
+ self.status = consts.STATUS_READY
+ self._mark += 1
+ self._autocommit = False
+ self._tpc_xid = None
def _get_guc(self, name):
"""Return the value of a configuration parameter."""
- pgres = libpq.PQexec(self._pgconn, 'SHOW %s' % name)
- if not pgres or libpq.PQresultStatus(pgres) != libpq.PGRES_TUPLES_OK:
- raise exceptions.OperationalError(
- "can't fetch %s" % name)
-
- rv = libpq.PQgetvalue(pgres, 0, 0)
- libpq.PQclear(pgres)
+ with self._lock:
+ pgres = libpq.PQexec(self._pgconn, 'SHOW %s' % name)
+ if not pgres or libpq.PQresultStatus(pgres) != libpq.PGRES_TUPLES_OK:
+ raise exceptions.OperationalError("can't fetch %s" % name)
- return rv
+ rv = libpq.PQgetvalue(pgres, 0, 0)
+ libpq.PQclear(pgres)
+ return rv
def _set_guc(self, name, value):
"""Set the value of a configuration parameter."""
@@ -199,7 +201,6 @@ def _set_guc_onoff(self, name, value):
value = 'default'
else:
value = value and 'on' or 'off'
-
self._set_guc(name, value)
@property
@@ -302,7 +303,6 @@ def cursor(self, name=None, cursor_factory=Cursor, withhold=False):
@check_tpc
def cancel(self):
errbuf = libpq.create_string_buffer(256)
-
if libpq.PQcancel(self._cancel, errbuf, len(errbuf)) == 0:
raise self._create_exception(msg=errbuf)
@@ -567,8 +567,15 @@ def _setup(self):
if self._cancel is None:
raise exceptions.OperationalError("can't get cancellation key")
- self._closed = False
- self.status = consts.STATUS_READY
+ with self._lock:
+ # If the current datestyle is not compatible (not ISO) then
+ # force it to ISO
+ datestyle = libpq.PQparameterStatus(self._pgconn, 'DateStyle')
+ if not datestyle or not datestyle.startswith('ISO'):
+ self.status = consts.STATUS_DATESTYLE
+ self._set_guc('datestyle', 'ISO')
+
+ self._closed = False
def _begin_transaction(self):
if self.status == consts.STATUS_READY and not self._autocommit:
@@ -576,15 +583,16 @@ def _begin_transaction(self):
self.status = consts.STATUS_BEGIN
def _execute_command(self, command):
- pgres = libpq.PQexec(self._pgconn, command)
- if not pgres:
- raise self._create_exception()
- try:
- pgstatus = libpq.PQresultStatus(pgres)
- if pgstatus != libpq.PGRES_COMMAND_OK:
- raise self._create_exception(pgres=pgres)
- finally:
- libpq.PQclear(pgres)
+ with self._lock:
+ pgres = libpq.PQexec(self._pgconn, command)
+ if not pgres:
+ raise self._create_exception()
+ try:
+ pgstatus = libpq.PQresultStatus(pgres)
+ if pgstatus != libpq.PGRES_COMMAND_OK:
+ raise self._create_exception(pgres=pgres)
+ finally:
+ libpq.PQclear(pgres)
def _execute_tpc_command(self, command, xid):
cmd = '%s %s' % (command, util.quote_string(self, str(xid)))
@@ -634,11 +642,13 @@ def _close(self):
def _commit(self):
if self._autocommit or self.status != consts.STATUS_BEGIN:
return
- self._mark += 1
- try:
- self._execute_command('COMMIT')
- finally:
- self.status = consts.STATUS_READY
+
+ with self._lock:
+ self._mark += 1
+ try:
+ self._execute_command('COMMIT')
+ finally:
+ self.status = consts.STATUS_READY
def _rollback(self):
if self._autocommit or self.status != consts.STATUS_BEGIN:
@@ -659,12 +669,13 @@ def _get_equote(self):
return ret and ret == 'off'
def _is_busy(self):
- if libpq.PQconsumeInput(self._pgconn) == 0:
- raise exceptions.OperationalError(
- libpq.PQerrorMessage(self._pgconn))
- res = libpq.PQisBusy(self._pgconn)
- self._process_notifies()
- return res
+ with self._lock:
+ if libpq.PQconsumeInput(self._pgconn) == 0:
+ raise exceptions.OperationalError(
+ libpq.PQerrorMessage(self._pgconn))
+ res = libpq.PQisBusy(self._pgconn)
+ self._process_notifies()
+ return res
def _process_notice(self, arg, message):
"""Store the given message in `self.notices`
View
76 psycopg2ct/_impl/cursor.py
@@ -639,30 +639,32 @@ def _clear_pgres(self):
def _pq_execute(self, query, async=False):
pgconn = self._conn._pgconn
if not async:
- self._pgres = libpq.PQexec(pgconn, query)
- if not self._pgres:
- raise self._conn._create_exception(pgres=self._pgres)
- self._conn._process_notifies()
+ with self._conn._lock:
+ self._pgres = libpq.PQexec(pgconn, query)
+ if not self._pgres:
+ raise self._conn._create_exception(pgres=self._pgres)
+ self._conn._process_notifies()
self._pq_fetch()
else:
- ret = libpq.PQsendQuery(pgconn, query)
- if not ret:
-
- # XXX: check if this is correct, seems like a hack.
- # but the test_async_after_async expects it.
- if self._conn._async_cursor:
- raise ProgrammingError(
- 'cannot be used while an asynchronous query is underway')
-
- raise self._conn._create_exception()
-
- ret = libpq.PQflush(pgconn)
- if ret == 0:
- async_status = consts.ASYNC_READ
- elif ret == 1:
- async_status = consts.ASYNC_WRITE
- else:
- raise ValueError() # XXX
+ with self._conn._lock:
+ ret = libpq.PQsendQuery(pgconn, query)
+ if not ret:
+
+ # XXX: check if this is correct, seems like a hack.
+ # but the test_async_after_async expects it.
+ if self._conn._async_cursor:
+ raise ProgrammingError(
+ 'cannot be used while an asynchronous query is underway')
+
+ raise self._conn._create_exception()
+
+ ret = libpq.PQflush(pgconn)
+ if ret == 0:
+ async_status = consts.ASYNC_READ
+ elif ret == 1:
+ async_status = consts.ASYNC_WRITE
+ else:
+ raise ValueError() # XXX
self._conn._async_status = async_status
self._conn._async_cursor = weakref.ref(self)
@@ -685,8 +687,24 @@ def _pq_fetch(self):
elif pgstatus == libpq.PGRES_TUPLES_OK:
self._rowcount = libpq.PQntuples(self._pgres)
- self._no_tuples = False
+ return self._pq_fetch_tuples()
+
+ elif pgstatus == libpq.PGRES_COPY_IN:
+ return self._pq_fetch_copy_in()
+
+ elif pgstatus == libpq.PGRES_COPY_OUT:
+ return self._pq_fetch_copy_out()
+
+ elif pgstatus == libpq.PGRES_EMPTY_QUERY:
+ raise ProgrammingError("can't execute an empty query")
+
+ else:
+ raise self._conn._create_exception(pgres=self._pgres)
+
+ def _pq_fetch_tuples(self):
+ with self._conn._lock:
self._nfields = libpq.PQnfields(self._pgres)
+ self._no_tuples = False
description = []
casts = []
for i in xrange(self._nfields):
@@ -724,18 +742,6 @@ def _pq_fetch(self):
self._description = tuple(description)
self._casts = casts
- elif pgstatus == libpq.PGRES_COPY_IN:
- return self._pq_fetch_copy_in()
-
- elif pgstatus == libpq.PGRES_COPY_OUT:
- return self._pq_fetch_copy_out()
-
- elif pgstatus == libpq.PGRES_EMPTY_QUERY:
- raise ProgrammingError("can't execute an empty query")
-
- else:
- raise self._conn._create_exception(pgres=self._pgres)
-
def _pq_fetch_copy_in(self):
pgconn = self._conn._pgconn
size = self._copysize
Please sign in to comment.
Something went wrong with that request. Please try again.