Skip to content

Commit

Permalink
Support asynchronous command processing (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
Cito committed Jun 19, 2020
1 parent 78d336a commit 5b57de3
Show file tree
Hide file tree
Showing 10 changed files with 813 additions and 177 deletions.
6 changes: 5 additions & 1 deletion docs/contents/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ Version 5.2 (to be released)
of the pqlib used by PyGreSQL (needs PostgreSQL >= 9.1 on the client).
- New query method `memsize()` that gets the memory size allocated by
the query (needs PostgreSQL >= 12 on the client).
- Experimental support for asynchronous command processing.
Additional connection parameter ``nowait``, and connection methods
`send_query()`, `poll()`, `set_non_blocking()`, `is_non_blocking()`.
Generously contributed by Patrick TJ McPhee (#19).

- Changes to the DB-API 2 module (pgdb):
- When using Python 2, errors are now derived from StandardError
Expand All @@ -19,7 +23,7 @@ Version 5.2 (to be released)
- The `types` parameter of `format_query` can now be passed as a string
that will be split on whitespace when values are passed as a sequence,
and the types can now also be specified using actual Python types
instead of type names (#38, suggested by Justin Pryzby).
instead of type names. Suggested by Justin Pryzby (#38).

Version 5.1.2 (2020-04-19)
--------------------------
Expand Down
180 changes: 180 additions & 0 deletions docs/contents/pg/connection.rst
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,102 @@ Example::
phone = con.query("select phone from employees where name=$1",
(name,)).getresult()


send_query - executes a SQL command string asynchronously
---------------------------------------------------------

.. method:: Connection.send_query(command, [args])

Submits a command to the server without waiting for the result(s).

:param str command: SQL command
:param args: optional parameter values
:returns: a query object, as described below
:rtype: :class:`Query`
:raises TypeError: bad argument type, or too many arguments
:raises TypeError: invalid connection
:raises ValueError: empty SQL query or lost connection
:raises pg.ProgrammingError: error in query

This method is much the same as :meth:`Connection.query`, except that it
returns without waiting for the query to complete. The database connection
cannot be used for other operations until the query completes, but the
application can do other things, including executing queries using other
database connections. The application can call ``select()`` using the
``fileno``` obtained by the connection#s :meth:`Connection.fileno` method
to determine when the query has results to return.

This method always returns a :class:`Query` object. This object differs
from the :class:`Query` object returned by :meth:`Connection.query` in a
few way. Most importantly, when :meth:`Connection.send_query` is used, the
application must call one of the result-returning methods such as
:meth:`Query.getresult` or :meth:`Query.dictresult` until it either raises
an exception or returns ``None``.

Otherwise, the database connection will be left in an unusable state.

In cases when :meth:`Connection.query` would return something other than
a :class:`Query` object, that result will be returned by calling one of
the result-returning methods on the :class:`Query` object returned by
:meth:`Connection.send_query`. There's one important difference in these
result codes: if :meth:`Connection.query` returns `None`, the result-returning
methods will return an empty string (`''`). It's still necessary to call a
result-returning method until it returns `None`.

:meth:`Query.listfields`, :meth:`Query.fieldname`, :meth:`Query.fieldnum`,
and :meth:`Query.ntuples` only work after a call to a result-returning method
with a non-`None` return value. :meth:`Query.ntuples` returns only the number
of rows returned by the previous result-returning method.

If multiple semi-colon-delimited statements are passed to
:meth:`Connection.query`, only the results of the last statement are returned
in the :class:`Query` object. With :meth:`Connection.send_query`, all results
are returned. Each result set will be returned by a separate call to
:meth:`Query.getresult()` or other result-returning methods.

.. versionadded:: 5.2

Examples::

name = input("Name? ")
query = con.send_query("select phone from employees where name=$1",
(name,))
phone = query.getresult()
query.getresult() # to close the query

# Run two queries in one round trip:
# (Note that you cannot use a union here
# when the result sets have different row types.)
query = con.send_query("select a,b,c from x where d=e;
"select e,f from y where g")
result_x = query.dictresult()
result_y = query.dictresult()
query.dictresult() # to close the query

# Using select() to wait for the query to be ready:
query = con.send_query("select pg_sleep(20)")
r, w, e = select([con.fileno(), other, sockets], [], [])
if con.fileno() in r:
results = query.getresult()
query.getresult() # to close the query

# Concurrent queries on separate connections:
con1 = connect()
con2 = connect()
s = con1.query("begin; set transaction isolation level repeatable read;"
"select pg_export_snapshot();").getresult()[0][0]
con2.query("begin; set transaction isolation level repeatable read;"
"set transaction snapshot '%s'" % (s,))
q1 = con1.send_query("select a,b,c from x where d=e")
q2 = con2.send_query("select e,f from y where g")
r1 = q1.getresult()
q1.getresult()
r2 = q2.getresult()
q2.getresult()
con1.query("commit")
con2.query("commit")


query_prepared -- execute a prepared statement
----------------------------------------------

Expand Down Expand Up @@ -169,6 +265,56 @@ reset -- reset the connection

This method resets the current database connection.

poll - completes an asynchronous connection
-------------------------------------------

.. method:: Connection.poll()

Complete an asynchronous :mod:`pg` connection and get its state

:returns: state of the connection
:rtype: int
:raises TypeError: too many (any) arguments
:raises TypeError: invalid connection
:raises pg.InternalError: some error occurred during pg connection

The database connection can be performed without any blocking calls.
This allows the application mainline to perform other operations or perhaps
connect to multiple databases concurrently. Once the connection is established,
it's no different from a connection made using blocking calls.

The required steps are to pass the parameter ``nowait=True`` to the
:meth:`pg.connect` call, then call :meth:`Connection.poll` until it either
returns :const':`POLLING_OK` or raises an exception. To avoid blocking
in :meth:`Connection.poll`, use `select()` or `poll()` to wait for the
connection to be readable or writable, depending on the return code of the
previous call to :meth:`Connection.poll`. The initial state of the connection
is :const:`POLLING_WRITING`. The possible states are defined as constants in
the :mod:`pg` module (:const:`POLLING_OK`, :const:`POLLING_FAILED`,
:const:`POLLING_READING` and :const:`POLLING_WRITING`).

.. versionadded:: 5.2

Example::

con = pg.connect('testdb', nowait=True)
fileno = con.fileno()
rd = []
wt = [fileno]
rc = pg.POLLING_WRITING
while rc not in (pg.POLLING_OK, pg.POLLING_FAILED):
ra, wa, xa = select(rd, wt, [], timeout)
if not ra and not wa:
timedout()
rc = con.poll()
if rc == pg.POLLING_READING:
rd = [fileno]
wt = []
else:
rd = []
wt = [fileno]


cancel -- abandon processing of current SQL command
---------------------------------------------------

Expand Down Expand Up @@ -281,6 +427,40 @@ fileno -- get the socket used to connect to the database
This method returns the underlying socket id used to connect
to the database. This is useful for use in select calls, etc.

set_non_blocking - set the non-blocking status of the connection
----------------------------------------------------------------

.. method:: set_non_blocking(nb)

Set the non-blocking mode of the connection

:param bool nb: True to put the connection into non-blocking mode.
False to put it into blocking mode.
:raises TypeError: too many parameters
:raises TypeError: invalid connection

Puts the socket connection into non-blocking mode or into blocking mode.
This affects copy commands and large object operations, but not queries.

.. versionadded:: 5.2

is_non_blocking - report the blocking status of the connection
--------------------------------------------------------------

.. method:: is_non_blocking()

get the non-blocking mode of the connection

:returns: True if the connection is in non-blocking mode.
False if it is in blocking mode.
:rtype: bool
:raises TypeError: too many parameters
:raises TypeError: invalid connection

Returns True if the connection is in non-blocking mode, False otherwise.

.. versionadded:: 5.2

getnotify -- get the last notify from the server
------------------------------------------------

Expand Down
15 changes: 14 additions & 1 deletion docs/contents/pg/module.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ standard environment variables should be used.
connect -- Open a PostgreSQL connection
---------------------------------------

.. function:: connect([dbname], [host], [port], [opt], [user], [passwd])
.. function:: connect([dbname], [host], [port], [opt], [user], [passwd], [nowait])

Open a :mod:`pg` connection

Expand All @@ -36,6 +36,8 @@ connect -- Open a PostgreSQL connection
:type user: str or None
:param passwd: password for user (*None* = :data:`defpasswd`)
:type passwd: str or None
:param nowait: whether the connection should happen asynchronously
:type nowait: bool
:returns: If successful, the :class:`Connection` handling the connection
:rtype: :class:`Connection`
:raises TypeError: bad argument type, or too many arguments
Expand All @@ -49,11 +51,15 @@ Python tutorial. The names of the keywords are the name of the
parameters given in the syntax line. The ``opt`` parameter can be used
to pass command-line options to the server. For a precise description
of the parameters, please refer to the PostgreSQL user manual.
See :meth:`Connection.poll` for a description of the ``nowait`` parameter.

If you want to add additional parameters not specified here, you must
pass a connection string or a connection URI instead of the ``dbname``
(as in ``con3`` and ``con4`` in the following example).

.. versionchanged:: 5.2
Support for asynchronous connections via the ``nowait`` parameter.

Example::

import pg
Expand Down Expand Up @@ -747,6 +753,13 @@ for more information about them. These constants are:
large objects access modes,
used by :meth:`Connection.locreate` and :meth:`LargeObject.open`

.. data:: POLLING_OK
.. data:: POLLING_FAILED
.. data:: POLLING_READING
.. data:: POLLING_WRITING

polling states, returned by :meth:`Connection.poll`

.. data:: SEEK_SET
.. data:: SEEK_CUR
.. data:: SEEK_END
Expand Down
9 changes: 9 additions & 0 deletions docs/contents/pg/query.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ You can also call :func:`len` on a query to find the number of rows
in the result, and access row tuples using their index directly on
the :class:`Query` object.

When the :class:`Query` object was returned by :meth:`Connection.send_query`,
other return values are also possible, as documented there.

dictresult/dictiter -- get query values as dictionaries
-------------------------------------------------------

Expand Down Expand Up @@ -81,6 +84,9 @@ fetched from the server anyway when the query is executed.
If the query has duplicate field names, you will get the value for the
field with the highest index in the query.

When the :class:`Query` object was returned by :meth:`Connection.send_query`,
other return values are also possible, as documented there.

.. versionadded:: 5.1

namedresult/namediter -- get query values as named tuples
Expand Down Expand Up @@ -127,6 +133,9 @@ Column names in the database that are not valid as field names for
named tuples (particularly, names starting with an underscore) are
automatically renamed to valid positional names.

When the :class:`Query` object was returned by :meth:`Connection.send_query`,
other return values are also possible, as documented there.

.. versionadded:: 5.1

scalarresult/scalariter -- get query values as scalars
Expand Down
1 change: 1 addition & 0 deletions pg.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
'NoResultError', 'NotSupportedError',
'OperationalError', 'ProgrammingError',
'INV_READ', 'INV_WRITE',
'POLLING_OK', 'POLLING_FAILED', 'POLLING_READING', 'POLLING_WRITING',
'SEEK_CUR', 'SEEK_END', 'SEEK_SET',
'TRANS_ACTIVE', 'TRANS_IDLE', 'TRANS_INERROR',
'TRANS_INTRANS', 'TRANS_UNKNOWN',
Expand Down

0 comments on commit 5b57de3

Please sign in to comment.