Skip to content
This repository has been archived by the owner on Sep 23, 2021. It is now read-only.

Add CLI consumer interface #14

Closed
wants to merge 3 commits into from

Conversation

markitosgv
Copy link

Add Symfony command to consume kafka messages, handling signals and providing an interface to implement your own services to consume.

Copy link
Member

@Oliboy50 Oliboy50 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this feature :)

protected function configure()
{
$this
->setName('kafka:consume')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we should add a namespace for the command name, as it was done here => https://github.com/M6Web/PhpProcessManagerBundle/blob/master/Command/HttpProcessCommand.php#L62

so we could name it m6web:kafka:consume, what do you think?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's ok, I'm going to add bundle namespace

{
$this
->setName('kafka:consume')
->setDescription('Consume command to process kafka topic/s')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove the / before s?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok

->setDescription('Consume command to process kafka topic/s')
->addOption('consumer', null,InputOption::VALUE_REQUIRED, 'Consumer name')
->addOption('handler', null,InputOption::VALUE_REQUIRED, 'Handler service name')
->addOption('auto-commit', null,InputOption::VALUE_NONE, 'Auto commit enabled?')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a space after null,

(we should set a php-cs-fixer check in our Travis builds 😱)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course, you could maybe use https://styleci.io/


protected function execute(InputInterface $input, OutputInterface $output)
{
$prefixName = $this->getContainer()->getParameter('m6web_kafka.prefix_name');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

register $container = $this->getContainer(); before this line, because this getter is used several times, and we don't know how much time consuming it is compared to a simple variable assignment

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok

throw new \Exception(sprintf("Message Handler with name '%s' is not defined", $handler));
}

$output->writeln('<comment>Waiting for partition assignment... (make take some time when quickly re-joining the group after leaving it.)'.PHP_EOL.'</comment>');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(may take ...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to add PHP_EOL since it's already done by writeln method (except if you really want a double EOL)

source: http://api.symfony.com/3.3/Symfony/Component/Console/Output/OutputInterface.html#method_writeln

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to maintain a double EOL

$output->writeln('<question>No more messages; will wait for more</question>');
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
$output->writeln('<question>Timed out</question>');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<error>?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not an error, this appears every time you don't have any messages to consume every X seconds defined in timeout_consuming_queue

$messageHandler->process($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
$output->writeln('<question>No more messages; will wait for more</question>');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<info>No more messages. Waiting for more...</info>?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this, it's a message about they aren't any messages left

}

if($this->shutdown) {
$output->writeln('<question>Shuting down...</question>');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<info>?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's about color, I don't care if you would prefer to put info :D

@@ -31,11 +32,14 @@ public function load(array $configs, ContainerBuilder $container)

$this->loadProducers($container, $config);
$this->loadConsumers($container, $config);

$container->setParameter('m6web_kafka.prefix_name', $config['prefix_services_name']);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not to sure about this prefix parameter... cc @NastasiaSaby what do you think?

anyway, if this feature is accepted:

I'd name it services_name_prefix, and you should keep the exact same name for the internal parameter name, so:

$container->setParameter('m6web_kafka.services_name_prefix', $config['services_name_prefix']);

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This prefix is needed to access here:

$prefixName = $this->getContainer()->getParameter('m6web_kafka.prefix_name');

}

/**
* @param ContainerBuilder $container
* @param array $config
* @param array $config
* @throws \Symfony\Component\DependencyInjection\Exception\BadMethodCallException
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO this doc tag is not helpful if there's no throw statement in the method body, except if you explain in which case this exception is thrown (eg. @throws \MyException If a wrong parameter is used)

Copy link
Contributor

@NastasiaSaby NastasiaSaby left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @markitosgv for your contribution :)

}
}

$output->writeln('<info>End consuming topic gracefully</info>');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

successfully?

*/
public function process(Message $message);

}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

End of line


*Note:* If you aren't using *Symfony Automatic Service Loading*, introduced in Symfony 3.3, you have to put in handler option your service name instead of FQCN.

This Symfony command provide *signal management* too, handling SIGQUIT, SIGTERM and SIGINT to do a gracefully shutdown, committing last processed message and exiting.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

provides

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

successfull shutdown?

@@ -53,6 +53,7 @@ Here a configuration example:

```yaml
m6_web_kafka:
services_name_prefix: 'custom_prefix'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe, we can add a comment to say that we have the possibility to customize the service name by using our own prefix with an optional parameter.

@omansour
Copy link

👎

sorry @markitosgv but this will not be merged. We prefer decouple responsibilities and use the https://github.com/M6Web/DaemonBundle to build commands like this and keep this current bundle simple as possible.

@omansour
Copy link

after consulting the maintainers, this will not be merged. Please use the DaemonBundle as mentioned previously.

@omansour omansour closed this Sep 26, 2017
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants