Skip to content

Commit

Permalink
Make it possible to register drivers via a tag; allow cli commands to…
Browse files Browse the repository at this point in the history
… be tied to a driver at runtime
  • Loading branch information
gggeek committed Aug 30, 2015
1 parent 53d566f commit 1489c8c
Show file tree
Hide file tree
Showing 13 changed files with 268 additions and 125 deletions.
15 changes: 6 additions & 9 deletions Adapter/DriverInterface.php
Expand Up @@ -14,7 +14,7 @@ interface DriverInterface
// *** Producer side ***

/**
* @param $queueName
* @param string $queueName
* @return \Kaliop\QueueingBundle\Queue\MessageProducerInterface
*/
public function getMessageProducer($queueName);
Expand All @@ -37,14 +37,11 @@ public function acceptMessage($msg);
*/
public function decodeMessage($msg);

// *** The dark side ;-) ***

/**
* Returns (if supported) an array of queues configured in the application.
* NB: these are the names of queues as seen by the app
* - NOT the queues available on the broker
* - NOT using the queues names used by the broker (unless those are always identical to the names used by the app)
*
* @param int $type
* @return string[]
* @param string $queueName
* @return \Kaliop\QueueingBundle\Queue\QueueManagerInterface
*/
public function listQueues($type = Queue::TYPE_ANY);
public function getQueueManager($queueName);
}
49 changes: 49 additions & 0 deletions Adapter/DriverManager.php
@@ -0,0 +1,49 @@
<?php

namespace Kaliop\QueueingBundle\Adapter;

use Symfony\Component\DependencyInjection\ContainerInterface;

class DriverManager
{
protected $aliases = array();
protected $defaultDriver;

public function __construct(ContainerInterface $container)
{
$this->container = $container;
}

/**
* @param string $alias
* @param string $serviceName
*/
public function registerDriver($alias, $serviceName)
{
$this->aliases[$alias] = $serviceName;
}

public function setDefaultDriver($alias)
{
$this->defaultDriver = $alias;
}

/**
* @param string $alias null when asking for the default driver
* @return \Kaliop\QueueingBundle\Adapter\DriverInterface
* @throws \Exception if driver is not registered
*/
public function getDriver($alias = null)
{
if ($alias == null) {
$alias = $this->defaultDriver;
}

if (!isset($this->aliases[$alias])) {
throw new \InvalidArgumentException(sprintf('No driver defined with the alias "%s".', $alias));
}

/// @todo shall we check that the good interface is declared?
return $this->container->get($this->aliases[$alias]);
}
}
67 changes: 8 additions & 59 deletions Adapter/RabbitMq/Driver.php
Expand Up @@ -28,73 +28,22 @@ public function decodeMessage($message)
}

/**
* @param $queueName
* @param string $queueName
* @return \Kaliop\QueueingBundle\Queue\MessageProducerInterface
*/
public function getMessageProducer($queueName)
{
return $this->container->get('old_sound_rabbit_mq.' . $queueName .'_producer');
return $this->container->get('old_sound_rabbit_mq.' . $queueName . '_producer');
}

/**
* This is a bit dumb, but so far all we have found is to go through all services, and check based on names:
* @param int $type
* @return string[]
* @param string $queueName
* @return \Kaliop\QueueingBundle\Queue\QueueManagerInterface
*/
public function listQueues($type = Queue::TYPE_ANY)
public function getQueueManager($queueName)
{
$out = array();
foreach($this->findServiceIdsContaining('old_sound_rabbit_mq.') as $serviceName) {
switch($type) {
case Queue::TYPE_CONSUMER:
if (preg_match('/_consumer$/', $serviceName))
$out[] = str_replace(array('old_sound_rabbit_mq.', '_consumer'), '', $serviceName);
break;
case Queue::TYPE_PRODUCER:
if (preg_match('/_producer$/', $serviceName))
$out[] = str_replace(array('old_sound_rabbit_mq.', '_producer'), '', $serviceName);
break;
case Queue::TYPE_ANY:
if (preg_match('/_(consumer|producer)$/', $serviceName))
$out[] = str_replace(array('old_sound_rabbit_mq.', '_consumer', '_producer'), '', $serviceName);
}
}
return $out;
$mgr = $this->container->get('kaliop_queueing.amqp.queue_manager');
$mgr->setQueueName($queueName);
return $mgr;
}

private function findServiceIdsContaining($name)
{
$builder = $this->getContainerBuilder();
$serviceIds = $builder->getServiceIds();
$foundServiceIds = array();
$name = strtolower($name);
foreach ($serviceIds as $serviceId) {
if (false === strpos($serviceId, $name)) {
continue;
}
$foundServiceIds[] = $serviceId;
}

return $foundServiceIds;
}

