PHP-level API for the PGQ PostgreSQL queuing solution
Latest commit 0535a38 Jun 22, 2015 @dimitri Merge pull request #7 from kachar/interactive-consumer-fixes
Fix InteractiveConsumer costructor arguments and finish_batch method



== Introduction

This project provides a PHP-level API on top of the PGQ SQL API.

Read more about PGQ here:[][]

It then provides several kinds of Consumer that you'll want to
extend. Every consumer expects you to implement your own
+process_event(&$event)+ callback function, and takes care of PGQ's
inner workings.

PGQ is about producing events in a (work) queue and having one or more
consumers get those events, in batches, and work on them. What the PHP
classes presented here do is fetch the batches and call your own
+process_event()+ function for each event.

== Tools

=== SimpleLogger

This class handles logging with a +loglevel+: only messages more
important than current +loglevel+ are written to the logfile. The
levels are, by increasing order of importance:


The +SimpleLogger+ offers a method for each +loglevel+, taking a
format string then arguments, just like +sprintf()+ does. For example:

  $logger->debug("a message with a '%s' string", $str);
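
To make the +loglevel+ filtering concrete, here is a minimal sketch of
the idea. It is not the project's +SimpleLogger+: the class name
+SketchLogger+, the three level names and their numeric values, and the
in-memory storage are all assumptions made for the illustration. It
also keeps messages that are at least as important as the current
level, which may differ from the real class.

```php
<?php
// Hypothetical stand-in, NOT the project's SimpleLogger. Level names and
// values are assumptions; messages are kept in memory instead of a logfile.
class SketchLogger
{
    const DEBUG = 1;
    const INFO = 2;
    const WARNING = 3;

    private $loglevel;
    private $lines = array();

    public function __construct($loglevel)
    {
        $this->loglevel = $loglevel;
    }

    // Format like sprintf() and keep the message only if its level is
    // at least the current loglevel (assumed comparison).
    private function write($level, $label, $format, array $args)
    {
        if ($level < $this->loglevel) {
            return;
        }
        $this->lines[] = $label . ' ' . vsprintf($format, $args);
    }

    public function debug($format, ...$args)
    {
        $this->write(self::DEBUG, 'DEBUG', $format, $args);
    }

    public function info($format, ...$args)
    {
        $this->write(self::INFO, 'INFO', $format, $args);
    }

    public function warning($format, ...$args)
    {
        $this->write(self::WARNING, 'WARNING', $format, $args);
    }

    public function lines()
    {
        return $this->lines;
    }
}

// Usage: with the level set to INFO, the debug() call is dropped.
$logger = new SketchLogger(SketchLogger::INFO);
$logger->debug("a message with a '%s' string", 'hidden');
$logger->info("a message with a '%s' string", 'visible');
```

One private +write()+ method holds the filtering, so each per-level
method stays a one-liner.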

=== SystemDaemon

This class implements a System Daemon controlled with commands. The
principle is to run in the background and loop forever, taking a
+$this->delay+ rest whenever possible between two loops.

+SystemDaemon+ keeps a +SimpleLogger+ instance in +$this->log+ and has
it reopen its logfile at +reload+ time. This means it plays well with
logrotate: just don't forget to +reload+ the daemon from logrotate's
+postrotate+ script.

The commands available for any +SystemDaemon+ are: +start+, +stop+,
+kill+, +reload+, +restart+, +status+, +logless+, +logmore+.

The difference between +stop+ and +kill+ is that in the first case the
daemon will finish its current processing and only stop at the next
opportunity to sleep for +$this->delay+, while +kill+ forces it to
exit right away (calling +$this->stop()+ first).

The +reload+ command will have the daemon call the user-defined
+$this->reload()+ method at the next sleep opportunity. This method
could, for example, read a configuration file and change settings.

The +logless+ and +logmore+ commands are handled without delay and
make the logger instantly less or more verbose by changing its
+loglevel+. This is useful when you want to know exactly what a daemon
is doing right now, but can't afford to restart it.

When implementing a +SystemDaemon+, you only have to implement the
+$this->config()+ and +$this->process()+ functions, which get called
in the infinite loop. The former only gets called when a reload has
been ordered (+SIGHUP+), and the latter in each and every loop.
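
The shape of that loop can be sketched as follows. This is only an
illustration under stated assumptions, not the actual +SystemDaemon+
code: the class name +SketchDaemon+, the +request_reload()+ method
(standing in for the +SIGHUP+ handler) and the +$max_loops+ bound
(added so the example terminates) are all hypothetical.

```php
<?php
// Hypothetical sketch of the daemon loop, NOT the project's SystemDaemon.
abstract class SketchDaemon
{
    public $delay = 0;                 // seconds to rest between two loops
    private $reload_pending = false;

    abstract protected function config();   // called only after a reload request
    abstract protected function process();  // called on every loop

    // Stands in for what a SIGHUP handler would do: just raise a flag.
    public function request_reload()
    {
        $this->reload_pending = true;
    }

    // A real daemon would loop forever; $max_loops keeps the sketch finite.
    public function run($max_loops)
    {
        for ($i = 0; $i < $max_loops; $i++) {
            if ($this->reload_pending) {
                $this->config();
                $this->reload_pending = false;
            }
            $this->process();
            sleep($this->delay);
        }
    }
}

// Minimal implementer that only counts how often each hook is called.
class SketchCountingDaemon extends SketchDaemon
{
    public $config_calls = 0;
    public $process_calls = 0;

    protected function config()
    {
        $this->config_calls++;
    }

    protected function process()
    {
        $this->process_calls++;
    }
}
```

Recording the reload request in a flag and acting on it at the top of
the next loop keeps +config()+ from running in the middle of a
+process()+ call.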

The +SystemDaemon+ will also register its +SimpleLogger+ instance as
the PHP error handler, and consider quitting when confronted with a PHP
FATAL error (one of +E_STRICT+, +E_PARSE+, +E_CORE_ERROR+, ...). It
will call the user-defined +$this->php_error_hook()+ for PHP +E_ERROR+
or +E_USER_ERROR+ level errors.

=== PGQ

This class is abstract and has a +public static+ method for each SQL
function that the PGQ SQL API offers. If PHP knew about namespaces or
modules, this would be a module.

== Consumers

These classes handle how PGQ really works. They get batches from the
queue, get events from them, let you process each event by calling the
user-defined +$this->process_event(&$event)+ function, and call
+finish_batch()+ as appropriate.

The +$this->process_event()+ function should return one of these
return codes:


  When the event processing is satisfactory.


  The event is tagged as failed, with +$event->failed_reason+ as the
  reason, and when using +PGQEventRemoteConsumer+ the related
  processing done on the remote database is rolled back.


  The event is tagged as retry and will get reinserted into the main
  queue after a minimum delay of +$event->retry_delay+ seconds. When
  using +PGQEventRemoteConsumer+ the related processing done on the
  remote database is rolled back.


  All current batch processing is canceled, on both the remote and the
  queue database when using some sort of +RemoteConsumer+. The batch is
  not finished, so you'll get the events back at next run(s).
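
The four outcomes above can be sketched as a dispatch on the value
returned by the callback. The constant names and values used here
(+SKETCH_EVENT_OK+ and friends) and the +sketch_run_batch()+ function
are hypothetical and exist only in this example. No database is
involved: the function just reports what would happen to each event.

```php
<?php
// Hypothetical constant names and values, introduced only for this sketch.
const SKETCH_EVENT_OK = 1;
const SKETCH_EVENT_FAILED = 2;
const SKETCH_EVENT_RETRY = 3;
const SKETCH_ABORT_BATCH = 4;

// Run one batch through the callback and report the fate of each event.
function sketch_run_batch(array $events, callable $process_event)
{
    $report = array('done' => array(), 'failed' => array(),
                    'retry' => array(), 'finished' => true);

    foreach ($events as $event) {
        switch ($process_event($event)) {
            case SKETCH_EVENT_OK:
                $report['done'][] = $event->id;
                break;
            case SKETCH_EVENT_FAILED:
                $report['failed'][$event->id] = $event->failed_reason;
                break;
            case SKETCH_EVENT_RETRY:
                $report['retry'][$event->id] = $event->retry_delay;
                break;
            case SKETCH_ABORT_BATCH:
                // Nothing is kept and the batch stays unfinished, so the
                // same events come back on a later run.
                return array('done' => array(), 'failed' => array(),
                             'retry' => array(), 'finished' => false);
            default:
                throw new UnexpectedValueException('unknown return code');
        }
    }
    return $report;
}
```

A callback that sets +$event->failed_reason+ or +$event->retry_delay+
before returning gives the dispatcher everything it needs to tag the
event.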

=== PGQConsumer

This is a +SystemDaemon+ which implements some more commands in order
to install the queue and register the consumer.

When extending a +PGQConsumer+, you get to implement +$this->config()+
and +$this->process_event()+ functions, and you're done.

The constructor needs a queue name (+qname+), a consumer name
(+cname+) and a database connection string.

The added commands are:

  Create the queue +qname+ and register a consumer +cname+.

  Unregister the consumer +cname+ and drop the queue +qname+.

  True only if the queue +qname+ exists and the consumer +cname+ is registered.

  Create the queue +qname+.

  Drop the queue +qname+.

  Register the consumer +cname+.

  Unregister the consumer +cname+.

  Print out a list of failed events for queue +qname+ and consumer
  +cname+.

  Delete given event id, or all failed events if given +all+ as an
  event id.

  Retry given event id, or all failed events if given +all+ as an
  event id.

=== PGQInteractiveConsumer

This class assumes the looping will get done elsewhere, at the calling
site for example. It consumes all available events (up until
+next_batch()+ returns +null+).

The lag is not controlled by the implementer class but rather by the
user of it.

Implementers have to call +$this->process()+, which will start
consuming all available events and call the
+$this->process_event(&$event)+ hook for each event.
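
The "consume until +next_batch()+ returns +null+" behaviour can be
sketched with an in-memory stand-in. The class names below and the idea
of feeding batches through the constructor are assumptions made for the
example. The real class talks to PostgreSQL and its constructor
arguments are not shown here.

```php
<?php
// Hypothetical in-memory stand-in, NOT the project's PGQInteractiveConsumer.
abstract class SketchInteractiveConsumer
{
    private $batches;
    public $finished = 0;   // number of batches marked as finished

    public function __construct(array $batches)
    {
        $this->batches = $batches;
    }

    // array_shift() returns null once no batch is left.
    protected function next_batch()
    {
        return array_shift($this->batches);
    }

    protected function finish_batch()
    {
        $this->finished++;
    }

    abstract public function process_event(&$event);

    // Consume every available batch, then hand control back to the caller.
    public function process()
    {
        $count = 0;
        while (($batch = $this->next_batch()) !== null) {
            foreach ($batch as &$event) {
                $this->process_event($event);
                $count++;
            }
            unset($event);   // drop the reference left over by foreach
            $this->finish_batch();
        }
        return $count;
    }
}

// Minimal implementer that just remembers the events it was given.
class SketchCollector extends SketchInteractiveConsumer
{
    public $seen = array();

    public function process_event(&$event)
    {
        $this->seen[] = $event;
    }
}
```

Because +process()+ returns as soon as the queue is drained, the caller
decides when to call it again, which is how the lag ends up under the
caller's control.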

Internal design note::
  +PGQInteractiveRemoteConsumer+ needs to implement all PGQ methods
  for itself because PHP only allows extending a single base class:
  +PGQConsumer+ could not extend both +SystemDaemon+ and +PGQClass+,
  where we should put the class abstraction over the API.

=== PGQRemoteConsumer

+PGQRemoteConsumer+ is a +PGQConsumer+ that controls two PostgreSQL
connections and handles +COMMIT+ and +ROLLBACK+ nicely on both of
them.

This means you want to use +PGQRemoteConsumer+ when you're processing
events from one database and applying changes to another one, the
remote one. The +PGQRemoteConsumer+ takes advantage of the fact that
the remote processing is transactional (it happens on a database) to
make sure any work committed on the remote connection is associated
with events properly consumed.

Any error in event consuming or remote processing will cause the
current batch processing to be rolled back at both points, meaning
the events will get consumed again later.

=== PGQEventRemoteConsumer

When you need to be able to +COMMIT+ or +ROLLBACK+ both transactions
at event level, +PGQEventRemoteConsumer+ is what you're after. It will
use a subtransaction (+SAVEPOINT+) for each event and will be able to
+ROLLBACK TO SAVEPOINT+ on the remote side for any processing error
related to a single event.
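
The per-event subtransaction pattern can be sketched by listing the
statements that would be sent to the remote connection. This is an
illustration only, not the class's implementation: the function
+sketch_remote_statements()+, the savepoint naming scheme and the
+$apply+ callback (which returns the SQL for one event or throws) are
hypothetical, and nothing is actually executed.

```php
<?php
// Hypothetical sketch: build the list of remote-side statements for one
// batch, wrapping each event in its own savepoint. Nothing is executed.
function sketch_remote_statements(array $event_ids, callable $apply)
{
    $sql = array('BEGIN');

    foreach ($event_ids as $id) {
        $savepoint = 'ev_' . (int) $id;   // assumed naming scheme
        $sql[] = "SAVEPOINT $savepoint";
        try {
            // $apply returns the statements for this event, or throws.
            foreach ($apply($id) as $statement) {
                $sql[] = $statement;
            }
            $sql[] = "RELEASE SAVEPOINT $savepoint";
        } catch (Exception $e) {
            // Only this event's work is undone; earlier events are kept.
            $sql[] = "ROLLBACK TO SAVEPOINT $savepoint";
        }
    }

    $sql[] = 'COMMIT';
    return $sql;
}
```

With a plain transaction, one failing event would force a +ROLLBACK+ of
the whole batch. The savepoint limits the damage to the event that
failed.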

=== PGQCoopConsumer

This Consumer shares batches between all its subconsumer processes.
You need to register each subconsumer separately.