
PHP Kafka client - php-rdkafka

Join the chat at https://gitter.im/arnaud-lb/php-rdkafka

Supported Kafka versions: 0.8, 0.9, 0.10

Supported PHP versions: 5.3 .. 7.x

PHP-rdkafka is a thin librdkafka binding providing a working PHP 5 / PHP 7 Kafka 0.8 / 0.9 / 0.10 client.

It supports the high-level and low-level consumers, producer, and metadata APIs.

The API resembles librdkafka's as closely as possible, and is fully documented here.

Table of Contents

  1. Installation
  2. Examples
  3. Usage
  4. Documentation
  5. Credits
  6. License

Installation

https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka.setup.html

Examples

https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka.examples.html

Usage

Producing

For producing, we first need to create a producer, and to add brokers (Kafka servers) to it:

<?php

$rk = new RdKafka\Producer();
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("10.0.0.1,10.0.0.2");

Next, we create a topic instance from the producer:

<?php

$topic = $rk->newTopic("test");

From there, we can produce as many messages as we want, using the produce method:

<?php

$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");

The first argument is the partition. RD_KAFKA_PARTITION_UA stands for unassigned, and lets librdkafka choose the partition.

The second argument is the message flags, which should currently always be 0.

The third argument is the message payload, which can be any string, including binary data.
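Because the payload is a plain string, structured data has to be serialized before it is produced. Below is a minimal sketch using JSON as one possible encoding; `encodePayload()` is a hypothetical helper, not part of php-rdkafka, and the `produce()` call is left as a comment because it needs the extension and the `$topic` created above.

```php
<?php
// Hypothetical helper (not part of php-rdkafka): turn structured data into a
// string payload. JSON is only one choice; any string encoding works.
function encodePayload(array $data)
{
    $json = json_encode($data);
    if ($json === false) {
        throw new InvalidArgumentException(json_last_error_msg());
    }
    return $json;
}

$payload = encodePayload(array('id' => 42, 'event' => 'signup'));

// With the extension loaded and $topic created as above, the string is sent as-is:
// $topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload);
```

On the consuming side, `json_decode($msg->payload, true)` would reverse the encoding.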

High-level consuming

The RdKafka\KafkaConsumer class supports automatic partition assignment/revocation. See the example here.
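A minimal sketch of a high-level consumer, assuming php-rdkafka 3.x built against librdkafka 0.9 or later, and Kafka 0.9 or later brokers. The function name `runHighLevelConsumer()` and its parameters are hypothetical; the properties it sets (`group.id`, `metadata.broker.list`, `auto.offset.reset`) are standard librdkafka configuration properties.

```php
<?php
// Sketch only: assumes php-rdkafka 3.x with librdkafka >= 0.9 and Kafka >= 0.9.
// runHighLevelConsumer() is a hypothetical name; it loops forever.
function runHighLevelConsumer($brokers, $groupId, array $topics)
{
    $conf = new RdKafka\Conf();
    // Consumers sharing a group.id split the topics' partitions between them.
    $conf->set('group.id', $groupId);
    $conf->set('metadata.broker.list', $brokers);

    // Where to start when the group has no committed offset yet.
    $topicConf = new RdKafka\TopicConf();
    $topicConf->set('auto.offset.reset', 'smallest');
    $conf->setDefaultTopicConf($topicConf);

    $consumer = new RdKafka\KafkaConsumer($conf);
    // Partitions are assigned and revoked automatically as group members come and go.
    $consumer->subscribe($topics);

    while (true) {
        // Timeout in milliseconds.
        $message = $consumer->consume(120 * 1000);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                echo $message->payload, "\n";
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                // Not fatal: end of a partition reached, or nothing arrived in time.
                break;
            default:
                throw new RuntimeException($message->errstr(), $message->err);
        }
    }
}
```

For example, `runHighLevelConsumer("10.0.0.1,10.0.0.2", "myConsumerGroup", array("test"));`, where the group name is a placeholder. Unlike the low-level consumer, `KafkaConsumer::consume()` in this version returns a message object even on timeout, with `err` set to `RD_KAFKA_RESP_ERR__TIMED_OUT`, which is why the switch handles that case.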

Low-level consuming

We first need to create a low-level consumer, and to add brokers (Kafka servers) to it:

<?php

$rk = new RdKafka\Consumer();
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("10.0.0.1,10.0.0.2");

Next, create a topic instance by calling the newTopic() method, and start consuming on partition 0:

<?php

$topic = $rk->newTopic("test");

// The first argument is the partition to consume from.
// The second argument is the offset at which to start consumption: either an
// absolute offset, or one of RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END,
// RD_KAFKA_OFFSET_STORED.
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);

Next, retrieve the consumed messages:

<?php

while (true) {
    // The first argument is the partition (again).
    // The second argument is the timeout, in milliseconds.
    $msg = $topic->consume(0, 1000);
    if (null === $msg) {
        // No message arrived within the timeout.
        continue;
    } elseif ($msg->err) {
        echo $msg->errstr(), "\n";
        break;
    } else {
        echo $msg->payload, "\n";
    }
}

Low-level consuming from multiple topics / partitions

Consuming from multiple topics and/or partitions can be done by telling librdkafka to forward all messages from these topics/partitions to an internal queue, and then consuming from this queue:

Creating the queue:

<?php
$queue = $rk->newQueue();

Adding topic partitions (topars) to the queue:

<?php

$topic1 = $rk->newTopic("topic1");
$topic1->consumeQueueStart(0, RD_KAFKA_OFFSET_BEGINNING, $queue);
$topic1->consumeQueueStart(1, RD_KAFKA_OFFSET_BEGINNING, $queue);

$topic2 = $rk->newTopic("topic2");
$topic2->consumeQueueStart(0, RD_KAFKA_OFFSET_BEGINNING, $queue);

Next, retrieve the consumed messages from the queue:

<?php

while (true) {
    // The only argument is the timeout, in milliseconds.
    $msg = $queue->consume(1000);
    if (null === $msg) {
        // No message arrived within the timeout.
        continue;
    } elseif ($msg->err) {
        echo $msg->errstr(), "\n";
        break;
    } else {
        echo $msg->payload, "\n";
    }
}
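When many topics and partitions feed the same queue, the `consumeQueueStart()` calls shown above can be driven from an array. A minimal sketch; `startQueueConsumption()` and the shape of its `$partitionsByTopic` argument are hypothetical, not part of php-rdkafka.

```php
<?php
// Hypothetical helper (not part of php-rdkafka): forward every listed
// topic/partition to a single queue. $partitionsByTopic maps a topic name
// to a list of partition ids. The topic objects are returned so the caller
// can keep them around, e.g. to call consumeStop() later.
function startQueueConsumption($rk, $queue, array $partitionsByTopic, $offset)
{
    $topics = array();
    foreach ($partitionsByTopic as $name => $partitions) {
        $topic = $rk->newTopic($name);
        foreach ($partitions as $partition) {
            $topic->consumeQueueStart($partition, $offset, $queue);
        }
        $topics[$name] = $topic;
    }
    return $topics;
}
```

Calling `startQueueConsumption($rk, $queue, array("topic1" => array(0, 1), "topic2" => array(0)), RD_KAFKA_OFFSET_BEGINNING)` would make the same three `consumeQueueStart()` calls as the example above.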

Using stored offsets

librdkafka can store offsets in a local file, or on the broker. The default is a local file, and as soon as you start using RD_KAFKA_OFFSET_STORED as the consuming offset, librdkafka starts storing offsets.

By default, the file is created in the current directory, with a name based on the topic and the partition. The directory can be changed by setting the offset.store.path configuration property.

Other interesting properties are: offset.store.sync.interval.ms, offset.store.method, auto.commit.interval.ms, auto.commit.enable, group.id.

<?php

$topicConf = new RdKafka\TopicConf();
$topicConf->set("auto.commit.interval.ms", "1000");
$topicConf->set("offset.store.sync.interval.ms", "60000");

$topic = $rk->newTopic("test", $topicConf);

$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
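Since offsets are written as files under offset.store.path, it may help to create that directory up front. A minimal sketch, assuming file-based offset storage and assuming librdkafka will not create a missing directory itself; `prepareOffsetStorePath()` and the `rdkafka-offsets` directory name are hypothetical.

```php
<?php
// Hypothetical helper: create the offset directory if it is missing and
// return its absolute path. Assumes file-based offset storage.
function prepareOffsetStorePath($dir)
{
    // The second is_dir() check covers another process creating it concurrently.
    if (!is_dir($dir) && !mkdir($dir, 0775, true) && !is_dir($dir)) {
        throw new RuntimeException("Cannot create offset directory: $dir");
    }
    return realpath($dir);
}

$offsetDir = prepareOffsetStorePath(sys_get_temp_dir() . '/rdkafka-offsets');
```

The resulting path would then be passed with `$topicConf->set("offset.store.path", $offsetDir);` before calling `newTopic()`.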

Interesting configuration parameters

queued.max.messages.kbytes

librdkafka will buffer up to 1GB of messages for each consumed partition by default. You can lower memory usage by reducing the value of the queued.max.messages.kbytes parameter on your consumers.
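Since the limit applies per consumed partition, the worst case grows with the number of partitions a process consumes. A back-of-the-envelope sketch; `maxPrefetchBytes()` is a hypothetical helper, the partition count of 12 is arbitrary, and the 1GB figure is taken as 1048576 KB.

```php
<?php
// Hypothetical helper: rough upper bound on memory used by librdkafka's
// per-partition pre-fetch queues (one queue per consumed partition).
function maxPrefetchBytes($partitions, $queuedMaxKbytes)
{
    return $partitions * $queuedMaxKbytes * 1024;
}

// 12 partitions at about 1GB each: 12884901888 bytes (12 GiB).
$withDefault = maxPrefetchBytes(12, 1048576);

// Lowering queued.max.messages.kbytes to 65536 (64 MiB per partition):
// 805306368 bytes (768 MiB).
$withLimit = maxPrefetchBytes(12, 65536);
```

The lower limit would be applied with `$conf->set("queued.max.messages.kbytes", "65536");` on the consumer's `RdKafka\Conf`.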

topic.metadata.refresh.sparse and topic.metadata.refresh.interval.ms

Each consumer and producer instance fetches topic metadata at an interval defined by the topic.metadata.refresh.interval.ms parameter. Depending on your librdkafka version, the parameter defaults to 10 seconds or 600 seconds.

librdkafka fetches the metadata for all topics of the cluster by default. Setting topic.metadata.refresh.sparse to the string "true" makes librdkafka fetch only the topics it uses.

Setting topic.metadata.refresh.sparse to "true", and topic.metadata.refresh.interval.ms to 600 seconds (plus some jitter) can reduce the bandwidth a lot, depending on the number of consumers and topics.
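A minimal sketch of that combination, with the jitter picked randomly per process so that many processes do not refresh in lockstep. The helper names and the 60-second jitter bound are assumptions made for illustration.

```php
<?php
// Hypothetical helper: sparse metadata fetching, refreshed every 600s plus up
// to 60s of random jitter.
function sparseMetadataSettings($baseMs = 600000, $maxJitterMs = 60000)
{
    return array(
        // The string "true", as described above.
        'topic.metadata.refresh.sparse' => 'true',
        'topic.metadata.refresh.interval.ms' => (string) ($baseMs + mt_rand(0, $maxJitterMs)),
    );
}

// Hypothetical helper: copy name => value pairs onto a conf object
// (an RdKafka\Conf in practice; anything with a set($name, $value) method works).
function applySettings($conf, array $settings)
{
    foreach ($settings as $name => $value) {
        $conf->set($name, $value);
    }
    return $conf;
}

$settings = sparseMetadataSettings();
```

With the extension loaded, `$conf = applySettings(new RdKafka\Conf(), $settings);` would give a configuration to pass to `new RdKafka\Producer($conf)` or `new RdKafka\Consumer($conf)`.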

internal.termination.signal

This setting allows librdkafka threads to terminate as soon as librdkafka is done with them. This effectively allows your PHP processes / requests to terminate quickly.

When enabling this, you have to mask the signal like this:

<?php
// once
pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
// any time
$conf->set('internal.termination.signal', SIGIO);

socket.blocking.max.ms

Maximum time a broker socket operation may block. A lower value improves responsiveness at the expense of slightly higher CPU usage.

Reducing the value of this setting improves shutdown speed. The value defines the maximum time librdkafka will block in one iteration of a read loop. This also defines how often the main librdkafka thread will check for termination.

queue.buffering.max.ms

This defines the maximum and default time librdkafka will wait before sending a batch of messages. Reducing this setting to e.g. 1ms ensures that messages are sent ASAP, instead of being batched.

This has been seen to reduce the shutdown time of the rdkafka instance, and of the PHP process / request.

Performance / Low-latency settings

Here is a configuration optimized for low latency. This allows a PHP process / request to send messages ASAP and to terminate quickly.

<?php

$conf = new \RdKafka\Conf();
$conf->set('socket.timeout.ms', 50); // or socket.blocking.max.ms, depending on librdkafka version
if (function_exists('pcntl_sigprocmask')) {
    pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
    $conf->set('internal.termination.signal', SIGIO);
} else {
    $conf->set('queue.buffering.max.ms', 1);
}

$producer = new \RdKafka\Producer($conf);
$consumer = new \RdKafka\Consumer($conf);

Polling after producing can also be important to reduce termination times:

<?php

$topic = $producer->newTopic("test");
$topic->produce(...);
while ($producer->getOutQLen() > 0) {
    $producer->poll(1);
}
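The loop above can keep a process alive for a long time if messages cannot be delivered, for example when no broker is reachable. A minimal sketch of a bounded variant; `flushWithDeadline()` is a hypothetical helper, not part of php-rdkafka, and the 50 ms poll interval is an arbitrary choice.

```php
<?php
// Hypothetical helper (not part of php-rdkafka): a bounded version of the
// polling loop. Polls until the outbound queue is empty or $timeoutMs has
// elapsed, and returns how many messages are still queued.
function flushWithDeadline($producer, $timeoutMs)
{
    $deadline = microtime(true) + $timeoutMs / 1000;
    while ($producer->getOutQLen() > 0 && microtime(true) < $deadline) {
        // poll() serves delivery reports and other queued callbacks.
        $producer->poll(50);
    }
    return $producer->getOutQLen();
}
```

For example, `$left = flushWithDeadline($producer, 10000);` waits roughly ten seconds at most; a non-zero result means some messages were probably still undelivered when the process gave up waiting.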

Documentation

https://arnaud-lb.github.io/php-rdkafka/phpdoc/book.rdkafka.html

Asking for Help

If the documentation is not enough, feel free to ask questions in the php-rdkafka channels on Gitter or Google Groups.

Stubs

Because IDEs cannot auto-discover the php-rdkafka API, you may want to use an external package that provides stubs for php-rdkafka classes, functions and constants: kwn/php-rdkafka-stubs

Contributing

If you would like to contribute, thank you :)

Before you start, please take a look at the CONTRIBUTING document to see how to get your changes merged in.

Credits

Documentation copied from librdkafka.

Authors: see contributors.

License

php-rdkafka is released under the MIT license.