/
Channel.php
2 lines (2 loc) · 11.6 KB
/
Channel.php
1
2
<?php
namespace amqphp;

use amqphp\protocol;
use amqphp\wire;

/**
 * A single channel operated over a Connection.
 *
 * The channel builds wire methods (through __call), sends them via its
 * Connection, reacts to incoming channel-level and delivery messages, keeps
 * track of its consumers and their state, batches (n)acks, and does the
 * bookkeeping for publisher confirms.
 */
class Channel
{
    /** @var Connection Connection used to send methods; nulled by shutdown(). */
    protected $myConn;

    /** @var mixed Channel id stamped on every outgoing method; nulled by shutdown(). */
    protected $chanId;

    /** @var bool False while channel flow is suspended; invoke() refuses to send then. */
    protected $flow = true;

    /** @var bool Set by shutdown(); a destroyed channel throws on any further use. */
    private $destroyed = false;

    /** @var mixed Max frame size handed to every method built by __call. */
    protected $frameMax;

    /** @var bool Not written anywhere in this class; kept for subclasses. */
    protected $isOpen = false;

    /**
     * @var array List of consumer records, each of the form
     *            array(0 => Consumer,
     *                  1 => consumer tag, or false until the consumer is started,
     *                  2 => status string ('READY_WAIT', 'READY' or 'CLOSED'),
     *                  3 => consume parameters array, or null).
     */
    protected $consumers = array();

    /** @var ChannelEventHandler|null Receives publishReturn / publishConfirm / publishNack callbacks. */
    protected $callbackHandler;

    /** @var array Unconfirmed basic.publish methods, keyed by confirm sequence number. */
    protected $confirmSeqs = array();

    /** @var int Sequence number assigned to the most recent publish in confirm mode. */
    protected $confirmSeq = 0;

    /** @var bool True once setConfirmMode() has succeeded. */
    protected $confirmMode = false;

    /** @var int Number of pending (n)acks that triggers a flush; values > 1 send 'multiple' acks. */
    public $ackBuffer = 1;

    /** @var mixed Delivery tag of the latest message waiting to be (n)acked. */
    protected $ackHead;

    /** @var int Count of (n)acks buffered since the last flush. */
    protected $numPendAcks = 0;

    /** @var mixed Connection::CONSUMER_* flag shared by all currently buffered (n)acks, or null. */
    protected $ackFlag;

    /**
     * @var mixed Only ever reset to null by shutdown(). Declared explicitly so
     *            that the assignment does not create a dynamic property; public
     *            because the formerly dynamic property was public.
     */
    public $ticket;

    /**
     * @var bool Reset to false by onSelectEnd(). Declared explicitly (and kept
     *           public) for the same reason as $ticket.
     */
    public $consuming = false;

    /**
     * Registers the handler that is notified of returns and publish (n)acks.
     *
     * @param ChannelEventHandler $evh
     */
    public function setEventHandler (ChannelEventHandler $evh)
    {
        $this->callbackHandler = $evh;
    }

    /**
     * @return bool True while at least one confirm-mode publish is unconfirmed.
     */
    public function hasOutstandingConfirms ()
    {
        return (bool) $this->confirmSeqs;
    }

    /**
     * Switches the channel into publisher-confirm mode (no-op if already on).
     *
     * @throws \Exception (code 8674) when the response is not confirm.select-ok.
     */
    public function setConfirmMode ()
    {
        if ($this->confirmMode) {
            return;
        }
        $confSelect = $this->confirm('select');
        $confSelectOk = $this->invoke($confSelect);
        if (! ($confSelectOk instanceof wire\Method) ||
            $confSelectOk->amqpClass != 'confirm.select-ok') {
            throw new \Exception("Failed to set confirm mode", 8674);
        }
        $this->confirmMode = true;
    }

    /**
     * @param Connection $rConn Connection this channel will send through.
     */
    public function setConnection (Connection $rConn)
    {
        $this->myConn = $rConn;
    }

    /**
     * @param mixed $chanId Channel id to stamp on outgoing methods.
     */
    public function setChanId ($chanId)
    {
        $this->chanId = $chanId;
    }

