Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

More work towards a stable persistent connection implementation.

The demos  folder "Setup" is  replaced by a "Factory"  component which
lives in  the main library.  Some  of the demos are  refactored to use
the Factory class.
Improved the channel flow API level support in the main Channel class,
including new methods to toggle flow.
Tweaks in the StreamSocket implementation to expose more of the
underlying connection metadata through the API.
Added  a   "suspend  flow"  feature   to  PChannel,  this   means  the
implementation can  automatically set  the channel to  flow=yes during
rehydration and flow=no during  serialize.  Most of the implementation
of this is in PConnection and Channel classes.
  • Loading branch information...
commit 4719ca644ff746c38dd85242fd8e0474626a4d9f 1 parent 9fa7d19
@BraveSirRobin authored
View
4 build.xml
@@ -80,8 +80,8 @@ namespace ${namespace};</echo>
<regexp pattern="^[\s]*namespace.*$" replace="" modifiers="m"/>
</replaceregexp>
- <!--<stripphpcomments />
- <stripwhitespace />-->
+ <stripphpcomments />
+ <stripwhitespace />
<replaceregexp>
<regexp pattern="&lt;\?php" replace=""/>
View
26 demos/Setup.php
@@ -13,8 +13,6 @@
*/
class Setup
{
-
-
/**
* Factory method - create and return a set of Connections
* corresponding to the given XML. The given XML can contain
@@ -70,15 +68,10 @@ function getSetup ($xml) {
// Execute whatever methods are supplied.
- foreach ($conn->methods->method as $iMeth) {
- $a = $this->xmlToArray($iMeth);
- $c = $a['class'];
- $m = $a['method'];
- $meth = $_chan->$c($m, $a['args']);
- $_chan->invoke($meth);
+ if (count($conn->methods) > 0) {
+ $this->methods($_chan, $conn->methods->method);
}
-
// Finally, set up consumers. This is done last in case queues / exchanges etc. need to be set up before the consumers.
$i = 0;
foreach ($conn->channel as $chan) {
@@ -99,6 +92,21 @@ function getSetup ($xml) {
}
+
+ /**
+ * Execute the methods defined in $meths against channel $chan
+ */
+ function methods (amqp\Channel $chan, SimpleXmlElement $meths) {
+ // Execute whatever methods are supplied.
+ foreach ($meths as $iMeth) {
+ $a = $this->xmlToArray($iMeth);
+ $c = $a['class'];
+ $m = $a['method'];
+ $r = $chan->invoke($chan->$c($m, $a['args']));
+ }
+ }
+
+
/**
* Perform the given cast on the given value, defaults to a string
* cast.
View
18 demos/demo-get-command.php
@@ -18,23 +18,23 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
/**
- * This file demonstrated the use of basic.get, a much simpler way to read messages
- * from a Broker then writing a full consumer. Note that the exchange.declare, queue.declare,
- * queue.bind Amqp commands are needed only to create those broker object so as to prevent
- * this script from generating errors. Declaring these broker objects when they already
- * exist is fine, provided *this* declaration doesn't clash with the object that already
- * exists on the broker.
+ * This file demonstrates the use of basic.get, a much simpler way to
+ * read messages from a Broker then writing a full consumer
*/
use amqphp as amqp;
use amqphp\protocol;
use amqphp\wire;
require __DIR__ . '/demo-loader.php';
-require __DIR__ . '/Setup.php';
-$su = new Setup;
-$cons = $su->getSetup(__DIR__ . '/multi-producer.xml');
+$su = new amqp\Factory(__DIR__ . '/multi-producer.xml');
+$cons = array();
+foreach ($su->run() as $res) {
+ if ($res instanceof amqp\Connection) {
+ $cons[] = $res;
+ }
+}
$conn = reset($cons);
$chans = $conn->getChannels();
View
9 demos/demo-multi-producer.php
@@ -5,7 +5,6 @@
use amqphp\wire;
require __DIR__ . '/demo-loader.php';
-require __DIR__ . '/Setup.php';
@@ -18,8 +17,12 @@
'exchange' => 'most-basic-ex'); // Must match exchange in multi-producer.xml
-$su = new Setup;
-$conns = $su->getSetup(__DIR__ . '/multi-producer.xml');
+$su = new amqp\Factory(__DIR__ . '/multi-producer.xml');
+$conns = array();
+foreach ($su->run() as $res) {
+
+}
+
$cons = array();
foreach ($conns as $con) {
$chans = $con->getChannels();
View
2  demos/web-common.php
@@ -79,6 +79,8 @@ function getConsumeMethod (amqp\Channel $chan) {
function handleDelivery (wire\Method $meth, amqp\Channel $chan) {
if ($this->nf) {
call_user_func($this->nf, $meth, $chan, $this);
+ } else {
+ error_log("!!No handler for message delivery:\n%s", $meth->getContent());
}
return amqp\CONSUMER_ACK;
}
View
8 demos/web-multi-process-test.php
@@ -42,7 +42,6 @@
}
require LIBDIR . DIRECTORY_SEPARATOR . 'demo-loader.php';
require LIBDIR . DIRECTORY_SEPARATOR . 'web-common.php';
-require LIBDIR . DIRECTORY_SEPARATOR . 'Setup.php';
@@ -57,14 +56,15 @@ class MultiProcessPCTest
// WARNING - If you restart your web server you must delete this file!!
// Persistence format: array(<procid> => <created timestamp>);
public $connStore = '/tmp/MultiProcessPCTest-runtime.txt';
- public $connections;
+ public $connections = array();
public $allProcs = array();
public $connConfig;
public $view;
function start ($config) {
- $s = new Setup;
- $this->connections = $s->getSetup($config);
+ $fact = new amqp\Factory($config);
+ $this->connections = $fact->getConnections();
+
// Read in the server process tracker data and make sure this process is in it.
if ($buff = @file_get_contents($this->connStore)) {
$this->allProcs = unserialize($buff);
View
45,110 diag/amqp.all.dia
23,132 additions, 21,978 deletions not shown
View
BIN  diag/amqp.all.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
View
18 src/amqphp/Channel.php
@@ -189,7 +189,6 @@ function handleChannelMessage (wire\Method $meth) {
return false;
case 'channel.close':
$pl = $this->myConn->getProtocolLoader();
- //if ($culprit = protocol\ClassFactory::GetMethod($meth->getField('class-id'), $meth->getField('method-id'))) {
if ($culprit = $pl('ClassFactory', 'GetMethod', array($meth->getField('class-id'), $meth->getField('method-id')))) {
$culprit = "{$culprit->getSpecClass()}.{$culprit->getSpecName()}";
} else {
@@ -216,6 +215,7 @@ function handleChannelMessage (wire\Method $meth) {
throw new \Exception($em, $n);
case 'channel.close-ok':
case 'channel.open-ok':
+ case 'channel.flow-ok':
return true;
default:
throw new \Exception("Received unexpected channel message: $sid", 8795);
@@ -510,4 +510,20 @@ function startConsumer (Consumer $cons) {
function onSelectEnd () {
$this->consuming = false;
}
+
+ function isSuspended () {
+ return ! $this->flow;
+ }
+
+ function toggleFlow () {
+ $flow = ! $this->flow;
+ $this->flow = true; // otherwise the message won't send
+ $meth = $this->channel('flow', array('active' => $flow));
+ $fr = $this->invoke($meth);
+ $newFlow = $fr->getField('active');
+ if ($newFlow != $flow) {
+ trigger_error(sprintf("Flow Unexpected channel flow response, expected %d, got %d", ! $this->flow, $this->flow), E_USER_WARNING);
+ }
+ $this->flow = $newFlow;
+ }
}
View
2  src/amqphp/Connection.php
@@ -656,8 +656,6 @@ function invoke (wire\Method $inMeth, $noWait=false) {
}
-
-
/**
* Convert the given raw wire content in to Method objects.
* Connection and channel messages are delivered immediately and
View
250 src/amqphp/Factory.php
@@ -0,0 +1,250 @@
+<?php
+/**
+ *
+ * Copyright (C) 2010, 2011 Robin Harvey (harvey.robin@gmail.com)
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+namespace amqphp;
+
+use amqphp\protocol;
+use amqphp\wire;
+use \amqphp\persistent as pers;
+
+
+/**
+ * This is a helper class which can create Amqp connections and
+ * channels (persistent and non-persistent), add consumers, channel
+ * event handlers, and call amqp methods. All of the broker and
+ * client configuration can be stored in an XML file, and their
+ * corresponding setup be performed by a single method call.
+ *
+ * Note that the use of this component is not required, you can still
+ * set up your connections / channels and broker configuration calls
+ * manually, if desired.
+ */
+class Factory
+{
+ /* Constructor flag - load XML from the given file */
+ const XML_FILE = 1;
+
+ /* Constructor flag - load XML from the given string */
+ const XML_STRING = 2;
+
+ /* SimpleXML object, with content x-included */
+ private $simp;
+
+ /* Name of the configuration root element */
+ private $rootEl;
+
+ function __construct ($xml, $documentURI=false, $flag=self::XML_FILE) {
+ $d = new \DOMDocument;
+
+ switch ($flag) {
+ case self::XML_FILE:
+ if (! $d->load($xml)) {
+ throw new \Exception("Failed to load factory XML", 92656);
+ }
+ break;
+ case self::XML_STRING:
+ if (! $d->loadXML($xml)) {
+ throw new \Exception("Failed to load factory XML", 92656);
+ }
+ if ($documentURI) {
+ $d->documentURI = $documentURI;
+ }
+ break;
+ default:
+ throw new \Exception("Invalid construct flag", 95637);
+ }
+
+ if (-1 === $d->xinclude()) {
+ throw new \Exception("Failed to load factory XML", 92657);
+ } else if (! ($this->simp = simplexml_import_dom($d))) {
+ throw new \Exception("Failed to load factory XML", 92658);
+ }
+
+ switch ($tmp = strtolower((string) $this->simp->getName())) {
+ case 'setup':
+ case 'methods':
+ $this->rootEl = $tmp;
+ break;
+ default:
+ throw new \Exception("Unexpected Factory configuration data root element", 17893);
+ }
+ }
+
+
+ /**
+ * Run the XML instructions and return the corresponding objects /
+ * responses.
+ */
+ function run (Channel $chan=null) {
+ switch ($this->rootEl) {
+ case 'setup':
+ return $this->runSetupSequence();
+ case 'methods':
+ if (is_null($chan)) {
+ throw new \Exception("", 15758);
+ }
+ return $this->runMethodSequence($chan, $this->simp);
+ }
+ }
+
+
+ /**
+ * Helper method - run the config and return only connections
+ * (throw away method responses)
+ */
+ function getConnections (Channel $chan=null) {
+ $r = array();
+ foreach ($this->run() as $res) {
+ if ($res instanceof Connection) {
+ $r[] = $res;
+ }
+ }
+ return $r;
+ }
+
+
+
+ /**
+ * Run the connection setup sequence
+ */
+ private function runSetupSequence () {
+ $ret = array();
+
+ foreach ($this->simp->connection as $conn) {
+ $_chans = array();
+
+ // Create connection and connect
+ $impl = (string) $conn->impl;
+ $_conn = new $impl($this->xmlToArray($conn->server->children()));
+ if (isset($conn->persistence)) {
+ $_conn->setPersistenceHelperImpl((string) $conn->persistence);
+ }
+ $_conn->connect();
+
+ if ($_conn instanceof pers\PConnection && $_conn->getPersistenceStatus() == pers\PConnection::SOCK_REUSED) {
+ // Assume that the setup is complete for existing PConnection
+ // ??TODO?? Run method sequence here too?
+ $ret[] = $_conn;
+ continue;
+ }
+
+
+ // Create channels and channel event handlers.
+ foreach ($conn->channel as $chan) {
+ $_chan = $_conn->openChannel();
+ if (isset($chan->event_handler)) {
+ $impl = (string) $chan->event_handler->impl;
+ $_chan->setEventHandler(new $impl);
+ }
+ $_chans[] = $_chan;
+ }
+ if (! $_chans) {
+ throw new \Exception("You must define at least one channel", 92416);
+ }
+ $_chan = reset($_chans);
+
+
+ // Execute whatever methods are supplied.
+ if (count($conn->methods) > 0) {
+ $ret[] = $this->runMethodSequence($_chan, $conn->methods->method);
+ }
+
+ // Finally, set up consumers. This is done last in case queues / exchanges etc. need to be set up before the consumers.
+ $i = 0;
+ foreach ($conn->channel as $chan) {
+ $_chan = $_chans[$i++];
+ foreach ($chan->consumer as $cons) {
+ $impl = (string) $cons->impl;
+ $_chan->addConsumer($_cons = new $impl($this->xmlToArray($cons->args->children())));
+ if (isset($cons->autostart) && $this->kast($cons->autostart, 'boolean')) {
+ $_chan->startConsumer($_cons);
+ }
+ }
+ }
+
+
+ $ret[] = $_conn;
+ }
+ return $ret;
+ }
+
+
+
+ /**
+ * Execute the methods defined in $meths against channel $chan,
+ * return the results.
+ */
+ private function runMethodSequence (Channel $chan, \SimpleXmlElement $meths) {
+ $r = array();
+ // Execute whatever methods are supplied.
+ foreach ($meths as $iMeth) {
+ $a = $this->xmlToArray($iMeth);
+ $c = $a['class'];
+ $r[] = $chan->invoke($chan->$c($a['method'], $a['args']));
+ }
+ return $r;
+ }
+
+
+ /**
+ * Perform the given cast on the given value, defaults to a string
+ * cast.
+ */
+ private function kast ($val, $cast) {
+ switch ($cast) {
+ case 'string':
+ return (string) $val;
+ case 'boolean':
+ $val = trim((string) $val);
+ if ($val === '0' || strtolower($val) === 'false') {
+ return false;
+ } else if ($val === '1' || strtolower($val) === 'true') {
+ return true;
+ } else {
+ trigger_error("Bad boolean cast $val - use 0/1 true/false", E_USER_WARNING);
+ return true;
+ }
+ case 'int':
+ return (int) $val;
+ default:
+ trigger_error("Unknown Kast $cast", E_USER_WARNING);
+ return (string) $val;
+ }
+ }
+
+
+ /**
+ * Recursively convert an XML structure to nested assoc arrays.
+ * For each "leaf", use the "cast" given in the @k attribute.
+ */
+ private function xmlToArray (\SimpleXmlElement $e) {
+ $ret = array();
+ foreach ($e as $c) {
+ $ret[(string) $c->getName()] = (count($c) == 0)
+ ? $this->kast($c, (string) $c['k'])
+ : $this->xmlToArray($c);
+ }
+ return $ret;
+ }
+
+
+
+
+}
View
25 src/amqphp/StreamSocket.php
@@ -47,8 +47,8 @@ class StreamSocket
private $connected;
private $interrupt = false;
private $flags;
- private $isReusedPSock = false;
private $vhost;
+ private $stfp;
function __construct ($params, $flags, $vhost) {
$this->url = $params['url'];
@@ -91,10 +91,11 @@ function connect () {
ini_get("default_socket_timeout"),
$flags, $context);
+ $this->stfp = ftell($this->sock);
+
if (! $this->sock) {
throw new \Exception("Failed to connect stream socket {$this->url}, ($errno, $errstr): flags $flags", 7568);
- } else if (($flags & STREAM_CLIENT_PERSISTENT) && ftell($this->sock) > 0) {
- $this->isReusedPSock = true;
+ } else if (($flags & STREAM_CLIENT_PERSISTENT) && $this->stfp > 0) {
foreach (self::$All as $sock) {
if ($sock !== $this && $sock->getCK() == $this->getCK()) {
/* TODO: Investigate whether mixing persistent and
@@ -115,7 +116,23 @@ function connect () {
* if it's a persistent socket which has been re-used.
*/
function isReusedPSock () {
- return $this->isReusedPSock;
+ return ($this->stfp > 0);
+ }
+
+ /**
+ * Return the ftell() value that was recorded immediately after
+ * the underlying connection was opened.
+ */
+ function getConnectionStartFP () {
+ return $this->stfp;
+ }
+
+
+ /**
+ * Call ftell on the underlying stream and return the result
+ */
+ function tell () {
+ return ftell($this->sock);
}
View
17 src/amqphp/persistent/PChannel.php
@@ -23,12 +23,25 @@
/**
* Simple persistence extension for the standard Channel. Serialise
- * is invoked by the containing Channel
+ * is invoked by the containing Channel.
+ *
+ * TODO : Implement a channel suspend feature (optional?) using
+ * channel.flow. Must be called in both serialise methods.
*/
class PChannel extends \amqphp\Channel implements \Serializable
{
- private static $PersProps = array('chanId', 'flow', 'frameMax', 'confirmSeqs', 'confirmSeq', 'confirmMode', 'isOpen', 'callbackHandler');
+ /**
+ * Flag - when set, the serialize methods will use the amqp
+ * channel.flow to suspend and resume message delivery.
+ * TODO : Set default back to false!!!!
+ * TODO : Replace with 2 flags: suspendDuringSleep and unsuspendDuringWakeup
+ */
+ public $suspendFlow = false;
+
+ private static $PersProps = array('chanId', 'flow', 'frameMax', 'confirmSeqs',
+ 'confirmSeq', 'confirmMode', 'isOpen',
+ 'callbackHandler', 'suspendFlow');
function serialize () {
$data = array();
View
43 src/amqphp/persistent/PConnection.php
@@ -69,7 +69,10 @@ class PConnection extends \amqphp\Connection implements \Serializable
/**
* List of Connection (super class) properties to be persisted.
*/
- private static $BasicProps = array('capabilities', 'socketImpl', 'protoImpl', 'socketParams', 'vhost', 'frameMax', 'chanMax', 'signalDispatch', 'nextChan', 'unDelivered', 'unDeliverable', 'incompleteMethods', 'readSrc');
+ private static $BasicProps = array('capabilities', 'socketImpl', 'protoImpl', 'socketParams',
+ 'vhost', 'frameMax', 'chanMax', 'signalDispatch',
+ 'nextChan', 'unDelivered', 'unDeliverable', 'incompleteMethods',
+ 'readSrc');
private $sleepMode = self::PERSIST_CHANNELS;
@@ -253,23 +256,34 @@ function sleep () {
*/
function serialize () {
$data = array();
- foreach (self::$BasicProps as $k) {
- $data[$k] = $this->$k;
- }
-
$z = array();
$z[0] = $this->sleepMode;
- $z[1] = $data;
if ($this->sleepMode == self::PERSIST_CHANNELS) {
$z[2] = $this->chans;
+ foreach ($this->chans as $chan) {
+ if ($chan->suspendFlow && ! $chan->isSuspended()) {
+ $chan->toggleFlow();
+ }
+ }
+ }
+
+ foreach (self::$BasicProps as $k) {
+ if (in_array($k, array('readSrc', 'incompleteMethods', 'unDelivered', 'unDeliverable'))
+ && $this->$k) {
+ trigger_error("PConnection will persist application data ({$k})", E_USER_WARNING);
+ }
+ $data[$k] = $this->$k;
}
+ $z[1] = $data;
+
+
$this->stateFlag |= self::ST_SER;
- return serialize($z);
+ $r = serialize($z);
+ return $r;
}
-
/**
* Can be called manually or from unserialize(), in the latter
* case the underlying connection is re-established.
@@ -294,10 +308,11 @@ function unserialize ($serialised) {
$this->$k = $data[1][$k];
}
- // Reconnect only if we're being unserialised
+ // Reconnect only if we're being unserialised manually
if ($rewake) {
$this->initSocket();
$this->sock->connect();
+
if (! $this->sock->isReusedPSock()) {
throw new \Exception("Persisted connection woken up with a fresh socket connection", 9249);
}
@@ -318,10 +333,16 @@ function unserialize ($serialised) {
$chan->setConnection($this);
}
}
- $this->stateFlag |= self::ST_UNSER;
- }
+ $this->stateFlag |= self::ST_UNSER;
+ // Restart flow, if required.
+ foreach ($this->chans as $chan) {
+ if ($chan->suspendFlow && $chan->isSuspended()) {
+ $chan->toggleFlow();
+ }
+ }
+ }
/**
View
3,825 tools/amqp-xml-spec-0.9.1.xml
3,320 additions, 505 deletions not shown
Please sign in to comment.
Something went wrong with that request. Please try again.