protected function getContainerBuilder()
{
/// @todo reintroduce check
//if (!$this->getApplication()->getKernel()->isDebug()) {
// throw new \LogicException(sprintf('Debug information about the container is only available in debug mode.'));
//}

if (!is_file($cachedFile = $this->container->getParameter('debug.container.dump'))) {
throw new \LogicException(sprintf('Debug information about the container could not be found. Please clear the cache and try again.'));
}

$container = new ContainerBuilder();

$loader = new XmlFileLoader($container, new FileLocator());
$loader->load($cachedFile);

return $container;
}

}
92 changes: 89 additions & 3 deletions Adapter/RabbitMq/QueueManager.php
Expand Up @@ -11,13 +11,15 @@
use Symfony\Component\DependencyInjection\ContainerAwareInterface;
use Symfony\Component\DependencyInjection\Exception\ServiceNotFoundException;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
use Symfony\Component\Config\FileLocator;
use InvalidArgumentException;
use Kaliop\QueueingBundle\Queue\Queue;
use Kaliop\QueueingBundle\Queue\QueueManagerInterface;

/**
* A class dedicated not really to sending messages to a queue, bur rather to sending control commands
*
* @todo add a new action: listing available queues (i.e. defined in config)
*/
class QueueManager extends BaseMessageProducer implements ContainerAwareInterface, QueueManagerInterface
{
Expand All @@ -31,7 +33,16 @@ public function setContainer(ContainerInterface $container = null)

public function listActions()
{
return array('purge', 'delete', 'info');
return array('purge', 'delete', 'info', 'list');
}

/**
* Reimplemented to avoid throw on empty queue name
* @param string $queue
*/
public function setQueueName($queue)
{
$this->queue = $queue;
}

public function executeAction($action)
Expand All @@ -46,6 +57,9 @@ public function executeAction($action)
case 'info':
return $this->queueInfo();

case 'list':
return $this->listQueues();

default:
throw new InvalidArgumentException("Action $action not supported");
}
Expand Down Expand Up @@ -102,6 +116,78 @@ protected function queueInfo()
);
}

/**
* Returns (if supported) an array of queues configured in the application.
* NB: these are the names of queues as seen by the app
* - NOT the queues available on the broker
* - NOT using the queues names used by the broker (unless those are always identical to the names used by the app)
*
* It is a bit dumb, but so far all we have found is to go through all services, and check based on names:
*
* @param int $type
* @return string[] index is queue name, value is queue type
*/
public function listQueues($type = Queue::TYPE_ANY)
{
$out = array();
foreach ($this->findServiceIdsContaining('old_sound_rabbit_mq.') as $serviceName) {
switch ($type) {
case Queue::TYPE_CONSUMER:
if (preg_match('/_consumer$/', $serviceName))
$out[str_replace(array('old_sound_rabbit_mq.', '_consumer'), '', $serviceName)] = Queue::TYPE_CONSUMER;
break;
case Queue::TYPE_PRODUCER:
if (preg_match('/_producer$/', $serviceName))
$out[str_replace(array('old_sound_rabbit_mq.', '_producer'), '', $serviceName)] = Queue::TYPE_PRODUCER;
break;
case Queue::TYPE_ANY:
if (preg_match('/_consumer$/', $serviceName))
$out[str_replace(array('old_sound_rabbit_mq.', '_consumer'), '', $serviceName)] = Queue::TYPE_CONSUMER;
if (preg_match('/_producer$/', $serviceName))
$out[str_replace(array('old_sound_rabbit_mq.', '_producer'), '', $serviceName)] = Queue::TYPE_PRODUCER;
}
}
return $out;
}

protected function findServiceIdsContaining($name)
{
$builder = $this->getContainerBuilder();
$serviceIds = $builder->getServiceIds();
$foundServiceIds = array();
$name = strtolower($name);
foreach ($serviceIds as $serviceId) {
if (false === strpos($serviceId, $name)) {
continue;
}
$foundServiceIds[] = $serviceId;
}

return $foundServiceIds;
}

/**
* @return ContainerBuilder
*/
protected function getContainerBuilder()
{
/// @todo reintroduce check
//if (!$this->getApplication()->getKernel()->isDebug()) {
// throw new \LogicException(sprintf('Debug information about the container is only available in debug mode.'));
//}

if (!is_file($cachedFile = $this->container->getParameter('debug.container.dump'))) {
throw new \LogicException(sprintf('Debug information about the container could not be found. Please clear the cache and try again.'));
}

$container = new ContainerBuilder();

$loader = new XmlFileLoader($container, new FileLocator());
$loader->load($cachedFile);

return $container;
}

/**
* Hack: generally queues are defined consumer-side, so we try that 1st and producer-side 2nd (but that only gives
* us channel usually).
Expand Down
56 changes: 26 additions & 30 deletions Command/ManageQueueCommand.php
Expand Up @@ -18,12 +18,12 @@ class ManageQueueCommand extends BaseCommand
protected function configure()
{
$this
->setName( 'kaliop_queueing:managequeue' )
->setDescription( "Sends control commands to a queue to f.e. purge it or grab some stats" )
->addArgument( 'action', InputArgument::REQUIRED, 'The action to execute. use "help" to see all available' )
->addArgument( 'queue_name', InputArgument::OPTIONAL, 'The queue name (string)', '' )
->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging' )
;
->setName('kaliop_queueing:managequeue')
->setDescription("Sends control commands to a queue to f.e. purge it or grab some stats")
->addArgument('action', InputArgument::REQUIRED, 'The action to execute. use "help" to see all available')
->addArgument('queue_name', InputArgument::OPTIONAL, 'The queue name (string)', '')
->addOption('driver_name', 'b', InputOption::VALUE_OPTIONAL, 'The driver (string), if not default', null)
->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging');
}

/**
Expand All @@ -32,45 +32,41 @@ protected function configure()
* @return void
* @throws \InvalidArgumentException
*/
protected function execute( InputInterface $input, OutputInterface $output )
protected function execute(InputInterface $input, OutputInterface $output)
{
$this->setOutput( $output );
$this->setOutput($output);

if ( defined( 'AMQP_DEBUG' ) === false )
{
define( 'AMQP_DEBUG', (bool) $input->getOption( 'debug' ) );
if (defined('AMQP_DEBUG') === false) {
define('AMQP_DEBUG', (bool)$input->getOption('debug'));
}

$command = $input->getArgument( 'action' );
$queue = $input->getArgument( 'queue_name' );
$driverName = $input->getOption('driver_name');
$command = $input->getArgument('action');
$queue = $input->getArgument('queue_name');

/// @var \Kaliop\QueueingBundle\Service\MessageProducer\ $messageBroker
$messageBroker = $this->getContainer()->get( 'kaliop_queueing.message_producer.queue_control' );
$driver = $this->getContainer()->get('kaliop_queueing.driverManager')->getDriver($driverName);
$queueManager = $driver->getQueueManager($queue);

if ( $command == 'list' || $command == 'help' )
{
$this->writeln( "Available commands: " . implode( ', ', $messageBroker->listActions( $queue ) ) );
if ($command == 'help') {
$this->writeln("Available actions: " . implode(', ', $queueManager->listActions($queue)));
return;
}

if ( ! in_array( $command, $messageBroker->listActions( $queue ) ) )
{
$this->writeln( "Unrecognized command $command\nAvailable commands: " . implode( ', ', $messageBroker->listActions( $queue ) ) );
if (!in_array($command, $queueManager->listActions($queue))) {
$this->writeln("Unrecognized action $command\nAvailable actions: " . implode(', ', $queueManager->listActions($queue)));
return;
}

$messageBroker->setQueueName( $queue );
$result = $messageBroker->executeAction( $command );
$queueManager->setQueueName($queue);
$result = $queueManager->executeAction($command);

$this->writeln( "Sent $command to queue $queue" );
$this->writeln("Sent '$command' to queue $queue");

if ( $result != '' )
{
if ( is_array( $result ) )
{
$result = print_r( $result, true );
if ($result != '') {
if (is_array($result)) {
$result = print_r($result, true);
}
$this->writeln( "Result: $result" );
$this->writeln("Result: $result");
}
}
}
4 changes: 4 additions & 0 deletions Command/QueueConsoleCommandCommand.php
Expand Up @@ -25,6 +25,7 @@ protected function configure()
->addArgument('queue_name', InputArgument::REQUIRED, 'The queue name (string)')
->addArgument('console_command', InputArgument::REQUIRED, 'The console command to execute (string)')
->addArgument('argument/option', InputArgument::IS_ARRAY, 'Arguments and options for the executed command. Options use the syntax: option.<opt>.<val>')
->addOption('driver_name', 'b', InputOption::VALUE_OPTIONAL, 'The driver (string), if not default', null)
->addOption('ttl', 't', InputOption::VALUE_OPTIONAL, 'Validity of message (in seconds)', null)
->addOption('novalidate', null, InputOption::VALUE_NONE, 'Skip checking if the command is registered with the sf console')
->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging');
Expand Down Expand Up @@ -52,6 +53,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
define('AMQP_DEBUG', (bool)$input->getOption('debug'));
}

$driverName = $input->getOption('driver_name');
$queue = $input->getArgument('queue_name');
$arguments = $input->getArgument('argument/option');
// parse arguments to tell options apart
Expand All @@ -64,7 +66,9 @@ protected function execute(InputInterface $input, OutputInterface $output)
}
}

$driver = $this->getContainer()->get('kaliop_queueing.driverManager')->getDriver($driverName);
$messageProducer = $this->getContainer()->get('kaliop_queueing.message_producer.console_command');
$messageProducer->setDriver($driver);
$messageProducer->setQueueName($queue);
try {
$messageProducer->publish(
Expand Down

0 comments on commit 1489c8c

Please sign in to comment.