Skip to content

Commit

Permalink
Allow usage of different routing keys in all msg producers; fix a buu…
Browse files Browse the repository at this point in the history
…g with the http-request msg producer
  • Loading branch information
gggeek committed Aug 31, 2015
1 parent 6b18aaf commit b07dba8
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 9 deletions.
3 changes: 3 additions & 0 deletions Command/QueueConsoleCommandCommand.php
Expand Up @@ -26,6 +26,7 @@ protected function configure()
->addArgument('console_command', InputArgument::REQUIRED, 'The console command to execute (string)')
->addArgument('argument/option', InputArgument::IS_ARRAY, 'Arguments and options for the executed command. Options use the syntax: option.<opt>.<val>')
->addOption('driver', 'b', InputOption::VALUE_OPTIONAL, 'The driver (string), if not default', null)
->addOption('routing-key', 'k', InputOption::VALUE_OPTIONAL, 'The routing key, if needed (string)', null)
->addOption('ttl', 't', InputOption::VALUE_OPTIONAL, 'Validity of message (in seconds)', null)
->addOption('novalidate', null, InputOption::VALUE_NONE, 'Skip checking if the command is registered with the sf console')
->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging');
Expand Down Expand Up @@ -55,6 +56,7 @@ protected function execute(InputInterface $input, OutputInterface $output)

$driverName = $input->getOption('driver');
$queue = $input->getArgument('queue_name');
$key = $input->getOption('routing-key');
$arguments = $input->getArgument('argument/option');
// parse arguments to tell options apart
$options = array();
Expand All @@ -75,6 +77,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
$command,
$arguments,
$options,
$key,
$ttl = $input->getOption('ttl')
);

Expand Down
12 changes: 10 additions & 2 deletions Service/MessageProducer/ConsoleCommand.php
Expand Up @@ -13,7 +13,7 @@
*/
class ConsoleCommand extends BaseMessageProducer
{
public function publish($command, $arguments = array(), $options = array(), $ttl = null)
public function publish($command, $arguments = array(), $options = array(), $routingKey = null, $ttl = null)
{
$msg = array(
'command' => $command,
Expand All @@ -27,6 +27,14 @@ public function publish($command, $arguments = array(), $options = array(), $ttl
// see also http://www.rabbitmq.com/ttl.html
$extras = array('expiration' => $ttl * 1000);
}
$this->doPublish($msg, str_replace(':', '.', $command), $extras);
if ($routingKey === null) {
$routingKey = $this->getRoutingKey($command, $arguments = array(), $options);
}
$this->doPublish($msg, $routingKey, $extras);
}

protected function getRoutingKey($command, $arguments = array(), $options = array())
{
return str_replace(':', '.', $command);
}
}
15 changes: 12 additions & 3 deletions Service/MessageProducer/HTTPRequest.php
Expand Up @@ -16,12 +16,13 @@ class HTTPRequest extends BaseMessageProducer
/**
* @param string $url
* @param array $options All CURL options are accepted
* @param string $routingKey if null, it will be calculated automatically
* @param null $ttl
*/
public function publish($url, $options = array(), $ttl = null)
public function publish($url, $options = array(), $routingKey = null, $ttl = null)
{
$msg = array(
'url' => $command,
'url' => $url,
'options' => $options
);
$extras = array();
Expand All @@ -31,6 +32,14 @@ public function publish($url, $options = array(), $ttl = null)
// see also http://www.rabbitmq.com/ttl.html
$extras = array('expiration' => $ttl * 1000);
}
$this->doPublish($msg, str_replace(array(':', '/'), '.', $command), $extras);
if ($routingKey === null) {
$routingKey = $this->getRoutingKey($url, $options);
}
$this->doPublish($msg, $routingKey, $extras);
}

protected function getRoutingKey($url, $options = array())
{
return str_replace(array(':', '/'), '.', $url);
}
}
12 changes: 10 additions & 2 deletions Service/MessageProducer/SymfonyService.php
Expand Up @@ -13,7 +13,7 @@
*/
class SymfonyService extends BaseMessageProducer
{
public function publish($service, $method, $arguments = array(), $ttl = null)
public function publish($service, $method, $arguments = array(), $routingKey = null, $ttl = null)
{
$msg = array(
'service' => $service,
Expand All @@ -27,6 +27,14 @@ public function publish($service, $method, $arguments = array(), $ttl = null)
// see also http://www.rabbitmq.com/ttl.html
$extras = array('expiration' => $ttl * 1000);
}
$this->doPublish($msg, str_replace(':', '.', $service) . '.' . $method, $extras);
if ($routingKey === null) {
$routingKey = $this->getRoutingKey($service, $method, $arguments);
}
$this->doPublish($msg, $routingKey, $extras);
}

protected function getRoutingKey($service, $method, $arguments = array())
{
return str_replace(':', '.', $service) . '.' . $method;
}
}
12 changes: 10 additions & 2 deletions Service/MessageProducer/XmlrpcCall.php
Expand Up @@ -13,7 +13,7 @@
*/
class XmlrpcCall extends BaseMessageProducer
{
public function publish($server, $method, $arguments = array(), $ttl = null)
public function publish($server, $method, $arguments = array(), $routingKey = null, $ttl = null)
{
$msg = array(
'server' => $server,
Expand All @@ -27,6 +27,14 @@ public function publish($server, $method, $arguments = array(), $ttl = null)
// see also http://www.rabbitmq.com/ttl.html
$extras = array('expiration' => $ttl * 1000);
}
$this->doPublish($msg, str_replace(array(':', '/'), '.', $server . '.' . $method), $extras);
if ($routingKey === null) {
$routingKey = $this->getRoutingKey($server, $method, $arguments);
}
$this->doPublish($msg, $routingKey, $extras);
}

protected function getRoutingKey($server, $method, $arguments = array())
{
return str_replace(array(':', '/'), '.', $server . '.' . $method);
}
}

0 comments on commit b07dba8

Please sign in to comment.