Add the bundle as a Composer dependency:
$ composer require simpod/kafka-bundle
Then add KafkaBundle to Symfony's bundles.php:
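<?php

use SimPod\KafkaBundle\SimPodKafkaBundle;

return [
    // ...
    // bundles.php maps each bundle class to the environments it is enabled in
    SimPodKafkaBundle::class => ['all' => true],
    // ...
];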
This package simply makes it easier to integrate https://github.com/arnaud-lb/php-rdkafka with Symfony. For more details on how to work with Kafka in PHP, refer to its documentation.
This bundle registers these commands:
bin/console debug:kafka:consumers to list all available consumer groups
bin/console kafka:consumer:run <consumer group name> to run a consumer instance that joins the specified consumer group
Create a file, e.g. kafka.yaml, in your config directory with the following content:
kafka:
    broker_list: '%env(KAFKA_BROKER_LIST)%' # required
    client:
        id: 'your-application-name'
It reads the env var KAFKA_BROKER_LIST, which contains a comma-separated list of brokers (broker-1.kafka.com:9092,broker-2.kafka.com:9092).
If not set, it defaults to 127.0.0.1:9092.
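To illustrate the expected format of the broker list, here is a minimal sketch in plain PHP. The function splitBrokerList() is a hypothetical helper and not part of this bundle, and the fallback shown only mirrors the documented default; how the bundle applies it internally is not shown here.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper, not part of the bundle: splits a comma-separated
 * broker list into individual "host:port" entries.
 *
 * @return string[]
 */
function splitBrokerList(string $brokerList) : array
{
    // Tolerate whitespace around the commas.
    $brokers = array_map('trim', explode(',', $brokerList));

    // Drop empty entries left by stray or trailing commas.
    return array_values(array_filter($brokers, static function (string $broker) : bool {
        return $broker !== '';
    }));
}

// Assumption: fall back to the documented default when the variable is unset.
$brokerList = getenv('KAFKA_BROKER_LIST') ?: '127.0.0.1:9092';
$brokers    = splitBrokerList($brokerList);
```

Each resulting entry is a single host:port pair, which makes it easy to validate the value before it reaches Kafka.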
To create a producer, you only need Brokers from this bundle; nothing else is required.
Simple example:
<?php

declare(strict_types=1);

use RdKafka\Producer;
use SimPod\KafkaBundle\Kafka\Brokers;
use const RD_KAFKA_PARTITION_UA;
use function json_encode;

class SimpleProducer
{
    private const TOPIC_NAME = 'topic1';

    /** @var Brokers */
    private $brokers;

    public function __construct(Brokers $brokers)
    {
        $this->brokers = $brokers;
    }

    public function produce(MessageObject $message) : void
    {
        $producer = new Producer();
        $producer->addBrokers($this->brokers->getList());

        $topic = $producer->newTopic(self::TOPIC_NAME);

        // RD_KAFKA_PARTITION_UA lets librdkafka choose the partition.
        // 4th argument can be optional key
        $topic->produce(
            RD_KAFKA_PARTITION_UA,
            0,
            json_encode($message)
        );

        // With php-rdkafka 4.x and later, call $producer->flush($timeoutMs)
        // before shutdown, otherwise queued messages may be lost.
    }
}
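The producer example passes a MessageObject to json_encode, but that class is not provided by the bundle. The following is a minimal sketch of what such a class could look like; the class body, its fields (id, body), and the use of JsonSerializable are assumptions for illustration only. Note that json_encode on an object with only private properties yields {} unless the class implements JsonSerializable. JSON_THROW_ON_ERROR requires PHP 7.3 or later.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical message class; the bundle does not provide it.
 */
final class MessageObject implements JsonSerializable
{
    /** @var int */
    private $id;

    /** @var string */
    private $body;

    public function __construct(int $id, string $body)
    {
        $this->id   = $id;
        $this->body = $body;
    }

    /**
     * Defines the shape of the JSON payload sent to Kafka.
     *
     * @return array<string, int|string>
     */
    public function jsonSerialize() : array
    {
        return ['id' => $this->id, 'body' => $this->body];
    }
}

// Throw a JsonException instead of silently returning false on failure.
$payload = json_encode(new MessageObject(42, 'hello'), JSON_THROW_ON_ERROR);
```

Here $payload holds the string {"id":42,"body":"hello"}, which is what the third argument of produce() would receive.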
This is an example of a simple consumer that belongs to simple_consumer_group and consumes topic1:
<?php

declare(strict_types=1);

use RdKafka\KafkaConsumer;
use RdKafka\Message;
use SimPod\KafkaBundle\Kafka\Consumer\Consumer;
use SimPod\KafkaBundle\Kafka\Consumer\Config;

final class SimpleConsumer implements Consumer
{
    private const GROUP_ID = 'simple_consumer_group';

    public function consume(Message $kafkaMessage, KafkaConsumer $kafkaConsumer) : void
    {
        // Execute your consumer logic here
    }

    public function getGroupId() : string
    {
        return self::GROUP_ID;
    }

    /**
     * @return string[]
     */
    public function getTopics() : array
    {
        return ['topic1'];
    }

    public function getConfig() : Config
    {
        return new Config($this->getGroupId());
    }
}
It is automatically registered in the container because it implements Consumer.
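As one possible shape for the consumer logic, the sketch below decodes a JSON payload like the one written by the producer example. decodePayload() is a hypothetical helper, not part of the bundle. Inside consume() it could be called as decodePayload($kafkaMessage->payload); whether the bundle filters out error messages before calling consume() is not stated here, so checking $kafkaMessage->err against RD_KAFKA_RESP_ERR_NO_ERROR first would be a defensive assumption.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper, not part of the bundle: decodes a JSON message
 * payload into an associative array.
 *
 * @return array<string, mixed>
 *
 * @throws JsonException            when the payload is not valid JSON (PHP 7.3+)
 * @throws UnexpectedValueException when the JSON is not an object or array
 */
function decodePayload(string $payload) : array
{
    $data = json_decode($payload, true, 512, JSON_THROW_ON_ERROR);

    // Scalars such as "42" are valid JSON but not a usable message body here.
    if (! is_array($data)) {
        throw new UnexpectedValueException('Expected a JSON object or array.');
    }

    return $data;
}

$data = decodePayload('{"id":42,"body":"hello"}');
```

Throwing on malformed input keeps the decision about skipping or retrying a message in consume() rather than hiding it in the helper.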
kwn/php-rdkafka-stubs is listed as a dev dependency so that your IDE properly recognizes the php-rdkafka extension.