This repository was archived by the owner on Jul 17, 2024. It is now read-only.
Merged
20 changes: 18 additions & 2 deletions .travis.yml
@@ -1,11 +1,27 @@
language: php

+sudo: required

php:
  - 5.6
+  - 7.0
+  - 7.1
+  - hhvm
+  - nightly

-before_script:
-  - composer install
+install:
+  - ./travis-install.sh
+  - travis_retry composer install --no-interaction

+matrix:
+  fast_finish: true
+  allow_failures:
+    - php: 7.1
+    - php: hhvm
+    - php: nightly

notifications:
slack:
secure: LCF/2QlcsU0V5HmR5gJx1/SAmoZQ39zxG3jrRVOFE6itPk4au8Aal6b1l6HSlhLYhzyv84pmobMhy/Cjm6lePZyE9QalMcsRzLBQ1oZzF7fVB+ypO3W7dG6V0CnGGtkGO4MsSTwMnQ9X/GEWxNBehcmo0kRnvQPGWtEFSHbAqV+86yq/lpfBW3sXuv3TV6mtCwzfaTkermlMC63i6p3rXwwLgf19kAJ//Gp7d8/eVnrq+CbyGOD6+pAHbCFWxEHr2o6P1SMS8mnPRsgBQ+qCNICWRrmb+8gOUUS5JgnPJSAWLI/n0Q+n8CkJTjIfRK+N352n+CitCRcq+76alT05ogW8CY4mJDp2Qn0nEs5h+6NGGIcwveF3kcXmBpVDz2N2J5zpjzk4mybXX8gilxJ1WnPVGRD0cpJSazmpaNV6Y7lRMM1LqvIiY1LNzFQdp1CJjb0n6MQPaSnUF1w/e2k/UfLh1ZGTDy1US0p7RtY1RibbEFVUbtneEENRTYWoVQt+coBF+DFdsDYX/HoEuTkWQeKGmHBrtSX3UEz7v50QkQkr4jvk151JFi7fqJZLazOTbP59g0WSEggTXfauVw/S14S1Ir2+MA/glAYZXzFneKyM9fKiVbxMFrpJNwyurM5ODE72iDKJ5ejkYZLe9EpTRYbW/1p5/1yOK+s49cqGP2w=
on_success: change
on_failure: always
11 changes: 10 additions & 1 deletion Dockerfile
@@ -9,8 +9,17 @@ RUN apt-get update \
    && DEBIAN_FRONTEND=noninteractive apt-get install -y \
        git \
        zlib1g-dev \
-        librdkafka-dev \
        unzip \
+        python \
+    && ( \
+        cd /tmp \
+        && mkdir librdkafka \
+        && cd librdkafka \
+        && git clone https://github.com/edenhill/librdkafka.git . \
+        && ./configure \
+        && make \
+        && make install \
+    ) \
    && rm -r /var/lib/apt/lists/*

# PHP Extensions
26 changes: 13 additions & 13 deletions README.md
@@ -44,24 +44,24 @@ A Kafka adapter for the [php-pubsub](https://github.com/Superbalist/php-pubsub)
## Usage

```php
-// use this topic config for both the producer and consumer
-$topicConfig = new \RdKafka\TopicConf();
-$topicConfig->set('auto.offset.reset', 'smallest');
-$topicConfig->set('auto.commit.interval.ms', 300);
+// create consumer
+$topicConf = new \RdKafka\TopicConf();
+$topicConf->set('auto.offset.reset', 'smallest');

+$conf = new \RdKafka\Conf();
+$conf->set('group.id', 'php-pubsub');
+$conf->set('metadata.broker.list', '127.0.0.1');
+$conf->set('enable.auto.commit', 'false');
+$conf->set('offset.store.method', 'broker');
+$conf->setDefaultTopicConf($topicConf);

+$consumer = new \RdKafka\KafkaConsumer($conf);

// create producer
$producer = new \RdKafka\Producer();
$producer->addBrokers('127.0.0.1');

-// create consumer
-// see https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka.examples-high-level-consumer.html
-$config = new \RdKafka\Conf();
-$config->set('group.id', 'php-pubsub');

-$consumer = new \RdKafka\Consumer($config);
-$consumer->addBrokers('127.0.0.1');

-$adapter = new \Superbalist\PubSub\Kafka\KafkaPubSubAdapter($producer, $consumer, $topicConfig);
+$adapter = new \Superbalist\PubSub\Kafka\KafkaPubSubAdapter($producer, $consumer);

// consume messages
// note: this is a blocking call
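For reference, the complete usage snippet after this change, assembled from the added lines above and the updated consumer example below (the broker address, group id, channel name and autoload path are simply the values those files use), would read roughly as follows:

```php
<?php

include __DIR__ . '/../vendor/autoload.php';

// create consumer
$topicConf = new \RdKafka\TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');

$conf = new \RdKafka\Conf();
$conf->set('group.id', 'php-pubsub');
$conf->set('metadata.broker.list', '127.0.0.1');
$conf->set('enable.auto.commit', 'false');
$conf->set('offset.store.method', 'broker');
$conf->setDefaultTopicConf($topicConf);

$consumer = new \RdKafka\KafkaConsumer($conf);

// create producer
$producer = new \RdKafka\Producer();
$producer->addBrokers('127.0.0.1');

$adapter = new \Superbalist\PubSub\Kafka\KafkaPubSubAdapter($producer, $consumer);

// consume messages (blocking call)
$adapter->subscribe('my_channel', function ($message) {
    var_dump($message);
});
```

The practical shift is from the low-level \RdKafka\Consumer plus a shared TopicConf to the high-level \RdKafka\KafkaConsumer configured via \RdKafka\Conf, with auto-commit disabled and offsets stored on the broker.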
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -8,7 +8,7 @@ services:
      - ./src:/opt/php-pubsub/src
      - ./examples:/opt/php-pubsub/examples
  kafka:
-    image: spotify/kafka
+    image: flozano/kafka
    environment:
      - ADVERTISED_HOST=HOSTIP
      - ADVERTISED_PORT=9092
26 changes: 13 additions & 13 deletions examples/KafkaConsumerExample.php
@@ -2,24 +2,24 @@

include __DIR__ . '/../vendor/autoload.php';

-// use this topic config for both the producer and consumer
-$topicConfig = new \RdKafka\TopicConf();
-$topicConfig->set('auto.offset.reset', 'smallest');
-$topicConfig->set('auto.commit.interval.ms', 300);
+// create consumer
+$topicConf = new \RdKafka\TopicConf();
+$topicConf->set('auto.offset.reset', 'smallest');

+$conf = new \RdKafka\Conf();
+$conf->set('group.id', 'php-pubsub');
+$conf->set('metadata.broker.list', 'kafka');
+$conf->set('enable.auto.commit', 'false');
+$conf->set('offset.store.method', 'broker');
+$conf->setDefaultTopicConf($topicConf);

+$consumer = new \RdKafka\KafkaConsumer($conf);

// create producer
$producer = new \RdKafka\Producer();
$producer->addBrokers('kafka');

-// create consumer
-// see https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka.examples-high-level-consumer.html
-$config = new \RdKafka\Conf();
-$config->set('group.id', 'php-pubsub');

-$consumer = new \RdKafka\Consumer($config);
-$consumer->addBrokers('kafka');

-$adapter = new \Superbalist\PubSub\Kafka\KafkaPubSubAdapter($producer, $consumer, $topicConfig);
+$adapter = new \Superbalist\PubSub\Kafka\KafkaPubSubAdapter($producer, $consumer);

$adapter->subscribe('my_channel', function ($message) {
    var_dump($message);
26 changes: 13 additions & 13 deletions examples/KafkaPublishExample.php
@@ -2,24 +2,24 @@

include __DIR__ . '/../vendor/autoload.php';

-// use this topic config for both the producer and consumer
-$topicConfig = new \RdKafka\TopicConf();
-$topicConfig->set('auto.offset.reset', 'smallest');
-$topicConfig->set('auto.commit.interval.ms', 300);
+// create consumer
+$topicConf = new \RdKafka\TopicConf();
+$topicConf->set('auto.offset.reset', 'smallest');

+$conf = new \RdKafka\Conf();
+$conf->set('group.id', 'php-pubsub');
+$conf->set('metadata.broker.list', 'kafka');
+$conf->set('enable.auto.commit', 'false');
+$conf->set('offset.store.method', 'broker');
+$conf->setDefaultTopicConf($topicConf);

+$consumer = new \RdKafka\KafkaConsumer($conf);

// create producer
$producer = new \RdKafka\Producer();
$producer->addBrokers('kafka');

-// create consumer
-// see https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka.examples-high-level-consumer.html
-$config = new \RdKafka\Conf();
-$config->set('group.id', 'php-pubsub');

-$consumer = new \RdKafka\Consumer($config);
-$consumer->addBrokers('kafka');

-$adapter = new \Superbalist\PubSub\Kafka\KafkaPubSubAdapter($producer, $consumer, $topicConfig);
+$adapter = new \Superbalist\PubSub\Kafka\KafkaPubSubAdapter($producer, $consumer);

$adapter->publish('my_channel', 'HELLO WORLD');
$adapter->publish('my_channel', json_encode(['hello' => 'world']));
75 changes: 12 additions & 63 deletions src/KafkaPubSubAdapter.php
@@ -13,37 +13,18 @@ class KafkaPubSubAdapter implements PubSubAdapterInterface
    protected $producer;

    /**
-     * @var \RdKafka\Consumer
+     * @var \RdKafka\KafkaConsumer
     */
    protected $consumer;

-    /**
-     * @var \RdKafka\TopicConf
-     */
-    protected $topicConfig;

-    /**
-     * @var mixed
-     */
-    protected $consumerOffset;

    /**
     * @param \RdKafka\Producer $producer
-     * @param \RdKafka\Consumer $consumer
-     * @param \RdKafka\TopicConf $topicConfig
-     * @param mixed $consumerOffset The offset at which to start consumption
-     *        (RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED)
+     * @param \RdKafka\KafkaConsumer $consumer
     */
-    public function __construct(
-        \RdKafka\Producer $producer,
-        \RdKafka\Consumer $consumer,
-        \RdKafka\TopicConf $topicConfig,
-        $consumerOffset = RD_KAFKA_OFFSET_END
-    ) {
+    public function __construct(\RdKafka\Producer $producer, \RdKafka\KafkaConsumer $consumer)
+    {
        $this->producer = $producer;
        $this->consumer = $consumer;
-        $this->topicConfig = $topicConfig;
-        $this->consumerOffset = $consumerOffset;
    }

    /**
@@ -59,45 +40,13 @@ public function getProducer()
    /**
     * Return the Kafka consumer.
     *
-     * @return \RdKafka\Consumer
+     * @return \RdKafka\KafkaConsumer
     */
    public function getConsumer()
    {
        return $this->consumer;
    }

-    /**
-     * Return the Kafka TopicConfig.
-     *
-     * @return \RdKafka\TopicConf
-     */
-    public function getTopicConfig()
-    {
-        return $this->topicConfig;
-    }

-    /**
-     * Return the Kafka consumer offset at which `subscribe()` calls begin consumption.
-     *
-     * @return mixed
-     */
-    public function getConsumerOffset()
-    {
-        return $this->consumerOffset;
-    }

-    /**
-     * Set the Kafka consumer offset at which `subscribe()` calls begin consumption.
-     *
-     * This can be one of `RD_KAFKA_OFFSET_BEGINNING`, `RD_KAFKA_OFFSET_END` or `RD_KAFKA_OFFSET_STORED`
-     *
-     * @param mixed $consumerOffset
-     */
-    public function setConsumerOffset($consumerOffset)
-    {
-        $this->consumerOffset = $consumerOffset;
-    }

    /**
     * Subscribe a handler to a channel.
     *
@@ -107,14 +56,12 @@ public function setConsumerOffset($consumerOffset)
     */
    public function subscribe($channel, callable $handler)
    {
-        $topic = $this->consumer->newTopic($channel, $this->topicConfig);

-        $topic->consumeStart(0, $this->consumerOffset);
+        $this->consumer->subscribe([$channel]);

        $isSubscriptionLoopActive = true;

        while ($isSubscriptionLoopActive) {
-            $message = $topic->consume(0, 1000);
+            $message = $this->consumer->consume(300);

            if ($message === null) {
                continue;
@@ -126,10 +73,12 @@

                    if ($payload === 'unsubscribe') {
                        $isSubscriptionLoopActive = false;
+                        break;
-                    } else {
-                        call_user_func($handler, $payload);
                    }

+                    call_user_func($handler, $payload);
+                    $this->consumer->commitAsync($message);

                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
@@ -148,7 +97,7 @@
     */
    public function publish($channel, $message)
    {
-        $topic = $this->producer->newTopic($channel, $this->topicConfig);
+        $topic = $this->producer->newTopic($channel);
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, Utils::serializeMessage($message));
    }
}
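Given an $adapter wired up as in the README snippet above, here is a minimal sketch of how the new subscribe loop is typically driven. It assumes, as the 'unsubscribe' check in subscribe() implies, that Utils::serializeMessage() and Utils::unserializeMessagePayload() round-trip a plain string; the channel name is just the one used in the examples.

```php
<?php

// Worker process: blocks, handling each message and committing it via
// commitAsync(), so a restarted worker can resume from the broker-stored
// offset (offset.store.method is set to 'broker' in the examples).
$adapter->subscribe('my_channel', function ($message) {
    var_dump($message);
});

// Elsewhere: publishing the literal string 'unsubscribe' to the same channel
// sets $isSubscriptionLoopActive to false and ends the worker's loop.
$adapter->publish('my_channel', 'unsubscribe');
```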