    /**
     * @return mixed The channel id given to setChanId().
     */
    public function getChanId ()
    {
        return $this->chanId;
    }

    /**
     * @param mixed $frameMax Max frame size applied to methods built by __call.
     */
    public function setFrameMax ($frameMax)
    {
        $this->frameMax = $frameMax;
    }

    /**
     * Sends channel.open for this channel id through the connection.
     */
    public function initChannel ()
    {
        $pl = $this->myConn->getProtocolLoader();
        $meth = new wire\Method($pl('ClassFactory', 'GetMethod', array('channel', 'open')), $this->chanId);
        $meth->setField('reserved-1', '');
        $this->myConn->invoke($meth);
    }

    /**
     * Method factory: $chan->basic('ack', $fields) asks the connection to
     * construct the method, then binds it to this channel and frame size.
     *
     * @param string $class Name of the called (undefined) method.
     * @param array  $args  Arguments of the call, passed on to constructMethod().
     * @return mixed The method object built by the connection.
     * @throws \Exception (code 8766) when the channel has been shut down.
     */
    public function __call ($class, $args)
    {
        if ($this->destroyed) {
            throw new \Exception("Attempting to use a destroyed channel", 8766);
        }
        $m = $this->myConn->constructMethod($class, $args);
        $m->setWireChannel($this->chanId);
        $m->setMaxFrameSize($this->frameMax);
        return $m;
    }

    /**
     * Sends a method through the connection on this channel.
     *
     * @param wire\Method $m
     * @return mixed Whatever Connection::invoke() returns, or null (after an
     *               E_USER_WARNING) when channel flow is suspended.
     * @throws \Exception 8767 when the channel is destroyed, 7645 when the
     *                    method is already bound to a different channel.
     */
    public function invoke (wire\Method $m)
    {
        if ($this->destroyed) {
            throw new \Exception("Attempting to use a destroyed channel", 8767);
        }
        if (! $this->flow) {
            // Fires when flow is suspended, despite the wording of the message.
            trigger_error("Channel is closed", E_USER_WARNING);
            return null;
        }

        $wireChan = $m->getWireChannel();
        if (is_null($wireChan)) {
            $m->setWireChannel($this->chanId);
        } else if ($wireChan != $this->chanId) {
            throw new \Exception("Method is invoked through the wrong channel", 7645);
        }

        // Remember confirm-mode publishes until the broker (n)acks them.
        if ($this->confirmMode && $m->amqpClass == 'basic.publish') {
            $this->confirmSeq++;
            $this->confirmSeqs[$this->confirmSeq] = $m;
        }

        return $this->myConn->invoke($m);
    }

    /**
     * Handles an incoming channel-class method.
     *
     * @param wire\Method $meth
     * @return bool False for channel.flow, true for the *-ok messages.
     * @throws \Exception Always for channel.close (3687 if close-ok was sent,
     *                    2435 if sending it failed); 8795 for unknown messages.
     */
    public function handleChannelMessage (wire\Method $meth)
    {
        switch ($meth->amqpClass) {
        case 'channel.flow':
            // Follow the state requested by the peer; fall back to a plain
            // toggle only if the field is unavailable.
            $active = $meth->getField('active');
            $this->flow = is_null($active) ? ! $this->flow : (bool) $active;
            if ($r = $meth->getMethodProto()->getResponses()) {
                $flowOk = new wire\Method($r[0], $this->chanId);
                $flowOk->setField('active', $this->flow);
                // Bypass $this->invoke(): it refuses to send while flow is
                // off, which would swallow the acknowledgement of a pause.
                $this->myConn->invoke($flowOk);
            }
            return false;

        case 'channel.close':
            $pl = $this->myConn->getProtocolLoader();
            $culprit = $pl('ClassFactory', 'GetMethod', array($meth->getField('class-id'), $meth->getField('method-id')));
            if ($culprit) {
                $culprit = $culprit->getSpecName();
            } else {
                $culprit = '(Unknown or unspecified)';
            }
            $errCode = $pl('ProtoConsts', 'Konstant', array($meth->getField('reply-code')));

            $eb = '';
            foreach ($meth->getFields() as $k => $v) {
                $eb .= sprintf("(%s=%s) ", $k, $v);
            }

            $tmp = $meth->getMethodProto()->getResponses();
            $closeOk = new wire\Method($tmp[0], $this->chanId);
            $em = "[channel.close] reply-code={$errCode['name']} triggered by $culprit: $eb";
            try {
                $this->myConn->invoke($closeOk);
                $em .= " Channel closed OK";
                $n = 3687;
            } catch (\Exception $e) {
                $em .= " Additionally, channel closure ack send failed";
                $n = 2435;
            }
            throw new \Exception($em, $n);

        case 'channel.close-ok':
        case 'channel.open-ok':
        case 'channel.flow-ok':
            return true;

        default:
            throw new \Exception("Received unexpected channel message: {$meth->amqpClass}", 8795);
        }
    }

