Skip to content
This repository has been archived by the owner on Sep 23, 2021. It is now read-only.

Add CLI consumer interface #14

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions Command/ConsumeTopicCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
<?php

declare(ticks = 1);

namespace M6Web\Bundle\KafkaBundle\Command;

use M6Web\Bundle\KafkaBundle\Manager\ConsumerManager;
use M6Web\Bundle\KafkaBundle\Handler\MessageHandlerInterface;
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

class ConsumeTopicCommand extends ContainerAwareCommand
{
protected $shutdown;

protected function configure()
{
$this
->setName('m6web:kafka:consume')
->setDescription('Consume command to process kafka topics')
->addArgument('consumer', InputArgument::REQUIRED, 'Consumer name')
->addArgument('handler', InputArgument::REQUIRED, 'Handler service name')
->addOption('auto-commit', null, InputOption::VALUE_NONE, 'Auto commit enabled?')
;
}

protected function execute(InputInterface $input, OutputInterface $output)
{
$container = $this->getContainer();
$prefixName = $container->getParameter('m6web_kafka.services_name_prefix');

$consumer = $input->getArgument('consumer');
$handler = $input->getArgument('handler');
$autoCommit = $input->getOption('auto-commit');

/**
* @var ConsumerManager $topicConsumer
*/
$topicConsumer = $container->get(sprintf('%s.consumer.%s', $prefixName, $consumer));
if (!$topicConsumer) {
throw new \Exception(sprintf("TopicConsumer with name '%s' is not defined", $consumer));
}

/**
* @var MessageHandlerInterface $messageHandler
*/
$messageHandler = $container->get($handler);
if (!$messageHandler) {
throw new \Exception(sprintf("Message Handler with name '%s' is not defined", $handler));
}

$output->writeln('<comment>Waiting for partition assignment... (make take some time when quickly re-joining the group after leaving it.)'.PHP_EOL.'</comment>');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(may take ...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to add PHP_EOL since it's already done by writeln method (except if you really want a double EOL)

source: http://api.symfony.com/3.3/Symfony/Component/Console/Output/OutputInterface.html#method_writeln

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to maintain a double EOL


$this->registerSigHandlers();

while (true) {
$message = $topicConsumer->consume($autoCommit);

switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
$messageHandler->process($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
$output->writeln('<question>No more messages; will wait for more</question>');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<info>No more messages. Waiting for more...</info>?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this, it's a message about they aren't any messages left

break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
$output->writeln('<question>Timed out</question>');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<error>?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not an error, this appears every time you don't have any messages to consume every X seconds defined in timeout_consuming_queue

break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}

if($this->shutdown) {
$output->writeln('<question>Shuting down...</question>');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<info>?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's about color, I don't care if you would prefer to put info :D

if ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) {
$topicConsumer->commit();
}

break;
}
}

$output->writeln('<info>End consuming topic gracefully</info>');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

successfully?

}

private function registerSigHandlers()
{
if(!function_exists('pcntl_signal')) {
return;
}

pcntl_signal(SIGTERM, [$this, 'shutdownFn']);
pcntl_signal(SIGINT, [$this, 'shutdownFn']);
pcntl_signal(SIGQUIT, [$this, 'shutdownFn']);
}

public function shutdownFn()
{
$this->shutdown = true;
}

}
1 change: 1 addition & 0 deletions DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public function getConfigTreeBuilder()

