Skip to content

Commit

Permalink
QueueBag definition moved to out of builder
Browse files Browse the repository at this point in the history
  • Loading branch information
dhanielo committed Jan 5, 2017
1 parent 0db478b commit 248a24d
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 123 deletions.
4 changes: 0 additions & 4 deletions DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@ public function getConfigTreeBuilder()
{
$tree = new TreeBuilder();
$rootNode = $tree->root('cmobi_rabbitmq');
$rootNode
->children()
->scalarNode('basic_qos')->defaultValue(1)->end()
->end();
$rootNode
->children()
->scalarNode('log_path')->end()
Expand Down
6 changes: 6 additions & 0 deletions Queue/QueueBagInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,10 @@ public function getQueueConsume();
* @return array|false
*/
public function getExchangeDeclare();

/**
* @param array $options
* @return QueueBagInterface
*/
public function registerOptions(array $options);
}
17 changes: 4 additions & 13 deletions Queue/QueueBuilderInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,14 @@

namespace Cmobi\RabbitmqBundle\Queue;


interface QueueBuilderInterface
{
/**
* @return string|false
*/
public function getExchangeName();

/**
* @return string|false
*/
public function getExchangeType();

/**
* @param $queueName
* @param QueueServiceInterface $queueService
*
* @return QueueInterface
* @param QueueBagInterface $queueBag
* @return Queue
*/
public function buildQueue($queueName, QueueServiceInterface $queueService);
public function buildQueue($queueName, QueueServiceInterface $queueService, QueueBagInterface $queueBag);
}
1 change: 0 additions & 1 deletion Tests/app/config/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ framework:
secret: "Three can keep a secret, if two of them are dead."

cmobi_rabbitmq:
basic_qos: "%cmobi_basic_qos%"
log_path: "%kernel.root_dir%/logs/rabbitmq_service.log"
connections:
default:
Expand Down
1 change: 0 additions & 1 deletion Tests/app/config/parameters.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ parameters:
cmobi_rabbitmq_password: 'guest'
cmobi_rabbitmq_vhost: '/'
cmobi_rabbitmq_lazy: false
cmobi_basic_qos: 1
cmobi_rabbitmq_connection_timeout: 3
cmobi_rabbitmq_read_write_timeout: 1160
cmobi_rabbitmq_keepalive: true
Expand Down
43 changes: 7 additions & 36 deletions Transport/PubSub/SubscriberBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,53 +3,40 @@
namespace Cmobi\RabbitmqBundle\Transport\PubSub;

use Cmobi\RabbitmqBundle\Connection\ConnectionManager;
use Cmobi\RabbitmqBundle\Connection\Exception\InvalidAMQPChannelException;
use Cmobi\RabbitmqBundle\Queue\Queue;
use Cmobi\RabbitmqBundle\Queue\QueueBagInterface;
use Cmobi\RabbitmqBundle\Queue\QueueBuilderInterface;
use Cmobi\RabbitmqBundle\Queue\QueueServiceInterface;
use Psr\Log\LoggerInterface;