    /**
     * Handles an incoming basic-class delivery / notification.
     *
     * @param wire\Method $meth
     * @return mixed Result of the consumer dispatch for basic.deliver, false
     *               for return/ack/nack, null for basic.cancel.
     * @throws \Exception (code 87998) for any other method.
     */
    public function handleChannelDelivery (wire\Method $meth)
    {
        switch ($meth->amqpClass) {
        case 'basic.deliver':
            return $this->deliverConsumerMessage($meth);

        case 'basic.return':
            if ($this->callbackHandler) {
                $this->callbackHandler->publishReturn($meth);
            }
            return false;

        case 'basic.ack':
            $this->removeConfirmSeqs($meth, 'publishConfirm');
            return false;

        case 'basic.nack':
            $this->removeConfirmSeqs($meth, 'publishNack');
            return false;

        case 'basic.cancel':
            $this->handleConsumerCancel($meth);
            return null;

        default:
            throw new \Exception("Received unexpected channel delivery:\n{$meth->amqpClass}", 87998);
        }
    }

    /**
     * Handles a basic.cancel for one of our consumers: notifies the consumer,
     * marks it CLOSED and, unless no-wait is set, replies with cancel-ok.
     *
     * @param wire\Method $meth
     */
    private function handleConsumerCancel ($meth)
    {
        $ctag = $meth->getField('consumer-tag');
        list($cons, $status,) = $this->getConsumerAndStatus($ctag);

        if ($cons && $status == 'READY') {
            $cons->handleCancel($meth, $this);
            $this->setConsumerStatus($ctag, 'CLOSED') OR
                trigger_error("Failed to set consumer status flag (2)", E_USER_WARNING);
            if (! $meth->getField('no-wait')) {
                // Field map, not a list: the tag must be keyed by its field name.
                $this->invoke($this->basic('cancel-ok', array('consumer-tag' => $ctag)));
            }
        } else if ($cons) {
            $m = sprintf("Cancellation message delivered to closed consumer %s", $ctag);
            trigger_error($m, E_USER_WARNING);
        } else {
            $m = sprintf("Unable to load consumer for consumer cancellation %s", $ctag);
            trigger_error($m, E_USER_WARNING);
        }
    }

    /**
     * Passes a basic.deliver to the matching consumer and acts on its
     * response flag(s). Deliveries for unknown or non-READY consumers are
     * rejected with a warning.
     *
     * @param wire\Method $meth
     * @return mixed Null when the consumer gave an empty response, else false.
     */
    private function deliverConsumerMessage ($meth)
    {
        $ctag = $meth->getField('consumer-tag');
        $response = false;
        list($cons, $status, $consParams) = $this->getConsumerAndStatus($ctag);

        if ($cons && $status == 'READY') {
            $response = $cons->handleDelivery($meth, $this);
        } else if ($cons) {
            $m = sprintf("Message delivered to closed consumer %s in non-ready state %s -- reject %s",
                         $ctag, $status, $meth->getField('delivery-tag'));
            trigger_error($m, E_USER_WARNING);
            $response = Connection::CONSUMER_REJECT;
        } else {
            $m = sprintf("Unable to load consumer for delivery %s -- reject %s",
                         $ctag, $meth->getField('delivery-tag'));
            trigger_error($m, E_USER_WARNING);
            $response = Connection::CONSUMER_REJECT;
        }

        if (! $response) {
            return;
        }
        if (! is_array($response)) {
            $response = array($response);
        }

        // $consParams is null for unknown consumers (and for consumers added
        // without params), so only consult it when it really is an array.
        $noAck = is_array($consParams)
            && array_key_exists('no-ack', $consParams)
            && $consParams['no-ack'];
        $shouldAck = ! $noAck;

        foreach ($response as $resp) {
            switch ($resp) {
            case Connection::CONSUMER_ACK:
                if ($shouldAck) {
                    $this->ack($meth, Connection::CONSUMER_ACK);
                }
                break;
            case Connection::CONSUMER_DROP:
            case Connection::CONSUMER_REJECT:
                if ($shouldAck) {
                    $this->ack($meth, $resp);
                }
                break;
            case Connection::CONSUMER_CANCEL:
                $this->removeConsumerByTag($cons, $ctag);
                break;
            default:
                trigger_error("Invalid consumer response $resp - consumers must " .
                              'respond with either a single consumer flag, multiple ' .
                              'consumer flags, or an empty response', E_USER_WARNING);
            }
        }
        return false;
    }