$rootNode
->children()
->scalarNode('services_name_prefix')->defaultValue('m6_web_kafka')->end()
->booleanNode('event_dispatcher')->defaultTrue()->end()
->arrayNode('consumers')
->useAttributeAsKey('key')
Expand Down
15 changes: 10 additions & 5 deletions DependencyInjection/M6WebKafkaExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class M6WebKafkaExtension extends Extension
{
/**
* {@inheritDoc}
* @throws \Symfony\Component\DependencyInjection\Exception\BadMethodCallException
*/
public function load(array $configs, ContainerBuilder $container)
{
Expand All @@ -31,11 +32,14 @@ public function load(array $configs, ContainerBuilder $container)

$this->loadProducers($container, $config);
$this->loadConsumers($container, $config);

$container->setParameter('m6web_kafka.services_name_prefix', $config['services_name_prefix']);
}

/**
* @param ContainerBuilder $container
* @param array $config
* @param array $config
* @throws \Symfony\Component\DependencyInjection\Exception\BadMethodCallException
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO this doc tag is not helpful if there's no throw statement in the method body, except if you explain in which case this exception is thrown (eg. @throws \MyException If a wrong parameter is used)

*/
protected function loadProducers(ContainerBuilder $container, array $config)
{
Expand All @@ -60,15 +64,15 @@ protected function loadProducers(ContainerBuilder $container, array $config)
$this->setEventDispatcher($config, $producerDefinition);

$container->setDefinition(
sprintf('m6_web_kafka.producer.%s', $key),
sprintf('%s.producer.%s', $config['services_name_prefix'], $key),
$producerDefinition
);
}
}

/**
* @param ContainerBuilder $container
* @param array $config
* @param array $config
*/
protected function loadConsumers(ContainerBuilder $container, array $config)
{
Expand All @@ -91,15 +95,16 @@ protected function loadConsumers(ContainerBuilder $container, array $config)
$this->setEventDispatcher($config, $consumerDefinition);

$container->setDefinition(
sprintf('m6_web_kafka.consumer.%s', $key),
sprintf('%s.consumer.%s', $config['services_name_prefix'], $key),
$consumerDefinition
);
}
}

/**
* @param array $config
* @param array $config
* @param Definition $definition
* @throws \Symfony\Component\DependencyInjection\Exception\InvalidArgumentException
*/
protected function setEventDispatcher(array $config, Definition $definition)
{
Expand Down
17 changes: 17 additions & 0 deletions Handler/MessageHandlerInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

namespace M6Web\Bundle\KafkaBundle\Handler;

use RdKafka\Message;

interface MessageHandlerInterface
{
/**
* Process message from kafka
*
* @param Message $message
* @return mixed
*/
public function process(Message $message);

}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

End of line

40 changes: 40 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ Here a configuration example:

```yaml
m6_web_kafka:
services_name_prefix: 'custom_prefix'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe, we can add a comment to say that we have the possibility to customize the service name by using our own prefix with an optional parameter.

event_dispatcher: true
producers:
producer1:
Expand Down Expand Up @@ -165,6 +166,8 @@ But you can choose not to do it by adding an argument as following:
$consumer->consume(false);
```

*Note:* Setting $consumer->consume(false) boost consuming performance, avoiding to commit every single kafka consumed message.

You can decide to commit manually your message with:
```php
$consumer->commit();
Expand All @@ -178,6 +181,43 @@ It is the `\RdKafka\Message` from the [RdKafka extension](https://arnaud-lb.gith
In case there is no more message, it will give you a _No more message_ string.
In case there is a time out, it will give you a _Time out_ string.

#### Consuming from CLI

You can use your own service to process Kakfa topic messages and run it from Symfony special command provided by us.

At first you have to create your own service implementing MessageHandlerInterface. For example:

```php
<?php

namespace AppBundle\Service;

use M6Web\Bundle\KafkaBundle\Handler\MessageHandlerInterface;
use RdKafka\Message;

class TestConsumer implements MessageHandlerInterface
{

public function process(Message $message)
{
echo $message->payload. PHP_EOL;
}

}
```

Then you could start consuming messages executing this command with your consumer and service name:

```
php bin/console kafka:consume you_consumer_name "AppBundle\Service\TestConsumer"
```

Also you can add --auto-commit option to enable auto commit in every consumed message.

*Note:* If you aren't using *Symfony Automatic Service Loading*, introduced in Symfony 3.3, you have to put in handler option your service name instead of FQCN.

This Symfony command provide *signal management* too, handling SIGQUIT, SIGTERM and SIGINT to do a gracefully shutdown, committing last processed message and exiting.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

provides

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

successfull shutdown?


### Exceptions list
- EntityNotSetException
- KafkaException
Expand Down