class SubscriberBuilder implements QueueBuilderInterface
{
private $exchangeName;
private $exchangeType;
private $connectionManager;
private $logger;
private $parameters;

public function __construct(
$exchangeName,
$exchangeType = ExchangeType::FANOUT,
ConnectionManager $connManager,
LoggerInterface $logger,
array $parameters
) {
$this->exchangeName = $exchangeName;
$this->exchangeType = $exchangeType;
$this->connectionManager = $connManager;
$this->logger = $logger;
$this->parameters = $parameters;
$this->channel = null;
}

/**
* @param $queueName
* @param QueueServiceInterface $queueService
*
* @param QueueBagInterface $queueBag
* @return Queue
*
* @throws InvalidAMQPChannelException
* @throws \Exception
*/
public function buildQueue($queueName, QueueServiceInterface $queueService)
public function buildQueue($queueName, QueueServiceInterface $queueService, QueueBagInterface $queueBag)
{
$qos = 1;

if (array_key_exists('cmobi_rabbitmq.basic_qos', $this->parameters)) {
$qos = $this->parameters['cmobi_rabbitmq.basic_qos'];
if (! $queueBag instanceof SubscriberQueueBag) {
throw new \Exception('Unsupported QueueBag');
}
$subQueueBag = new SubscriberQueueBag($this->getExchangeName(), $this->getExchangeType(), $queueName, $qos);

$queue = new Queue($this->getConnectionManager(), $subQueueBag, $this->logger);
$queue = new Queue($this->getConnectionManager(), $queueBag, $this->logger);
$queueCallback = new SubscriberQueueCallback($queueService);
$queue->setCallback($queueCallback);

Expand All @@ -63,20 +50,4 @@ public function getConnectionManager()
{
return $this->connectionManager;
}

/**
* @return string|false
*/
public function getExchangeName()
{
return $this->exchangeName;
}

/**
* @return string|false
*/
public function getExchangeType()
{
return $this->exchangeType;
}
}
35 changes: 29 additions & 6 deletions Transport/Rpc/RpcQueueBag.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

class RpcQueueBag implements QueueBagInterface
{
private $queue;
private $basicQos;
private $options;

public function __construct(
Expand All @@ -22,9 +24,9 @@ public function __construct(
$noAck = false,
$noLocal = false
) {
$this->queue = $queue;
$this->basicQos = $basicQos;
$this->options = [
'queue' => $queue,
'basic_qos' => $basicQos,
'passive' => $passive,
'durable' => $durable,
'exclusive' => $exclusive,
Expand All @@ -43,31 +45,31 @@ public function __construct(
*/
public function setQueue($queue)
{
$this->options['queue'] = $queue;
$this->queue = $queue;
}

/**
* @return string|mixed
*/
public function getQueue()
{
return $this->options['queue'];
return $this->queue;
}

/**
* @param $qos
*/
public function setBasicQos($qos)
{
$this->options['basic_qos'] = $qos;
$this->basicQos = $qos;
}

/**
* @return int
*/
public function getBasicQos()
{
return $this->options['basic_qos'];
return $this->basicQos;
}

/**
Expand Down Expand Up @@ -287,4 +289,25 @@ public function getType()
{
return false;
}

/**
* @param array $options
* @return QueueBagInterface
*/
public function registerOptions(array $options)
{
$this->options['basic_qos'] = $options['basic_qos'];
$this->options['passive'] = $options['passive'];
$this->options['durable'] = $options['durable'];
$this->options['exclusive'] = $options['exclusive'];
$this->options['auto_delete'] = $options['auto_delete'];
$this->options['no_wait'] = $options['no_wait'];
$this->options['arguments'] = $options['arguments'];
$this->options['ticket'] = $options['ticket'];
$this->options['consumer_tag'] = $options['consumer_tag'];
$this->options['no_ack'] = $options['no_ack'];
$this->options['no_local'] = $options['no_local'];

return $this;
}
}
39 changes: 8 additions & 31 deletions Transport/Rpc/RpcServerBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
namespace Cmobi\RabbitmqBundle\Transport\Rpc;

use Cmobi\RabbitmqBundle\Connection\ConnectionManager;
use Cmobi\RabbitmqBundle\Connection\Exception\InvalidAMQPChannelException;
use Cmobi\RabbitmqBundle\Queue\Queue;
use Cmobi\RabbitmqBundle\Queue\QueueBagInterface;
use Cmobi\RabbitmqBundle\Queue\QueueBuilderInterface;
use Cmobi\RabbitmqBundle\Queue\QueueServiceInterface;
use Psr\Log\LoggerInterface;
Expand All @@ -13,34 +13,27 @@ class RpcServerBuilder implements QueueBuilderInterface
{
private $connectionManager;
private $logger;
private $parameters;

public function __construct(ConnectionManager $connManager, LoggerInterface $logger, array $parameters)
public function __construct(ConnectionManager $connManager, LoggerInterface $logger)
{
$this->connectionManager = $connManager;
$this->logger = $logger;
$this->parameters = $parameters;
$this->channel = null;
}

/**
* @param $queueName
* @param QueueServiceInterface $queueService
*
* @param QueueBagInterface $queueBag
* @return Queue
*
* @throws InvalidAMQPChannelException
* @throws \Exception
*/
public function buildQueue($queueName, QueueServiceInterface $queueService)
public function buildQueue($queueName, QueueServiceInterface $queueService, QueueBagInterface $queueBag)
{
$qos = 1;

if (array_key_exists('cmobi_rabbitmq.basic_qos', $this->parameters)) {
$qos = $this->parameters['cmobi_rabbitmq.basic_qos'];
if (! $queueBag instanceof RpcQueueBag) {
throw new \Exception('Unsupported QueueBag');
}
$rpcQueueBag = new RpcQueueBag($queueName, $qos);

$queue = new Queue($this->getConnectionManager(), $rpcQueueBag, $this->logger);
$queue = new Queue($this->getConnectionManager(), $queueBag, $this->logger);
$queueCallback = new RpcQueueCallback($queueService);
$queue->setCallback($queueCallback);

Expand All @@ -54,20 +47,4 @@ public function getConnectionManager()
{
return $this->connectionManager;
}

/**
* @return string|false
*/
public function getExchangeName()
{
return false;
}

/**
* @return string|false
*/
public function getExchangeType()
{
return false;
}
}
38 changes: 8 additions & 30 deletions Transport/Worker/WorkerBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Cmobi\RabbitmqBundle\Connection\ConnectionManager;
use Cmobi\RabbitmqBundle\Connection\Exception\InvalidAMQPChannelException;
use Cmobi\RabbitmqBundle\Queue\Queue;
use Cmobi\RabbitmqBundle\Queue\QueueBagInterface;
use Cmobi\RabbitmqBundle\Queue\QueueBuilderInterface;
use Cmobi\RabbitmqBundle\Queue\QueueServiceInterface;
use Psr\Log\LoggerInterface;
Expand All @@ -13,34 +14,27 @@ class WorkerBuilder implements QueueBuilderInterface
{
private $connectionManager;
private $logger;
private $parameters;

public function __construct(ConnectionManager $connManager, LoggerInterface $logger, array $parameters)
public function __construct(ConnectionManager $connManager, LoggerInterface $logger)
{
$this->connectionManager = $connManager;
$this->logger = $logger;
$this->parameters = $parameters;
$this->channel = null;
}

/**
* @param $queueName
* @param QueueServiceInterface $queueService
*
* @param QueueBagInterface $queueBag
* @return Queue
*
* @throws InvalidAMQPChannelException
* @throws \Exception
*/
public function buildQueue($queueName, QueueServiceInterface $queueService)
public function buildQueue($queueName, QueueServiceInterface $queueService, QueueBagInterface $queueBag)
{
$qos = 1;

if (array_key_exists('cmobi_rabbitmq.basic_qos', $this->parameters)) {
$qos = $this->parameters['cmobi_rabbitmq.basic_qos'];
if (! $queueBag instanceof WorkerQueueBag) {
throw new \Exception('Unsupported QueueBag');
}
$rpcQueueBag = new WorkerQueueBag($queueName, $qos);

$queue = new Queue($this->getConnectionManager(), $rpcQueueBag, $this->logger);
$queue = new Queue($this->getConnectionManager(), $queueBag, $this->logger);
$queueCallback = new WorkerQueueCallback($queueService);
$queue->setCallback($queueCallback);

Expand All @@ -54,20 +48,4 @@ public function getConnectionManager()
{
return $this->connectionManager;
}

/**
* @return string|false
*/
public function getExchangeName()
{
return false;
}

/**
* @return string|false
*/
public function getExchangeType()
{
return false;
}
}
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
"symfony/http-kernel": "2.8.*",
"php-amqplib/php-amqplib": "2.6.3",
"symfony/console": "^2.8",
"monolog/monolog": "^1.19"
"monolog/monolog": "^1.19",
"symfony/options-resolver": "^3.2"
},
"require-dev": {
"symfony/framework-bundle": "~2.8",
Expand Down

0 comments on commit 248a24d

Please sign in to comment.