    /**
     * Buffers an ack/reject/drop for a delivery. A change of action flushes
     * what is buffered first; reaching $ackBuffer entries flushes as well.
     *
     * @param wire\Method $meth   The delivery being answered.
     * @param mixed       $action One of the Connection::CONSUMER_* flags.
     */
    private function ack ($meth, $action)
    {
        if (is_null($this->ackFlag)) {
            $this->ackFlag = $action;
        } else if ($action != $this->ackFlag) {
            $this->flushAcks();
            $this->ackFlag = $action;
        }

        $this->ackHead = $meth->getField('delivery-tag');
        $this->numPendAcks++;

        if ($this->numPendAcks >= $this->ackBuffer) {
            $this->flushAcks();
        }
    }

    /**
     * Sends the buffered basic.ack / basic.nack (if any) and resets the buffer.
     *
     * @throws \Exception (code 2956) when the buffered flag is not a known action.
     */
    private function flushAcks ()
    {
        if (is_null($this->ackFlag)) {
            return;
        }

        $multiple = ($this->ackBuffer > 1);
        switch ($this->ackFlag) {
        case Connection::CONSUMER_ACK:
            $ack = $this->basic('ack', array('delivery-tag' => $this->ackHead,
                                             'multiple' => $multiple));
            $this->invoke($ack);
            break;
        case Connection::CONSUMER_REJECT:
        case Connection::CONSUMER_DROP:
            // REJECT requeues the message, DROP does not.
            $rej = $this->basic('nack', array('delivery-tag' => $this->ackHead,
                                              'multiple' => $multiple,
                                              'requeue' => ($this->ackFlag == Connection::CONSUMER_REJECT)));
            $this->invoke($rej);
            break;
        default:
            throw new \Exception("Internal (n)ack tracking state error", 2956);
        }

        $this->ackFlag = $this->ackHead = null;
        $this->numPendAcks = 0;
    }

    /**
     * Reports broker (n)acks of confirm-mode publishes to the event handler
     * and drops the matching entries from the unconfirmed list.
     *
     * @param wire\Method $meth  The basic.ack / basic.nack received.
     * @param string      $event Handler method to call for each confirmed publish.
     */
    private function removeConfirmSeqs (wire\Method $meth, $event)
    {
        if (! $this->callbackHandler) {
            trigger_error("Received publish confirmations with no channel event handler in place", E_USER_WARNING);
            return;
        }

        $dtag = $meth->getField('delivery-tag');
        if ($meth->getField('multiple')) {
            // 'multiple' covers every sequence number up to and including $dtag.
            foreach (array_keys($this->confirmSeqs) as $sk) {
                if ($sk <= $dtag) {
                    $this->callbackHandler->$event($this->confirmSeqs[$sk]);
                    unset($this->confirmSeqs[$sk]);
                }
            }
        } else if (array_key_exists($dtag, $this->confirmSeqs)) {
            $this->callbackHandler->$event($this->confirmSeqs[$dtag]);
            unset($this->confirmSeqs[$dtag]);
        }
    }

