Skip to content

Commit

Permalink
Implement Stoppable transport
Browse files Browse the repository at this point in the history
  • Loading branch information
Vasil coylOne Kulakov committed Sep 13, 2017
1 parent 3dc22a0 commit 41d051b
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 11 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"require": {
"php": ">=7.0",

"event-band/band-framework": "~2.0 >=2.2.0",
"event-band/band-framework": "~2.0 >=2.2.2",

"php-amqplib/php-amqplib": "~2.6"
},
Expand Down
9 changes: 5 additions & 4 deletions src/EventBand/Transport/AmqpLib/AmqpConnectionBuilder.php
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
<?php
/**
* @LICENSE_TEXT
* @author Kirill chEbba Chebunin <iam@chebba.org>
* @author Vasil coylOne Kulakov <kulakov@vasiliy.pro>
*
* This source file is subject to the MIT license that is bundled
* with this package in the file LICENSE.
*/

namespace EventBand\Transport\AmqpLib;

use EventBand\Transport\Amqp\Definition\ConnectionBuilder;
use EventBand\Transport\Amqp\Definition\ConnectionDefinition;
use PhpAmqpLib\Connection\AbstractConnection;
use PhpAmqpLib\Connection\AMQPLazyConnection;

/**
* Class AmqpConnectionBuilder
*
* @author Kirill chEbba Chebunin <iam@chebba.org>
*/
class AmqpConnectionBuilder implements AmqpConnectionFactory
Expand Down
23 changes: 17 additions & 6 deletions src/EventBand/Transport/AmqpLib/AmqpLibDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,15 @@

namespace EventBand\Transport\AmqpLib;

use EventBand\Transport\Amqp\Definition\ConnectionDefinition;
use EventBand\Transport\Amqp\Definition\ExchangeDefinition;
use EventBand\Transport\Amqp\Definition\QueueDefinition;
use EventBand\Transport\Amqp\Driver\AmqpDriver;
use EventBand\Transport\Amqp\Driver\CustomAmqpMessage;
use EventBand\Transport\Amqp\Driver\DriverException;
use EventBand\Transport\Amqp\Driver\MessageDelivery;
use EventBand\Transport\Amqp\Driver\MessagePublication;
use EventBand\Transport\StoppableTransport;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AbstractConnection;
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage as AmqpLibMessage;

/**
Expand All @@ -28,7 +26,7 @@
* @author Kirill chEbba Chebunin <iam@chebba.org>
* @license http://opensource.org/licenses/mit-license.php MIT
*/
class AmqpLibDriver implements AmqpDriver
class AmqpLibDriver implements AmqpDriver, StoppableTransport
{
private $connectionFactory;
/**
Expand All @@ -40,6 +38,10 @@ class AmqpLibDriver implements AmqpDriver
*/
private $channel;

private $currentTag;

private $stopped = false;

public function __construct(AmqpConnectionFactory $connectionFactory)
{
$this->connectionFactory = $connectionFactory;
Expand Down Expand Up @@ -112,7 +114,7 @@ public function consume($queue, callable $callback, $idleTimeout, $timeout = nul
try {
$active = true;
$channel = $this->getChannel();
$tag = $channel->basic_consume(
$this->currentTag = $channel->basic_consume(
$queue,
'',
false, false, false, false,
Expand All @@ -125,6 +127,9 @@ function (AMQPLibMessage $msg) use (&$active, $callback, $channel, $queue) {

$startTime = time();
while ($active) {
if ($this->stopped) {
break;
}
if ($timeout) {
$newTimeout = min($startTime + $idleTimeout - time(), $idleTimeout);
if ($newTimeout <= 0) {
Expand Down Expand Up @@ -157,7 +162,7 @@ function (AMQPLibMessage $msg) use (&$active, $callback, $channel, $queue) {
}

// Cancel consumer and close channel
$channel->basic_cancel($tag);
$channel->basic_cancel($this->currentTag);
$this->closeChannel();
} catch (\Exception $e) {
throw new DriverException('Basic consume error', $e);
Expand Down Expand Up @@ -268,4 +273,10 @@ public function deleteQueue(QueueDefinition $queue, $ifUnused = false, $ifEmpty
throw new DriverException(sprintf('Queue delete error "%s"', $queue->getName()) , $e);
}
}

public function stop()
{
$this->stopped = true;
$this->getChannel()->basic_cancel($this->currentTag, false, true);
}
}

0 comments on commit 41d051b

Please sign in to comment.