Permalink
Browse files

Changes to the publish confirms feature:

 1. Channel  will now  save a  copy of each  outgoing message  when in
 confirm mode,  this could have  RAM usage implications  for processes
 that publish large numbers of messages

 2. When confirm (n)acks are received, the Channel delivers the actual
 message   that  is  being   confirmed  to   the  ChannelEventHandler.
 Previously, the  Channel would deliver  the incoming ack to  the CEH,
 which is pretty useless because  the only information held in the ack
 is the message sequence number (which is internal to the Channel) and
 because the  broker often sends acks with  multiple=true, which means
 the channel delivers the same incoming ack message many times.
  • Loading branch information...
1 parent 4052be7 commit 51a8cfed135d3616046ddc6aba1252f016d65aa5 @BraveSirRobin committed Jan 17, 2012
Showing with 69 additions and 111 deletions.
  1. +1 −4 TODO.dev
  2. +0 −2 build/cpf/amqphp/CallbackExitStrategy.php
  3. +1 −0 build/cpf/amqphp/CallbackExitStrategy.php
  4. +0 −2 build/cpf/amqphp/Channel.php
  5. +1 −0 build/cpf/amqphp/Channel.php
  6. +0 −2 build/cpf/amqphp/ChannelEventHandler.php
  7. +1 −0 build/cpf/amqphp/ChannelEventHandler.php
  8. +0 −2 build/cpf/amqphp/ConditionalExitStrategy.php
  9. +1 −0 build/cpf/amqphp/ConditionalExitStrategy.php
  10. +0 −2 build/cpf/amqphp/Connection.php
  11. +1 −0 build/cpf/amqphp/Connection.php
  12. +0 −2 build/cpf/amqphp/Consumer.php
  13. +1 −0 build/cpf/amqphp/Consumer.php
  14. +0 −2 build/cpf/amqphp/EventLoop.php
  15. +1 −0 build/cpf/amqphp/EventLoop.php
  16. +0 −2 build/cpf/amqphp/ExitStrategy.php
  17. +1 −0 build/cpf/amqphp/ExitStrategy.php
  18. +0 −2 build/cpf/amqphp/Factory.php
  19. +1 −0 build/cpf/amqphp/Factory.php
  20. +0 −2 build/cpf/amqphp/MaxloopExitStrategy.php
  21. +1 −0 build/cpf/amqphp/MaxloopExitStrategy.php
  22. +0 −2 build/cpf/amqphp/SimpleConsumer.php
  23. +1 −0 build/cpf/amqphp/SimpleConsumer.php
  24. +0 −2 build/cpf/amqphp/Socket.php
  25. +1 −0 build/cpf/amqphp/Socket.php
  26. +0 −2 build/cpf/amqphp/StreamSocket.php
  27. +1 −0 build/cpf/amqphp/StreamSocket.php
  28. +0 −2 build/cpf/amqphp/TimeoutExitStrategy.php
  29. +1 −0 build/cpf/amqphp/TimeoutExitStrategy.php
  30. +0 −2 build/cpf/amqphp/persistent/APCPersistenceHelper.php
  31. +1 −0 build/cpf/amqphp/persistent/APCPersistenceHelper.php
  32. +0 −2 build/cpf/amqphp/persistent/FilePersistenceHelper.php
  33. +1 −0 build/cpf/amqphp/persistent/FilePersistenceHelper.php
  34. +0 −2 build/cpf/amqphp/persistent/PChannel.php
  35. +1 −0 build/cpf/amqphp/persistent/PChannel.php
  36. +0 −2 build/cpf/amqphp/persistent/PConnection.php
  37. +1 −0 build/cpf/amqphp/persistent/PConnection.php
  38. +0 −2 build/cpf/amqphp/persistent/PersistenceHelper.php
  39. +1 −0 build/cpf/amqphp/persistent/PersistenceHelper.php
  40. +0 −2 build/cpf/amqphp/protocol/abstrakt/ClassFactory.php
  41. +1 −0 build/cpf/amqphp/protocol/abstrakt/ClassFactory.php
  42. +0 −2 build/cpf/amqphp/protocol/abstrakt/DomainFactory.php
  43. +1 −0 build/cpf/amqphp/protocol/abstrakt/DomainFactory.php
  44. +0 −2 build/cpf/amqphp/protocol/abstrakt/FieldFactory.php
  45. +1 −0 build/cpf/amqphp/protocol/abstrakt/FieldFactory.php
  46. +0 −2 build/cpf/amqphp/protocol/abstrakt/MethodFactory.php
  47. +1 −0 build/cpf/amqphp/protocol/abstrakt/MethodFactory.php
  48. +0 −2 build/cpf/amqphp/protocol/abstrakt/XmlSpecClass.php
  49. +1 −0 build/cpf/amqphp/protocol/abstrakt/XmlSpecClass.php
  50. +0 −2 build/cpf/amqphp/protocol/abstrakt/XmlSpecDomain.php
  51. +1 −0 build/cpf/amqphp/protocol/abstrakt/XmlSpecDomain.php
  52. +0 −2 build/cpf/amqphp/protocol/abstrakt/XmlSpecField.php
  53. +1 −0 build/cpf/amqphp/protocol/abstrakt/XmlSpecField.php
  54. +0 −2 build/cpf/amqphp/protocol/abstrakt/XmlSpecMethod.php
  55. +1 −0 build/cpf/amqphp/protocol/abstrakt/XmlSpecMethod.php
  56. +0 −2 build/cpf/amqphp/wire/Decimal.php
  57. +1 −0 build/cpf/amqphp/wire/Decimal.php
  58. +0 −2 build/cpf/amqphp/wire/Hexdump.php
  59. +1 −0 build/cpf/amqphp/wire/Hexdump.php
  60. +0 −2 build/cpf/amqphp/wire/Method.php
  61. +1 −0 build/cpf/amqphp/wire/Method.php
  62. +0 −2 build/cpf/amqphp/wire/Protocol.php
  63. +1 −0 build/cpf/amqphp/wire/Protocol.php
  64. +0 −2 build/cpf/amqphp/wire/Reader.php
  65. +1 −0 build/cpf/amqphp/wire/Reader.php
  66. +0 −2 build/cpf/amqphp/wire/Table.php
  67. +1 −0 build/cpf/amqphp/wire/Table.php
  68. +0 −2 build/cpf/amqphp/wire/TableField.php
  69. +1 −0 build/cpf/amqphp/wire/TableField.php
  70. +0 −2 build/cpf/amqphp/wire/Writer.php
  71. +1 −0 build/cpf/amqphp/wire/Writer.php
  72. +17 −10 demos/producer.php
  73. +16 −27 src/amqphp/Channel.php