    /**
     * Sends channel.close, detaches from the connection and marks the
     * channel destroyed. A falsy close response only raises a warning.
     */
    public function shutdown ()
    {
        if (! $this->invoke($this->channel('close', array('reply-code' => '', 'reply-text' => '')))) {
            trigger_error("Unclean channel shutdown", E_USER_WARNING);
        }
        $this->myConn->removeChannel($this);
        $this->destroyed = true;
        $this->myConn = $this->chanId = $this->ticket = null;
    }

    /**
     * Registers a consumer in the READY_WAIT state; it receives nothing until
     * it is started via startConsumer() / startAllConsumers().
     *
     * @param Consumer   $cons
     * @param array|null $cParams Optional basic.consume parameters.
     */
    public function addConsumer (Consumer $cons, array $cParams=null)
    {
        $this->consumers[] = array($cons, false, 'READY_WAIT', $cParams);
    }

    /**
     * @return bool True when there is a READY consumer or an unconfirmed publish.
     */
    public function canListen ()
    {
        return $this->hasListeningConsumers() || $this->hasOutstandingConfirms();
    }

    /**
     * Cancels the given consumer if it is READY; warns if it is not ours.
     *
     * @param Consumer $cons
     */
    public function removeConsumer (Consumer $cons)
    {
        foreach ($this->consumers as $c) {
            if ($c[0] === $cons) {
                if ($c[2] == 'READY') {
                    $this->removeConsumerByTag($c[0], $c[1]);
                }
                return;
            }
        }
        trigger_error("Consumer does not belong to this Channel", E_USER_WARNING);
    }

    /**
     * Cancels every consumer that is currently READY.
     */
    public function removeAllConsumers ()
    {
        foreach ($this->consumers as $c) {
            if ($c[2] == 'READY') {
                $this->removeConsumerByTag($c[0], $c[1]);
            }
        }
    }

    /**
     * Sends basic.cancel for a consumer tag, marks the consumer CLOSED and
     * hands the cancel-ok to the consumer.
     *
     * @param Consumer $cons
     * @param mixed    $ctag
     * @throws \Exception (code 9768) when the response is not basic.cancel-ok.
     */
    private function removeConsumerByTag (Consumer $cons, $ctag)
    {
        list(, $cstate,) = $this->getConsumerAndStatus($ctag);
        if ($cstate == 'CLOSED') {
            trigger_error("Consumer is already removed", E_USER_WARNING);
            return;
        }

        $cnl = $this->basic('cancel', array('consumer-tag' => $ctag, 'no-wait' => false));
        $cOk = $this->invoke($cnl);
        // invoke() returns null while flow is suspended, so check the type
        // before touching the response.
        if (! ($cOk instanceof wire\Method) || $cOk->amqpClass != 'basic.cancel-ok') {
            throw new \Exception("Failed to cancel consumer - bad broker response", 9768);
        }
        $this->setConsumerStatus($ctag, 'CLOSED') OR
            trigger_error("Failed to set consumer status flag", E_USER_WARNING);
        $cons->handleCancelOk($cOk, $this);
    }

    /**
     * @param mixed  $tag
     * @param string $status
     * @return bool True if a consumer with exactly this tag was updated.
     */
    private function setConsumerStatus ($tag, $status)
    {
        foreach ($this->consumers as $k => $c) {
            if ($c[1] === $tag) {
                $this->consumers[$k][2] = $status;
                return true;
            }
        }
        return false;
    }

    /**
     * Looks a consumer record up by tag.
     *
     * @param mixed $tag
     * @return array array(Consumer, status, params), or
     *               array(null, 'INVALID', null) when no consumer has the tag.
     */
    private function getConsumerAndStatus ($tag)
    {
        foreach ($this->consumers as $c) {
            // Un-started consumers carry the placeholder tag false, which a
            // loose comparison would match against '', 0 or null.
            if ($c[1] !== false && $c[1] == $tag) {
                return array($c[0], $c[2], $c[3]);
            }
        }
        return array(null, 'INVALID', null);
    }

    /**
     * @return bool True if at least one consumer is READY.
     */
    public function hasListeningConsumers ()
    {
        foreach ($this->consumers as $c) {
            if ($c[2] === 'READY') {
                return true;
            }
        }
        return false;
    }

