The config file holds all information about brokers, topics, consumer groups and middlewares.
To get started quickly, we can focus on two sections:
- Brokers
  An array of brokers, with connection and authentication configurations:
  - connections: required. Can be a string with multiple connections separated by commas, or an array of connections (as strings).
  - auth: optional. Out of the box, the package can connect with SSL authentication only, or without any authentication.
'brokers' => [
    'price_brokers' => [
        'connections' => 'localhost:8091,localhost:8092',
        'auth' => [
            'type' => 'ssl',
            'ca' => storage_path('ca.pem'),
            'certificate' => storage_path('kafka.cert'),
            'key' => storage_path('kafka.key'),
        ],
    ],
    'stock_brokers' => [
        'connections' => ['localhost:8091', 'localhost:8092'],
        'auth' => [], // can be an empty array, or the key can be omitted from the broker config
    ],
],
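Because `connections` accepts either form, both brokers above point at the same two hosts. A minimal sketch of how the two forms could be normalized into one list; `normalizeConnections` is a hypothetical helper written for illustration, not part of the package:

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of the package): turn either accepted
 * form of the `connections` option into a clean list of host:port strings.
 *
 * @param string|string[] $connections
 * @return string[]
 */
function normalizeConnections($connections): array
{
    // A string may hold several connections separated by commas.
    if (is_string($connections)) {
        $connections = explode(',', $connections);
    }

    // Trim whitespace and drop empty entries such as a trailing comma.
    $trimmed = array_map('trim', $connections);

    return array_values(array_filter($trimmed, static function (string $c): bool {
        return $c !== '';
    }));
}

var_dump(normalizeConnections('localhost:8091,localhost:8092')
    === normalizeConnections(['localhost:8091', 'localhost:8092'])); // bool(true)
```

Whichever form you pick, keeping it consistent across brokers makes the config easier to scan.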
- Topics
  An array of topic configurations, such as the topic name, which broker connection to use, consumer groups and middlewares.
  Here we can specify the consumer groups. Each topic can have multiple groups, and each group holds the configuration for which handler, offset_reset (for setting the initial offset) and middleware it must use.
'topics' => [
    'price_update' => [
        'topic' => 'products.price.update',
        'broker' => 'price_brokers',
        'consumer_groups' => [
            'default' => [
                'offset_reset' => 'smallest',
                'handler' => '\App\Kafka\Consumers\PriceUpdateConsumer',
            ],
        ],
    ],
],
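Since each topic can hold several consumer groups, a second group can sit next to `default`. The following is a sketch under assumptions: the `audit` group and the `PriceAuditConsumer` handler are hypothetical names, and `largest` is assumed to be accepted here because it is the librdkafka counterpart of `smallest`.

```php
<?php

// Sketch of a `topics` entry with two consumer groups.
return [
    'topics' => [
        'price_update' => [
            'topic' => 'products.price.update',
            'broker' => 'price_brokers',
            'consumer_groups' => [
                // With no committed offset, start from the oldest available record.
                'default' => [
                    'offset_reset' => 'smallest',
                    'handler' => '\App\Kafka\Consumers\PriceUpdateConsumer',
                ],
                // Hypothetical second group: with no committed offset,
                // start from records produced from now on.
                'audit' => [
                    'offset_reset' => 'largest',
                    'handler' => '\App\Kafka\Consumers\PriceAuditConsumer',
                ],
            ],
        ],
    ],
];
```

Each group keeps its own offsets, so both handlers would see every record of the topic independently.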
After setting up the required configs, you need to create the consumer, which will handle all records received from the topic specified in the config.
Creating the consumer is as easy as running the following command:
$ php artisan make:kafka-consumer PriceUpdateConsumer
This will create a KafkaConsumer class inside the application, in the app/Kafka/Consumers/ directory.
There, you'll have a handle method, which receives every record from the topic. Methods for handling exceptions are also available.
<?php

namespace App\Kafka\Consumers;

use Metamorphosis\Record\RecordInterface;
use Metamorphosis\TopicHandler\Consumer\AbstractHandler;

class PriceUpdateConsumer extends AbstractHandler
{
    // Repository stands for your own application class; import it as needed.
    public $repository;

    /**
     * Create a new consumer topic handler instance.
     */
    public function __construct(Repository $repository)
    {
        $this->repository = $repository;
    }

    /**
     * Handle payload.
     */
    public function handle(RecordInterface $record): void
    {
        $product = $record->getPayload();
        $this->repository->update($product['id'], $product['price']);
    }
}
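The body of `handle` can be tried without a running broker. The sketch below is self-contained and uses hypothetical stand-ins (`FakeRecord`, `InMemoryRepository`, `handlePriceUpdate`) instead of the package's `RecordInterface` and your real repository. It only assumes, as in the consumer above, that `getPayload()` returns an array with `id` and `price` keys.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for a record; only getPayload() is mimicked.
final class FakeRecord
{
    /** @var array */
    private $payload;

    public function __construct(array $payload)
    {
        $this->payload = $payload;
    }

    public function getPayload(): array
    {
        return $this->payload;
    }
}

// Hypothetical repository that keeps prices in memory.
final class InMemoryRepository
{
    /** @var array<int, float> */
    public $prices = [];

    public function update(int $id, float $price): void
    {
        $this->prices[$id] = $price;
    }
}

// Same steps as PriceUpdateConsumer::handle(), written as a plain function.
function handlePriceUpdate(FakeRecord $record, InMemoryRepository $repository): void
{
    $product = $record->getPayload();
    $repository->update($product['id'], $product['price']);
}

$repository = new InMemoryRepository();
handlePriceUpdate(new FakeRecord(['id' => 88989898, 'price' => 18.99]), $repository);

var_dump($repository->prices[88989898]); // float(18.99)
```

Keeping the handler this thin, with the real work delegated to a repository, is what makes it easy to exercise in isolation.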
Now you just need to start consuming the topic.
The simplest way to see it working is by running the kafka:consume command along with the topic name declared in the topics config key:
$ php artisan kafka:consume price_update
This command runs in a while true loop, which means it will never stop running on its own.
Errors can still happen, so we strongly advise you to run this command under supervisor,
as in the example below:
[program:kafka-consumer-price-update]
process_name=%(program_name)s_%(process_num)02d
command=php /var/www/default/artisan kafka:consume price_update --timeout=-1
autostart=true
autorestart=true
user=root
numprocs=6
redirect_stderr=true
stdout_logfile=/var/log/default/kafka-consumer-price-update.log
That's it. For more information about usage, middlewares, broker authentication, consumer groups and other advanced topics, please have a look at our Advanced Usage Guide.
The producer also requires configuration: records are produced using the parameters specified in the config.
'brokers' => [
    'local-dev' => [
        'connections' => 'kafka:9092',
    ],
],
'topics' => [
    'product-updated' => [
        'topic_id' => 'product_updated',
        'broker' => 'local-dev',
    ],
],
Creating the producer handler.
The producer must extend the AbstractHandler class and can be empty.
<?php
use Metamorphosis\TopicHandler\Producer\AbstractHandler;
class ProductUpdated extends AbstractHandler
{
}
Creating the payload and producing a Kafka message.
The payload must be an array. This array can even store other arrays as values.
The second parameter indicates which Kafka topic will receive the message, and the third is the message key.
$record = ['name' => 'test', 'id' => 88989898, 'price' => 18.99];
$key = 88989898;
$producer = new ProductUpdated($record, 'product-updated', $key);
Metamorphosis::produce($producer);
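As a sketch of a payload that stores other arrays as values, the snippet below builds one and round-trips it through JSON. That the payload travels as JSON is an assumption made for this illustration, and the `stock` and `tags` fields are hypothetical.

```php
<?php

declare(strict_types=1);

// Payload with nested arrays as values (field names are illustrative).
$record = [
    'id' => 88989898,
    'name' => 'test',
    'price' => 18.99,
    'stock' => ['warehouse' => 'A1', 'quantity' => 12],
    'tags' => ['sale', 'new'],
];

// Assumption: the message body is sent as a JSON string.
$encoded = json_encode($record, JSON_THROW_ON_ERROR);

// Decoding as an associative array gives the original structure back.
$decoded = json_decode($encoded, true, 512, JSON_THROW_ON_ERROR);

var_dump($decoded === $record); // bool(true)
```

A consumer reading such a message would then reach nested values with plain array access, for example `$payload['stock']['quantity']`.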