Skip to content
Browse files

added configurable QOS support

  • Loading branch information...
1 parent ef478f1 commit 8b0c30f62eff43a35ede9a1eea6af9e67add8d44 Sebastian Gronewold committed Sep 28, 2012
View
9 DependencyInjection/Configuration.php
@@ -56,6 +56,15 @@ public function getConfigTreeBuilder()
->children()
->scalarNode('connection')->defaultValue('default')->end()
->scalarNode('callback')->isRequired()->end()
+ ->arrayNode('qos_options')
+ ->canBeUnset()
+ ->useAttributeAsKey('key')
+ ->children()
+ ->scalarNode('prefetch_size')->defaultValue(0)->end()
+ ->scalarNode('prefetch_count')->defaultValue(0)->end()
+ ->booleanNode('global')->defaultValue(false)->end()
+ ->end()
+ ->end()
->end()
->end()
->end()
View
12 DependencyInjection/OldSoundRabbitMqExtension.php
@@ -97,8 +97,16 @@ protected function loadConsumers()
$definition
->addMethodCall('setExchangeOptions', array($consumer['exchange_options']))
->addMethodCall('setQueueOptions', array($consumer['queue_options']))
- ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')))
- ;
+ ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));
+
+ if (array_key_exists('qos_options', $consumer)) {
+ $definition->addMethodCall('setQosOptions', array(
+ $consumer['qos_options']['prefetch_size'],
+ $consumer['qos_options']['prefetch_count'],
+ $consumer['qos_options']['global']
+ ));
+ }
+
$this->injectConnection($definition, $consumer['connection']);
if ($this->collectorEnabled) {
$this->injectLoggedChannel($definition, $key, $consumer['connection']);
View
13 RabbitMq/BaseConsumer.php
@@ -71,4 +71,17 @@ public function getConsumerTag()
{
return $this->consumerTag;
}
+
+ /**
+ * Sets the qos settings for the current channel
+ * Consider that prefetchSize and global do not work with rabbitMQ version <= 8.0
+ *
+ * @param int $prefetchSize
+ * @param int $prefetchCount
+ * @param bool $global
+ */
+ public function setQosOptions($prefetchSize = 0, $prefetchCount = 0, $global = false)
+ {
+ $this->ch->basic_qos($prefetchSize, $prefetchCount, $global);
+ }
}
View
13 Tests/DependencyInjection/Fixtures/test.yml
@@ -85,6 +85,19 @@ old_sound_rabbit_mq:
type: direct
callback: default_anon.callback
+ qos_test_consumer:
+ connection: foo_connection
+ exchange_options:
+ name: foo_exchange
+ type: direct
+ queue_options:
+ name: foo_queue
+ qos_options:
+ prefetch_size: 1024
+ prefetch_count: 1
+ global: true
+ callback: foo.callback
+
rpc_clients:
foo_client:
connection: foo_connection
View
26 Tests/DependencyInjection/OldSoundRabbitMqExtensionTest.php
@@ -201,6 +201,32 @@ public function testDefaultConsumerDefinition()
$this->assertEquals('%old_sound_rabbit_mq.consumer.class%', $definition->getClass());
}
+ public function testConsumerWithQosOptions()
+ {
+ $container = $this->getContainer('test.yml');
+
+ $this->assertTrue($container->has('old_sound_rabbit_mq.qos_test_consumer_consumer'));
+ $definition = $container->getDefinition('old_sound_rabbit_mq.qos_test_consumer_consumer');
+ $methodCalls = $definition->getMethodCalls();
+
+ $setQosParameters = null;
+ foreach ($methodCalls as $methodCall) {
+ if ($methodCall[0] === 'setQosOptions') {
+ $setQosParameters = $methodCall[1];
+ }
+ }
+
+ $this->assertInternalType('array', $setQosParameters);
+ $this->assertEquals(
+ array(
+ 1024,
+ 1,
+ true
+ ),
+ $setQosParameters
+ );
+ }
+
public function testFooAnonConsumerDefinition()
{
$container = $this->getContainer('test.yml');

0 comments on commit 8b0c30f

Please sign in to comment.
Something went wrong with that request. Please try again.