    /**
     * @param mixed $t Consumer tag.
     * @return Consumer|null The READY consumer with this tag, or null.
     */
    public function getConsumerByTag ($t)
    {
        foreach ($this->consumers as $c) {
            if ($c[2] == 'READY' && $c[1] === $t) {
                return $c[0];
            }
        }
        return null;
    }

    /**
     * @return array Tags of all READY consumers.
     */
    public function getConsumerTags ()
    {
        $tags = array();
        foreach ($this->consumers as $c) {
            if ($c[2] == 'READY') {
                $tags[] = $c[1];
            }
        }
        return $tags;
    }

    /**
     * Starts every consumer that has not been started yet.
     *
     * @return bool True if at least one consumer was started.
     */
    public function startAllConsumers ()
    {
        if (! $this->consumers) {
            return false;
        }
        $r = false;
        foreach (array_keys($this->consumers) as $cnum) {
            if (false === $this->consumers[$cnum][1]) {
                $this->_startConsumer($cnum);
                $r = true;
            }
        }
        return $r;
    }

    /**
     * Sends basic.consume for the consumer at index $cnum and records the tag
     * and parameters. The consume method comes from the consumer itself or,
     * failing that, from the params given to addConsumer().
     *
     * @param int $cnum Index into $this->consumers.
     * @throws \Exception 9265 when no consume method can be built, 9266 when
     *                    no response method comes back from invoke().
     */
    private function _startConsumer ($cnum)
    {
        $consume = $this->consumers[$cnum][0]->getConsumeMethod($this);
        if ($consume && ! ($consume instanceof wire\Method)) {
            trigger_error("Consumer returned invalid consume method", E_USER_WARNING);
            $consume = false;
        }
        if (! $consume && is_array($this->consumers[$cnum][3])) {
            $consume = $this->basic('consume', $this->consumers[$cnum][3]);
        }
        if (! $consume) {
            throw new \Exception("Couldn't find any consume paramters while starting consumer", 9265);
        }

        $cOk = $this->invoke($consume);
        // invoke() returns null while flow is suspended; fail with a catchable
        // exception instead of a fatal member call on null.
        if (! ($cOk instanceof wire\Method)) {
            throw new \Exception("Failed to start consumer - no consume response received", 9266);
        }
        $this->consumers[$cnum][0]->handleConsumeOk($cOk, $this);
        $this->consumers[$cnum][2] = 'READY';
        $this->consumers[$cnum][1] = $cOk->getField('consumer-tag');
        $this->consumers[$cnum][3] = $consume->getFields();
    }

    /**
     * Starts one previously added, not yet started consumer.
     *
     * @param Consumer $cons
     * @return bool False if the consumer is unknown or already started.
     */
    public function startConsumer (Consumer $cons)
    {
        foreach ($this->consumers as $i => $c) {
            if ($c[0] === $cons && $c[1] === false) {
                $this->_startConsumer($i);
                return true;
            }
        }
        return false;
    }

    /**
     * Flushes any buffered (n)acks and clears the consuming flag.
     */
    public function onSelectEnd ()
    {
        $this->flushAcks();
        $this->consuming = false;
    }

    /**
     * @return bool True while channel flow is suspended.
     */
    public function isSuspended ()
    {
        return ! $this->flow;
    }

    /**
     * Asks the peer to flip channel flow and adopts the state it reports
     * back, warning when that differs from the requested one.
     */
    public function toggleFlow ()
    {
        $flow = ! $this->flow;
        $meth = $this->channel('flow', array('active' => $flow));

        // invoke() refuses to send while flow is off, so lift the guard for
        // this one call and restore it if the call fails.
        $previous = $this->flow;
        $this->flow = true;
        try {
            $fr = $this->invoke($meth);
        } catch (\Exception $e) {
            $this->flow = $previous;
            throw $e;
        }

        $newFlow = $fr->getField('active');
        if ($newFlow != $flow) {
            trigger_error(sprintf("Flow Unexpected channel flow response, expected %d, got %d", $flow, $newFlow), E_USER_WARNING);
        }
        $this->flow = $newFlow;
    }
}