Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
gggeek
committed
Jul 2, 2015
0 parents
commit a379289
Showing
27 changed files
with
2,126 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
/.idea |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
<?php | ||
|
||
namespace Kaliop\QueueingBundle\Command; | ||
|
||
use OldSound\RabbitMqBundle\Command\ConsumerCommand as BaseCommand; | ||
use Symfony\Component\Console\Input\InputOption; | ||
use Symfony\Component\Console\Input\InputInterface; | ||
use Symfony\Component\Console\Output\OutputInterface; | ||
|
||
/** | ||
* Adds a few more options on top of the standard rabbitmq:consumer command, | ||
* plus it traces the fact that the --env option was used on command line or not | ||
*/ | ||
class ConsumerCommand extends BaseCommand | ||
{ | ||
/// @todo look if Sf allows the service we invoke to grab back a handle on this command instance, instead of using | ||
/// static calls (might be doable with Sf 2.4 or later and services-as-commands) | ||
protected static $label; | ||
|
||
protected static $forcedEnv; | ||
|
||
protected function configure() | ||
{ | ||
parent::configure(); | ||
|
||
$this | ||
->setName( 'kaliop_queueing:consumer' ) | ||
->addOption( 'label', null, InputOption::VALUE_REQUIRED, 'A name used to distinguish worker processes' ) | ||
->setDescription( "Starts a worker (message consumer) process" ) | ||
; | ||
} | ||
|
||
protected function execute( InputInterface $input, OutputInterface $output ) | ||
{ | ||
self::$label = $input->getOption( 'label' ); | ||
|
||
// tricky test, as hasOption( 'env' ) always returns true | ||
if ( $input->hasParameterOption( '--env' ) || $input->hasParameterOption( '-e' ) ) | ||
{ | ||
self::$forcedEnv = $input->getOption( 'env' ); | ||
} | ||
|
||
parent::execute( $input, $output ); | ||
|
||
// reset label after execution is done, in case of weird usage patterns | ||
self::$label = null; | ||
} | ||
|
||
public static function getLabel() | ||
{ | ||
return self::$label; | ||
} | ||
|
||
public static function getForcedEnv() | ||
{ | ||
return self::$forcedEnv; | ||
} | ||
|
||
protected function getConsumerService() | ||
{ | ||
return 'old_sound_rabbit_mq.%s_consumer'; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
<?php | ||
/** | ||
* User: gaetano.giunta | ||
* Date: 01/05/14 | ||
* Time: 23.37 | ||
*/ | ||
|
||
namespace Kaliop\QueueingBundle\Command; | ||
|
||
use Kaliop\QueueingBundle\Helper\BaseCommand; | ||
use Kaliop\QueueingBundle\Helper\QueuedScriptCommand; | ||
use Symfony\Component\Console\Input\InputInterface; | ||
use Symfony\Component\Console\Output\OutputInterface; | ||
use Symfony\Component\Console\Input\InputArgument; | ||
use Symfony\Component\Console\Input\InputOption; | ||
|
||
/** | ||
* A very simple "echo" cli command, useful for testing (f.e. the queues dedicated to cli commands) | ||
*/ | ||
class EchoBackCommand extends BaseCommand | ||
{ | ||
protected function configure() | ||
{ | ||
$this | ||
->setName( 'kaliop_queueing:echoback' ) | ||
->setDescription( "Echoes back the argument it receives, either to stdout or to a file" ) | ||
->addArgument( 'input', InputArgument::REQUIRED, 'What to echo (string)' ) | ||
// NB: an option with a required value remains optional | ||
->addOption( 'file', 'f', InputOption::VALUE_REQUIRED, 'A file name to append to', null ) | ||
// allow being called as "queuedscript" | ||
->addOption( QueuedScriptCommand::OPTION_SCRIPT_ID, null, InputOption::VALUE_OPTIONAL ) | ||
; | ||
} | ||
|
||
protected function execute( InputInterface $input, OutputInterface $output ) | ||
{ | ||
$this->setOutput( $output ); | ||
|
||
$time = explode( ' ', microtime() ); | ||
$msg = | ||
'It is ' . strftime( static::$DATE_FORMAT, $time[1] ) . ":" . sprintf( '%03d', ( $time[0]*1000 ) ) . | ||
" and process with pid " . getmypid() . " on host " . gethostname() . " says: " . | ||
$input->getArgument( 'input' ) . "\n"; | ||
|
||
echo $msg; | ||
|
||
$fileName = $input->getOption( 'file' ); | ||
if ( $fileName != '' ) | ||
{ | ||
file_put_contents( $fileName, $msg, FILE_APPEND ); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
<?php | ||
/** | ||
* User: gaetano.giunta | ||
* Date: 19/05/14 | ||
* Time: 18.29 | ||
*/ | ||
|
||
namespace Kaliop\QueueingBundle\Command; | ||
|
||
use Symfony\Component\Console\Input\InputOption; | ||
use Symfony\Component\Console\Input\InputArgument; | ||
use Symfony\Component\Console\Input\InputInterface; | ||
use Symfony\Component\Console\Output\OutputInterface; | ||
use Kaliop\QueueingBundle\Helper\BaseCommand; | ||
|
||
class ManageQueueCommand extends BaseCommand | ||
{ | ||
protected function configure() | ||
{ | ||
$this | ||
->setName( 'kaliop_queueing:managequeue' ) | ||
->setDescription( "Sends control commands to a queue to f.e. purge it or grab some stats" ) | ||
->addArgument( 'mode', InputArgument::REQUIRED, 'The command to execute. use "list" to see all available' ) | ||
->addArgument( 'queue_name', InputArgument::OPTIONAL, 'The queue name (string)', '' ) | ||
->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging' ) | ||
; | ||
} | ||
|
||
/** | ||
* @param InputInterface $input | ||
* @param OutputInterface $output | ||
* @return void | ||
* @throws \InvalidArgumentException | ||
*/ | ||
protected function execute( InputInterface $input, OutputInterface $output ) | ||
{ | ||
$this->setOutput( $output ); | ||
|
||
if ( defined( 'AMQP_DEBUG' ) === false ) | ||
{ | ||
define( 'AMQP_DEBUG', (bool) $input->getOption( 'debug' ) ); | ||
} | ||
|
||
$command = $input->getArgument( 'mode' ); | ||
$queue = $input->getArgument( 'queue_name' ); | ||
|
||
/// @var \Kaliop\QueueingBundle\Services\MessageProducer\ $messageBroker | ||
$messageBroker = $this->getContainer()->get( 'kaliop_queueing.message_producer.queue_control.service' ); | ||
|
||
if ( $command == 'list' || $command == 'help' ) | ||
{ | ||
$this->writeln( "Available commands: " . implode( ', ', $messageBroker->listActions( $queue ) ) ); | ||
return; | ||
} | ||
|
||
if ( ! in_array( $command, $messageBroker->listActions( $queue ) ) ) | ||
{ | ||
$this->writeln( "Unrecognized command $command\nAvailable commands: " . implode( ', ', $messageBroker->listActions( $queue ) ) ); | ||
return; | ||
} | ||
|
||
$messageBroker->setQueueName( $queue ); | ||
$result = $messageBroker->executeAction( $command ); | ||
|
||
$this->writeln( "Sent $command to queue $queue" ); | ||
|
||
if ( $result != '' ) | ||
{ | ||
if ( is_array( $result ) ) | ||
{ | ||
$result = print_r( $result, true ); | ||
} | ||
$this->writeln( "Result: $result" ); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
<?php | ||
|
||
namespace Kaliop\QueueingBundle\Command; | ||
|
||
use Symfony\Component\Console\Input\InputArgument; | ||
use Symfony\Component\Console\Input\InputInterface; | ||
use Symfony\Component\Console\Input\InputOption; | ||
use Symfony\Component\Console\Output\OutputInterface; | ||
use Kaliop\QueueingBundle\Helper\BaseCommand; | ||
use Symfony\Component\DependencyInjection\Exception\ServiceNotFoundException; | ||
|
||
/** | ||
* Sends to a queue a message to execute a symfony console command | ||
*/ | ||
class QueueConsoleCommandCommand extends BaseCommand | ||
{ | ||
/// @var \Symfony\Component\Console\Output\OutputInterface $output | ||
protected $output; | ||
|
||
protected function configure() | ||
{ | ||
$this | ||
->setName( 'kaliop_queueing:queuecommand' ) | ||
->setDescription( "Sends to a queue a message to execute a symfony console command" ) | ||
->addArgument( 'queue_name', InputArgument::REQUIRED, 'The queue name (string)' ) | ||
->addArgument( 'console_command', InputArgument::REQUIRED, 'The console command to execute (string)' ) | ||
->addArgument( 'argument/option', InputArgument::IS_ARRAY, 'Arguments and options for the executed command. Options use the syntax: option.<opt>.<val>' ) | ||
->addOption( 'ttl', 't', InputOption::VALUE_OPTIONAL, 'Validity of message (in seconds)', null ) | ||
->addOption( 'novalidate', null, InputOption::VALUE_NONE, 'Skip checking if the command is registered with the sf console' ) | ||
->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging' ) | ||
; | ||
} | ||
|
||
/** | ||
* @param InputInterface $input | ||
* @param OutputInterface $output | ||
* @return void | ||
* @throws \InvalidArgumentException | ||
*/ | ||
protected function execute( InputInterface $input, OutputInterface $output ) | ||
{ | ||
$this->setOutput( $output ); | ||
|
||
$command = $input->getArgument( 'console_command' ); | ||
if( !$input->getOption( 'novalidate' ) ) | ||
{ | ||
if( !in_array( $command, array_keys( $this->getApplication()->all() ) ) ) | ||
{ | ||
throw new \InvalidArgumentException( "Command '$command' is not registered in the symfony console" ); | ||
} | ||
} | ||
|
||
if ( defined( 'AMQP_DEBUG' ) === false ) | ||
{ | ||
define( 'AMQP_DEBUG', (bool)$input->getOption( 'debug' ) ); | ||
} | ||
|
||
$queue = $input->getArgument( 'queue_name' ); | ||
$arguments = $input->getArgument( 'argument/option' ); | ||
// parse arguments to tell options apart | ||
$options = array(); | ||
foreach( $arguments as $key => $arg ) | ||
{ | ||
if ( strpos( $arg, 'option.' ) === 0 ) | ||
{ | ||
$arg = explode( '.', $arg, 3 ); | ||
$options[$arg[1]] = ( ( count( $arg ) == 3 ) ? $arg[2] : null ); | ||
unset( $arguments[$key] ); | ||
} | ||
} | ||
|
||
$messageProducer = $this->getContainer()->get( 'kaliop_queueing.message_producer.console_command.service' ); | ||
$messageProducer->setQueueName( $queue ); | ||
try | ||
{ | ||
$messageProducer->publish( | ||
$command, | ||
$arguments, | ||
$options, | ||
$ttl = $input->getOption( 'ttl' ) | ||
); | ||
|
||
$this->writeln( "Command queued for execution" . ( $ttl ? ", will be valid for $ttl seconds" : '' ) ); | ||
} | ||
catch( ServiceNotFoundException $e ) | ||
{ | ||
throw new \InvalidArgumentException( "Queue '$queue' is not registered" ); | ||
} | ||
|
||
} | ||
|
||
} |
Oops, something went wrong.