View
@@ -22,7 +22,4 @@
http://www.rabbitmq.com/ha.html
In particular: "As a result of the requeuing, clients that
re-consume from the queue must be aware that they are likely to
- subsequently receive messages that they have seen previously"
-
- * Add support for configurable multiple acks, i.e. automatically
- buffer up to N acks and send as a single nack.
+ subsequently receive messages that they have seen previously"
@@ -1,2 +0,0 @@
-<?php
- namespace amqphp; use amqphp\protocol; use amqphp\wire; class CallbackExitStrategy implements ExitStrategy { private $cb; private $args; function configure ($sMode, $cb=null, $args=null) { if (! is_callable($cb)) { trigger_error("Select mode - invalid callback params", E_USER_WARNING); return false; } else { $this->cb = $cb; $this->args = $args; return true; } } function init (Connection $conn) {} function preSelect ($prev=null) { if ($prev === false) { return false; } if (true !== call_user_func_array($this->cb, $this->args)) { return false; } else { return $prev; } } function complete () {} }
@@ -1,2 +0,0 @@
-<?php
- namespace amqphp; use amqphp\protocol; use amqphp\wire; class Channel { protected $myConn; protected $chanId; protected $flow = true; private $destroyed = false; protected $frameMax; protected $isOpen = false; protected $consumers = array(); protected $callbackHandler; protected $confirmSeqs = array(); protected $confirmSeq = 0; protected $confirmMode = false; public $ackBuffer = 5; protected $pendingAcks = array(); protected $numPendAcks = 0; protected $ackFlag; function setEventHandler (ChannelEventHandler $evh) { $this->callbackHandler = $evh; } function hasOutstandingConfirms () { return (bool) $this->confirmSeqs; } function setConfirmMode () { if ($this->confirmMode) { return; } $confSelect = $this->confirm('select'); $confSelectOk = $this->invoke($confSelect); if (! ($confSelectOk instanceof wire\Method) || $confSelectOk->amqpClass != 'confirm.select-ok') { throw new \Exception("Failed to set confirm mode", 8674); } $this->confirmMode = true; } function setConnection (Connection $rConn) { $this->myConn = $rConn; } function setChanId ($chanId) { $this->chanId = $chanId; } function getChanId () { return $this->chanId; } function setFrameMax ($frameMax) { $this->frameMax = $frameMax; } function initChannel () { $pl = $this->myConn->getProtocolLoader(); $meth = new wire\Method($pl('ClassFactory', 'GetMethod', array('channel', 'open')), $this->chanId); $meth->setField('reserved-1', ''); $resp = $this->myConn->invoke($meth); } function __call ($class, $args) { if ($this->destroyed) { throw new \Exception("Attempting to use a destroyed channel", 8766); } $m = $this->myConn->constructMethod($class, $args); $m->setWireChannel($this->chanId); $m->setMaxFrameSize($this->frameMax); return $m; } function invoke (wire\Method $m) { if ($this->destroyed) { throw new \Exception("Attempting to use a destroyed channel", 8767); } else if (! $this->flow) { trigger_error("Channel is closed", E_USER_WARNING); return; } else if (is_null($tmp = $m->getWireChannel())) { $m->setWireChannel($this->chanId); } else if ($tmp != $this->chanId) { throw new \Exception("Method is invoked through the wrong channel", 7645); } if ($this->confirmMode && $m->amqpClass == 'basic.publish') { $this->confirmSeq++; $this->confirmSeqs[] = $this->confirmSeq; } return $this->myConn->invoke($m); } function handleChannelMessage (wire\Method $meth) { switch ($meth->amqpClass) { case 'channel.flow': $this->flow = ! $this->flow; if ($r = $meth->getMethodProto()->getResponses()) { $meth = new wire\Method($r[0], $this->chanId); $this->invoke($meth); } return false; case 'channel.close': $pl = $this->myConn->getProtocolLoader(); if ($culprit = $pl('ClassFactory', 'GetMethod', array($meth->getField('class-id'), $meth->getField('method-id')))) { $culprit = $culprit->getSpecName(); } else { $culprit = '(Unknown or unspecified)'; } $errCode = $pl('ProtoConsts', 'Konstant', array($meth->getField('reply-code'))); $eb = ''; foreach ($meth->getFields() as $k => $v) { $eb .= sprintf("(%s=%s) ", $k, $v); } $tmp = $meth->getMethodProto()->getResponses(); $closeOk = new wire\Method($tmp[0], $this->chanId); $em = "[channel.close] reply-code={$errCode['name']} triggered by $culprit: $eb"; try { $this->myConn->invoke($closeOk); $em .= " Channel closed OK"; $n = 3687; } catch (\Exception $e) { $em .= " Additionally, channel closure ack send failed"; $n = 2435; } throw new \Exception($em, $n); case 'channel.close-ok': case 'channel.open-ok': case 'channel.flow-ok': return true; default: throw new \Exception("Received unexpected channel message: {$meth->amqpClass}", 8795); } } function handleChannelDelivery (wire\Method $meth) { switch ($meth->amqpClass) { case 'basic.deliver': return $this->deliverConsumerMessage($meth); case 'basic.return': if ($this->callbackHandler) { $this->callbackHandler->publishReturn($meth); } return false; case 'basic.ack': $this->removeConfirmSeqs($meth, 'publishConfirm'); return false; case 'basic.nack': $this->removeConfirmSeqs($meth, 'publishNack'); return false; case 'basic.cancel': $this->handleConsumerCancel($meth); break; default: throw new \Exception("Received unexpected channel delivery:\n{$meth->amqpClass}", 87998); } } private function handleConsumerCancel ($meth) { $ctag = $meth->getField('consumer-tag'); list($cons, $status,) = $this->getConsumerAndStatus($ctag); if ($cons && $status == 'READY') { $cons->handleCancel($meth, $this); $this->setConsumerStatus($ctag, 'CLOSED') OR trigger_error("Failed to set consumer status flag (2)", E_USER_WARNING); if (! $meth->getField('no-wait')) { $this->invoke($this->basic('cancel-ok', array('consumer-tag', $ctag))); } } else if ($cons) { $m = sprintf("Cancellation message delivered to closed consumer %s", $ctag); trigger_error($m, E_USER_WARNING); } else { $m = sprintf("Unable to load consumer for consumer cancellation %s", $ctag); trigger_error($m, E_USER_WARNING); } } private function deliverConsumerMessage ($meth) { $ctag = $meth->getField('consumer-tag'); $response = false; list($cons, $status, $consParams) = $this->getConsumerAndStatus($ctag); if ($cons && $status == 'READY') { $response = $cons->handleDelivery($meth, $this); } else if ($cons) { $m = sprintf("Message delivered to closed consumer %s in non-ready state %s -- reject %s", $ctag, $status, $meth->getField('delivery-tag')); trigger_error($m, E_USER_WARNING); $response = CONSUMER_REJECT; } else { $m = sprintf("Unable to load consumer for delivery %s -- reject %s", $ctag, $meth->getField('delivery-tag')); trigger_error($m, E_USER_WARNING); $response = CONSUMER_REJECT; } if (! $response) { return false; } if (! is_array($response)) { $response = array($response); } foreach ($response as $resp) { switch ($resp) { case CONSUMER_ACK: if (! array_key_exists('no-ack', $consParams) || ! $consParams['no-ack']) { $this->ack($meth, CONSUMER_ACK); } break; case CONSUMER_DROP: case CONSUMER_REJECT: if (! array_key_exists('no-ack', $consParams) || ! $consParams['no-ack']) { $this->ack($meth, $resp); } break; case CONSUMER_CANCEL: $this->removeConsumerByTag($cons, $ctag); break; } } return false; } private function ack ($meth, $action) { if (is_null($this->ackFlag)) { $this->ackFlag = $action; } else if ($action != $this->ackFlag) { $this->flushAcks(); $this->ackFlag = $action; } $this->pendingAcks[] = $meth->getField('delivery-tag'); $this->numPendAcks++; if ($this->numPendAcks >= $this->ackBuffer) { $this->flushAcks(); } } private function flushAcks () { if (is_null($this->ackFlag)) { return; } switch ($this->ackFlag) { case CONSUMER_ACK: $ack = $this->basic('ack', array('delivery-tag' => array_pop($this->pendingAcks), 'multiple' => true)); $this->invoke($ack); break; case CONSUMER_REJECT: case CONSUMER_DROP: $rej = $this->basic('nack', array('delivery-tag' => array_pop($this->pendingAcks), 'multiple' => true, 'requeue' => ($this->ackFlag == CONSUMER_REJECT))); $this->invoke($rej); break; default: throw new \Exception("Internal (n)ack tracking state error", 2956); } $this->ackFlag = null; $this->numPendAcks = 0; $this->pendingAcks = array(); } private function removeConfirmSeqs (wire\Method $meth, $event) { if ($meth->getField('multiple')) { $dtag = $meth->getField('delivery-tag'); $evh = $this->callbackHandler; $this->confirmSeqs = array_filter($this->confirmSeqs, function ($id) use ($dtag, $evh, $event, $meth) { if ($id <= $dtag) { if ($evh) { $evh->$event($meth); } return false; } else { return true; } }); } else { $dt = $meth->getField('delivery-tag'); if (isset($this->confirmSeqs)) { if ($this->callbackHandler) { $this->callbackHandler->$event($meth); } unset($this->confirmSeqs[array_search($dt, $this->confirmSeqs)]); } } } function shutdown () { if (! $this->invoke($this->channel('close', array('reply-code' => '', 'reply-text' => '')))) { trigger_error("Unclean channel shutdown", E_USER_WARNING); } $this->myConn->removeChannel($this); $this->destroyed = true; $this->myConn = $this->chanId = $this->ticket = null; } function addConsumer (Consumer $cons, array $cParams=null) { $this->consumers[] = array($cons, false, 'READY_WAIT', $cParams); } function canListen () { return $this->hasListeningConsumers() || $this->hasOutstandingConfirms(); } function removeConsumer (Consumer $cons) { foreach ($this->consumers as $c) { if ($c[0] === $cons) { if ($c[2] == 'READY') { $this->removeConsumerByTag($c[0], $c[1]); } return; } } trigger_error("Consumer does not belong to this Channel", E_USER_WARNING); } function removeAllConsumers () { foreach ($this->consumers as $c) { if ($c[2] == 'READY') { $this->removeConsumerByTag($c[0], $c[1]); } } } private function removeConsumerByTag (Consumer $cons, $ctag) { list(, $cstate,) = $this->getConsumerAndStatus($ctag); if ($cstate == 'CLOSED') { trigger_error("Consumer is already removed", E_USER_WARNING); return; } $cnl = $this->basic('cancel', array('consumer-tag' => $ctag, 'no-wait' => false)); $cOk = $this->invoke($cnl); if ($cOk->amqpClass == 'basic.cancel-ok') { $this->setConsumerStatus($ctag, 'CLOSED') OR trigger_error("Failed to set consumer status flag", E_USER_WARNING); } else { throw new \Exception("Failed to cancel consumer - bad broker response", 9768); } $cons->handleCancelOk($cOk, $this); } private function setConsumerStatus ($tag, $status) { foreach ($this->consumers as $k => $c) { if ($c[1] === $tag) { $this->consumers[$k][2] = $status; return true; } } return false; } private function getConsumerAndStatus ($tag) { foreach ($this->consumers as $c) { if ($c[1] == $tag) { return array($c[0], $c[2], $c[3]); } } return array(null, 'INVALID', null); } function hasListeningConsumers () { foreach ($this->consumers as $c) { if ($c[2] === 'READY') { return true; } } return false; } function getConsumerByTag ($t) { foreach ($this->consumers as $c) { if ($c[2] == 'READY' && $c[1] === $t) { return $c[0]; } } } function getConsumerTags () { $tags = array(); foreach ($this->consumers as $c) { if ($c[2] == 'READY') { $tags[] = $c[1]; } } return $tags; } function startAllConsumers () { if (! $this->consumers) { return false; } $r = false; foreach (array_keys($this->consumers) as $cnum) { if (false === $this->consumers[$cnum][1]) { $this->_startConsumer($cnum); $r = true; } } return $r; } private function _startConsumer ($cnum) { $consume = false; if (($consume = $this->consumers[$cnum][0]->getConsumeMethod($this)) && ! ($consume instanceof wire\Method)) { trigger_error("Consumer returned invalid consume method", E_USER_WARNING); $consume = false; } if (! $consume && is_array($this->consumers[$cnum][3])) { $consume = $this->basic('consume', $this->consumers[$cnum][3]); } if (! $consume) { throw new \Exception("Couldn't find any consume paramters while starting consumer", 9265); } $cOk = $this->invoke($consume); $this->consumers[$cnum][0]->handleConsumeOk($cOk, $this); $this->consumers[$cnum][2] = 'READY'; $this->consumers[$cnum][1] = $cOk->getField('consumer-tag'); $this->consumers[$cnum][3] = $consume->getFields(); } function startConsumer (Consumer $cons) { foreach ($this->consumers as $i => $c) { if ($c[0] === $cons && $c[1] === false) { $this->_startConsumer($i); return true; } } return false; } function onSelectEnd () { $this->flushAcks(); $this->consuming = false; } function isSuspended () { return ! $this->flow; } function toggleFlow () { $flow = ! $this->flow; $this->flow = true; $meth = $this->channel('flow', array('active' => $flow)); $fr = $this->invoke($meth); $newFlow = $fr->getField('active'); if ($newFlow != $flow) { trigger_error(sprintf("Flow Unexpected channel flow response, expected %d, got %d", ! $this->flow, $this->flow), E_USER_WARNING); } $this->flow = $newFlow; } }
@@ -1,2 +0,0 @@
-<?php
- namespace amqphp; use amqphp\protocol; use amqphp\wire; interface ChannelEventHandler { function publishConfirm (wire\Method $meth); function publishReturn (wire\Method $meth); function publishNack(wire\Method $meth); }
@@ -1,2 +0,0 @@
-<?php
- namespace amqphp; use amqphp\protocol; use amqphp\wire; class ConditionalExitStrategy implements ExitStrategy { private $conn; function configure ($sMode) {} function init (Connection $conn) { $this->conn = $conn; } function preSelect ($prev=null) { if ($prev === false) { return false; } $hasConsumers = false; foreach ($this->conn->getChannels() as $chan) { if ($chan->canListen()) { $hasConsumers = true; break; } } if (! $hasConsumers) { return false; } else { return $prev; } } function complete () { $this->conn = null; } }
Oops, something went wrong.

0 comments on commit 51a8cfe

Please sign in to comment.