From 02a7f8d09566a7e6084d1131b37ad8ef02aec407 Mon Sep 17 00:00:00 2001 From: Marcos Date: Sun, 24 Sep 2017 10:23:33 +0200 Subject: [PATCH 1/3] Add CLI consumer interface --- Command/ConsumeTopicCommand.php | 108 ++++++++++++++++++++ DependencyInjection/Configuration.php | 1 + DependencyInjection/M6WebKafkaExtension.php | 16 ++- Handler/MessageHandlerInterface.php | 17 +++ README.md | 40 ++++++++ 5 files changed, 177 insertions(+), 5 deletions(-) create mode 100644 Command/ConsumeTopicCommand.php create mode 100644 Handler/MessageHandlerInterface.php diff --git a/Command/ConsumeTopicCommand.php b/Command/ConsumeTopicCommand.php new file mode 100644 index 0000000..fdc07ec --- /dev/null +++ b/Command/ConsumeTopicCommand.php @@ -0,0 +1,108 @@ +setName('kafka:consume') + ->setDescription('Consume command to process kafka topic/s') + ->addOption('consumer', null,InputOption::VALUE_REQUIRED, 'Consumer name') + ->addOption('handler', null,InputOption::VALUE_REQUIRED, 'Handler service name') + ->addOption('auto-commit', null,InputOption::VALUE_NONE, 'Auto commit enabled?') + ; + } + + protected function execute(InputInterface $input, OutputInterface $output) + { + $prefixName = $this->getContainer()->getParameter('m6web_kafka.prefix_name'); + + $consumer = $input->getOption('consumer'); + $handler = $input->getOption('handler'); + $autoCommit = $input->getOption('auto-commit'); + + if (!$consumer || !$handler) { + throw new \Exception('Consumer and handler options are required'); + } + + /** + * @var ConsumerManager $topicConsumer + */ + $topicConsumer = $this->getContainer()->get(sprintf('%s.consumer.%s', $prefixName, $consumer)); + if (!$topicConsumer) { + throw new \Exception(sprintf("TopicConsumer with name '%s' is not defined", $consumer)); + } + + /** + * @var MessageHandlerInterface $messageHandler + */ + $messageHandler = $this->getContainer()->get($handler); + if (!$messageHandler) { + throw new \Exception(sprintf("Message Handler with name '%s' is not defined", $handler)); + } + + $output->writeln('Waiting for partition assignment... (make take some time when quickly re-joining the group after leaving it.)'.PHP_EOL.''); + + $this->registerSigHandlers(); + + while (true) { + $message = $topicConsumer->consume($autoCommit); + + switch ($message->err) { + case RD_KAFKA_RESP_ERR_NO_ERROR: + $messageHandler->process($message); + break; + case RD_KAFKA_RESP_ERR__PARTITION_EOF: + $output->writeln('No more messages; will wait for more'); + break; + case RD_KAFKA_RESP_ERR__TIMED_OUT: + $output->writeln('Timed out'); + break; + default: + throw new \Exception($message->errstr(), $message->err); + break; + } + + if($this->shutdown) { + $output->writeln('Shuting down...'); + if ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) { + $topicConsumer->commit(); + } + + break; + } + } + + $output->writeln('End consuming topic gracefully'); + } + + private function registerSigHandlers() + { + if(!function_exists('pcntl_signal')) { + return; + } + + pcntl_signal(SIGTERM, [$this, 'shutdownFn']); + pcntl_signal(SIGINT, [$this, 'shutdownFn']); + pcntl_signal(SIGQUIT, [$this, 'shutdownFn']); + } + + public function shutdownFn() + { + $this->shutdown = true; + } + +} diff --git a/DependencyInjection/Configuration.php b/DependencyInjection/Configuration.php index a80bb70..c9d66f1 100644 --- a/DependencyInjection/Configuration.php +++ b/DependencyInjection/Configuration.php @@ -20,6 +20,7 @@ public function getConfigTreeBuilder() $rootNode ->children() + ->scalarNode('prefix_services_name')->defaultValue('m6_web_kafka')->end() ->booleanNode('event_dispatcher')->defaultTrue()->end() ->arrayNode('consumers') ->useAttributeAsKey('key') diff --git a/DependencyInjection/M6WebKafkaExtension.php b/DependencyInjection/M6WebKafkaExtension.php index 6946739..7b5df70 100644 --- a/DependencyInjection/M6WebKafkaExtension.php +++ b/DependencyInjection/M6WebKafkaExtension.php @@ -20,6 +20,7 @@ class M6WebKafkaExtension extends Extension { /** * {@inheritDoc} + * @throws \Symfony\Component\DependencyInjection\Exception\BadMethodCallException */ public function load(array $configs, ContainerBuilder $container) { @@ -31,11 +32,14 @@ public function load(array $configs, ContainerBuilder $container) $this->loadProducers($container, $config); $this->loadConsumers($container, $config); + + $container->setParameter('m6web_kafka.prefix_name', $config['prefix_services_name']); } /** * @param ContainerBuilder $container - * @param array $config + * @param array $config + * @throws \Symfony\Component\DependencyInjection\Exception\BadMethodCallException */ protected function loadProducers(ContainerBuilder $container, array $config) { @@ -60,7 +64,7 @@ protected function loadProducers(ContainerBuilder $container, array $config) $this->setEventDispatcher($config, $producerDefinition); $container->setDefinition( - sprintf('m6_web_kafka.producer.%s', $key), + sprintf('%s.producer.%s', $config['prefix_services_name'], $key), $producerDefinition ); } @@ -68,7 +72,8 @@ protected function loadProducers(ContainerBuilder $container, array $config) /** * @param ContainerBuilder $container - * @param array $config + * @param array $config + * @throws \Symfony\Component\DependencyInjection\Exception\BadMethodCallException */ protected function loadConsumers(ContainerBuilder $container, array $config) { @@ -91,15 +96,16 @@ protected function loadConsumers(ContainerBuilder $container, array $config) $this->setEventDispatcher($config, $consumerDefinition); $container->setDefinition( - sprintf('m6_web_kafka.consumer.%s', $key), + sprintf('%s.consumer.%s', $config['prefix_services_name'], $key), $consumerDefinition ); } } /** - * @param array $config + * @param array $config * @param Definition $definition + * @throws \Symfony\Component\DependencyInjection\Exception\InvalidArgumentException */ protected function setEventDispatcher(array $config, Definition $definition) { diff --git a/Handler/MessageHandlerInterface.php b/Handler/MessageHandlerInterface.php new file mode 100644 index 0000000..49a2e43 --- /dev/null +++ b/Handler/MessageHandlerInterface.php @@ -0,0 +1,17 @@ +consume(false); ``` +*Note:* Setting $consumer->consume(false) boost consuming performance, avoiding to commit every single kafka consumed message. + You can decide to commit manually your message with: ```php $consumer->commit(); @@ -178,6 +181,43 @@ It is the `\RdKafka\Message` from the [RdKafka extension](https://arnaud-lb.gith In case there is no more message, it will give you a _No more message_ string. In case there is a time out, it will give you a _Time out_ string. +#### Consuming from CLI + +You can use your own service to process Kakfa topic messages and run it from Symfony special command provided by us. + +At first you have to create your own service implementing MessageHandlerInterface. For example: + +```php +payload. PHP_EOL; + } + +} +``` + +Then you could start consuming messages executing this command: + +``` +php bin/console kafka:consume --consumer you_consumer_name --handler "AppBundle\Service\TestConsumer" +``` + +Also you can add --auto-commit option to enable auto commit in every consumed message. + +*Note:* If you aren't using *Symfony Automatic Service Loading*, introduced in Symfony 3.3, you have to put in handler option your service name instead of FQCN. + +This Symfony command provide *signal management* too, handling SIGQUIT, SIGTERM and SIGINT to do a gracefully shutdown, committing last processed message and exiting. + ### Exceptions list - EntityNotSetException - KafkaException From 5f9a4a8785f47576be33875d6b912a4642aea663 Mon Sep 17 00:00:00 2001 From: Marcos Date: Sun, 24 Sep 2017 20:11:58 +0200 Subject: [PATCH 2/3] Changes due to comments --- Command/ConsumeTopicCommand.php | 26 ++++++++++----------- DependencyInjection/Configuration.php | 2 +- DependencyInjection/M6WebKafkaExtension.php | 7 +++--- README.md | 6 ++--- 4 files changed, 19 insertions(+), 22 deletions(-) diff --git a/Command/ConsumeTopicCommand.php b/Command/ConsumeTopicCommand.php index fdc07ec..4f2bd3b 100644 --- a/Command/ConsumeTopicCommand.php +++ b/Command/ConsumeTopicCommand.php @@ -7,6 +7,7 @@ use M6Web\Bundle\KafkaBundle\Manager\ConsumerManager; use M6Web\Bundle\KafkaBundle\Handler\MessageHandlerInterface; use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand; +use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; @@ -18,30 +19,27 @@ class ConsumeTopicCommand extends ContainerAwareCommand protected function configure() { $this - ->setName('kafka:consume') - ->setDescription('Consume command to process kafka topic/s') - ->addOption('consumer', null,InputOption::VALUE_REQUIRED, 'Consumer name') - ->addOption('handler', null,InputOption::VALUE_REQUIRED, 'Handler service name') - ->addOption('auto-commit', null,InputOption::VALUE_NONE, 'Auto commit enabled?') + ->setName('m6web:kafka:consume') + ->setDescription('Consume command to process kafka topics') + ->addArgument('consumer', InputArgument::REQUIRED, 'Consumer name') + ->addArgument('handler', InputArgument::REQUIRED, 'Handler service name') + ->addOption('auto-commit', null, InputOption::VALUE_NONE, 'Auto commit enabled?') ; } protected function execute(InputInterface $input, OutputInterface $output) { - $prefixName = $this->getContainer()->getParameter('m6web_kafka.prefix_name'); + $container = $this->getContainer(); + $prefixName = $container->getParameter('m6web_kafka.services_name_prefix'); - $consumer = $input->getOption('consumer'); - $handler = $input->getOption('handler'); + $consumer = $input->getArgument('consumer'); + $handler = $input->getArgument('handler'); $autoCommit = $input->getOption('auto-commit'); - if (!$consumer || !$handler) { - throw new \Exception('Consumer and handler options are required'); - } - /** * @var ConsumerManager $topicConsumer */ - $topicConsumer = $this->getContainer()->get(sprintf('%s.consumer.%s', $prefixName, $consumer)); + $topicConsumer = $container->get(sprintf('%s.consumer.%s', $prefixName, $consumer)); if (!$topicConsumer) { throw new \Exception(sprintf("TopicConsumer with name '%s' is not defined", $consumer)); } @@ -49,7 +47,7 @@ protected function execute(InputInterface $input, OutputInterface $output) /** * @var MessageHandlerInterface $messageHandler */ - $messageHandler = $this->getContainer()->get($handler); + $messageHandler = $container->get($handler); if (!$messageHandler) { throw new \Exception(sprintf("Message Handler with name '%s' is not defined", $handler)); } diff --git a/DependencyInjection/Configuration.php b/DependencyInjection/Configuration.php index c9d66f1..ee0d696 100644 --- a/DependencyInjection/Configuration.php +++ b/DependencyInjection/Configuration.php @@ -20,7 +20,7 @@ public function getConfigTreeBuilder() $rootNode ->children() - ->scalarNode('prefix_services_name')->defaultValue('m6_web_kafka')->end() + ->scalarNode('services_name_prefix')->defaultValue('m6_web_kafka')->end() ->booleanNode('event_dispatcher')->defaultTrue()->end() ->arrayNode('consumers') ->useAttributeAsKey('key') diff --git a/DependencyInjection/M6WebKafkaExtension.php b/DependencyInjection/M6WebKafkaExtension.php index 7b5df70..f827368 100644 --- a/DependencyInjection/M6WebKafkaExtension.php +++ b/DependencyInjection/M6WebKafkaExtension.php @@ -33,7 +33,7 @@ public function load(array $configs, ContainerBuilder $container) $this->loadProducers($container, $config); $this->loadConsumers($container, $config); - $container->setParameter('m6web_kafka.prefix_name', $config['prefix_services_name']); + $container->setParameter('m6web_kafka.services_name_prefix', $config['services_name_prefix']); } /** @@ -64,7 +64,7 @@ protected function loadProducers(ContainerBuilder $container, array $config) $this->setEventDispatcher($config, $producerDefinition); $container->setDefinition( - sprintf('%s.producer.%s', $config['prefix_services_name'], $key), + sprintf('%s.producer.%s', $config['services_name_prefix'], $key), $producerDefinition ); } @@ -73,7 +73,6 @@ protected function loadProducers(ContainerBuilder $container, array $config) /** * @param ContainerBuilder $container * @param array $config - * @throws \Symfony\Component\DependencyInjection\Exception\BadMethodCallException */ protected function loadConsumers(ContainerBuilder $container, array $config) { @@ -96,7 +95,7 @@ protected function loadConsumers(ContainerBuilder $container, array $config) $this->setEventDispatcher($config, $consumerDefinition); $container->setDefinition( - sprintf('%s.consumer.%s', $config['prefix_services_name'], $key), + sprintf('%s.consumer.%s', $config['services_name_prefix'], $key), $consumerDefinition ); } diff --git a/README.md b/README.md index 26cc096..41233d3 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,7 @@ Here a configuration example: ```yaml m6_web_kafka: - prefix_services_name: 'custom_prefix' + services_name_prefix: 'custom_prefix' event_dispatcher: true producers: producer1: @@ -206,10 +206,10 @@ class TestConsumer implements MessageHandlerInterface } ``` -Then you could start consuming messages executing this command: +Then you could start consuming messages executing this command with your consumer and service name: ``` -php bin/console kafka:consume --consumer you_consumer_name --handler "AppBundle\Service\TestConsumer" +php bin/console kafka:consume you_consumer_name "AppBundle\Service\TestConsumer" ``` Also you can add --auto-commit option to enable auto commit in every consumed message. From aa673af023ccb17070fb0beecc1818444ca684b0 Mon Sep 17 00:00:00 2001 From: Marcos Date: Tue, 26 Sep 2017 00:34:57 +0200 Subject: [PATCH 3/3] Add memory handler and abstract implementation --- Command/ConsumeTopicCommand.php | 30 +++++++++++++++++------------ Handler/MessageHandlerAbstract.php | 13 +++++++++++++ Handler/MessageHandlerInterface.php | 5 +++++ 3 files changed, 36 insertions(+), 12 deletions(-) create mode 100644 Handler/MessageHandlerAbstract.php diff --git a/Command/ConsumeTopicCommand.php b/Command/ConsumeTopicCommand.php index 4f2bd3b..b4bbe55 100644 --- a/Command/ConsumeTopicCommand.php +++ b/Command/ConsumeTopicCommand.php @@ -1,6 +1,6 @@ addArgument('consumer', InputArgument::REQUIRED, 'Consumer name') ->addArgument('handler', InputArgument::REQUIRED, 'Handler service name') ->addOption('auto-commit', null, InputOption::VALUE_NONE, 'Auto commit enabled?') - ; + ->addOption('memory-max', null, InputOption::VALUE_REQUIRED, 'Memory max in bytes'); } protected function execute(InputInterface $input, OutputInterface $output) @@ -35,6 +35,7 @@ protected function execute(InputInterface $input, OutputInterface $output) $consumer = $input->getArgument('consumer'); $handler = $input->getArgument('handler'); $autoCommit = $input->getOption('auto-commit'); + $memoryMax = $input->getOption('memory-max'); /** * @var ConsumerManager $topicConsumer @@ -52,7 +53,7 @@ protected function execute(InputInterface $input, OutputInterface $output) throw new \Exception(sprintf("Message Handler with name '%s' is not defined", $handler)); } - $output->writeln('Waiting for partition assignment... (make take some time when quickly re-joining the group after leaving it.)'.PHP_EOL.''); + $output->writeln('Waiting for partition assignment... (make take some time when quickly re-joining the group after leaving it.)' . PHP_EOL . ''); $this->registerSigHandlers(); @@ -65,6 +66,7 @@ protected function execute(InputInterface $input, OutputInterface $output) break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: $output->writeln('No more messages; will wait for more'); + $messageHandler->endOfPartitionReached(); break; case RD_KAFKA_RESP_ERR__TIMED_OUT: $output->writeln('Timed out'); @@ -74,7 +76,12 @@ protected function execute(InputInterface $input, OutputInterface $output) break; } - if($this->shutdown) { + if ($memoryMax !== null && memory_get_peak_usage(true) >= $memoryMax) { + $output->writeln('Memory limit exceeded!'); + $this->shutdownFn(); + } + + if ($this->shutdown) { $output->writeln('Shuting down...'); if ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) { $topicConsumer->commit(); @@ -84,12 +91,17 @@ protected function execute(InputInterface $input, OutputInterface $output) } } - $output->writeln('End consuming topic gracefully'); + $output->writeln('End consuming topic successfully'); + } + + public function shutdownFn() + { + $this->shutdown = true; } private function registerSigHandlers() { - if(!function_exists('pcntl_signal')) { + if (!function_exists('pcntl_signal')) { return; } @@ -97,10 +109,4 @@ private function registerSigHandlers() pcntl_signal(SIGINT, [$this, 'shutdownFn']); pcntl_signal(SIGQUIT, [$this, 'shutdownFn']); } - - public function shutdownFn() - { - $this->shutdown = true; - } - } diff --git a/Handler/MessageHandlerAbstract.php b/Handler/MessageHandlerAbstract.php new file mode 100644 index 0000000..d61c82e --- /dev/null +++ b/Handler/MessageHandlerAbstract.php @@ -0,0 +1,13 @@ +