Skip to content

Commit

Permalink
Add support for Cooperative Consumers (PGQ3)
Browse files Browse the repository at this point in the history
  • Loading branch information
dimitri committed Feb 8, 2012
1 parent c1a9248 commit b842739
Showing 1 changed file with 111 additions and 0 deletions.
111 changes: 111 additions & 0 deletions PGQCoopConsumer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
<?php
require_once("pgq/PGQRemoteConsumer.php");

/**
* PGQEventRemoteConsumer is a PGQRemoteConsumer which handles nested
* transactions for event management, allowing the remote processing
* to be commited or rollbacked at event level.
*/

abstract class PGQEventRemoteConsumer extends PGQConsumer
{
protected $sname; // subconsumer name
protected $timeout = NULL;

public function __construct($sname, $cname, $qname, $argc, $argv, $src_constr)
{
$this->sname = $sname;
parent::__construct($argc, $argv);
}

protected function register_subconsumer() {
$sql = sprintf("SELECT pgq_coop.register_subconsumer('%s', '%s', '%s');",
pg_escape_string($this->qname),
pg_escape_string($this->cname),
pg_escape_string($this->sname));

$this->log->verbose("%s", $sql);
$r = pg_query($pgcon, $sql);
if( $r === False ) {
$this->log->warning(
"Could not register subconsumer '%s' of '%s' to queue '%s'",
$this->sname, $this->cname, $this->qname);
return False;
}

$registered = pg_fetch_result($r, 0, 0);
if( $registered == "1" ) {
return True;
}
else {
$this->log->fatal("Register SubConsumer failed (%d).", $registered);
return False;
}
}

/**
* Unregister PGQ Consumer. Called from stop().
*/
public static function unregister_sbconsumer() {
$sql = sprintf("SELECT pgq_coop.unregister_subconsumer('%s', '%s', '%s', 0);",
pg_escape_string($qname),
pg_escape_string($cname),
pg_escape_string($sname));

$this->log->verbose("%s", $sql);
$r = pg_query($pgcon, $sql);
if( $r === False ) {
$this->log->fatal("Could not unregister subconsumer '%s' of '%s' to queue '%s'",
$this->sname, $this->cname, $this->qname);
return False;
}

$unregistered = pg_fetch_result($r, 0, 0);
if( $unregistered == "1" ) {
return True;
}
else {
$this->log->fatal("Unregister SubConsumer failed (%d).", $unregistered);
return False;
}
}

protected function next_batch() {
if( $this->timeout !== NULL )
$sql = sprintf("SELECT pgq_coop.next_batch('%s', '%s', '%s', '%s')",
pg_escape_string($this->qname),
pg_escape_string($this->cname),
pg_escape_string($this->sname),
$this->timeout);
else
$sql = sprintf("SELECT pgq_coop.next_batch('%s', '%s', '%s')",
pg_escape_string($this->qname),
pg_escape_string($this->cname),
pg_escape_string($this->sname));

$this->log->verbose("%s", $sql);
if( ($r = pg_query($this->pg_src_con, $sql)) === False ) {
$this->log->error("Could not get next batch");
return False;
}

$batch_id = pg_fetch_result($r, 0, 0);
$this->log->debug("Get batch_id %s (isnull=%s)",
$batch_id,
($batch_id === null ? "True" : "False"));
return $batch_id;
}

protected function finish_batch($batch_id) {
$sql = sprintf("SELECT pgq_coop.finish_batch(%d);", (int)$batch_id);

$this->log->verbose("%s", $sql);
if( pg_query($this->pg_src_con, $sql) === False ) {
$this->log->error("Could not finish batch %d", (int)$batch_id);
return False;
}
return True;
}
}

?>

0 comments on commit b842739

Please sign in to comment.