diff --git a/.travis.yml b/.travis.yml
index 3936662..8dfdb56 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,5 +1,7 @@
 language: php
 
+sudo: required
+
 php:
   - 5.6
   - 7.0
@@ -7,5 +9,19 @@ php:
   - hhvm
   - nightly
 
-before_script:
-  - composer install
\ No newline at end of file
+install:
+  - ./travis-install.sh
+  - travis_retry composer install --no-interaction
+
+matrix:
+  fast_finish: true
+  allow_failures:
+    - php: 7.1
+    - php: hhvm
+    - php: nightly
+
+notifications:
+  slack:
+    secure: LCF/2QlcsU0V5HmR5gJx1/SAmoZQ39zxG3jrRVOFE6itPk4au8Aal6b1l6HSlhLYhzyv84pmobMhy/Cjm6lePZyE9QalMcsRzLBQ1oZzF7fVB+ypO3W7dG6V0CnGGtkGO4MsSTwMnQ9X/GEWxNBehcmo0kRnvQPGWtEFSHbAqV+86yq/lpfBW3sXuv3TV6mtCwzfaTkermlMC63i6p3rXwwLgf19kAJ//Gp7d8/eVnrq+CbyGOD6+pAHbCFWxEHr2o6P1SMS8mnPRsgBQ+qCNICWRrmb+8gOUUS5JgnPJSAWLI/n0Q+n8CkJTjIfRK+N352n+CitCRcq+76alT05ogW8CY4mJDp2Qn0nEs5h+6NGGIcwveF3kcXmBpVDz2N2J5zpjzk4mybXX8gilxJ1WnPVGRD0cpJSazmpaNV6Y7lRMM1LqvIiY1LNzFQdp1CJjb0n6MQPaSnUF1w/e2k/UfLh1ZGTDy1US0p7RtY1RibbEFVUbtneEENRTYWoVQt+coBF+DFdsDYX/HoEuTkWQeKGmHBrtSX3UEz7v50QkQkr4jvk151JFi7fqJZLazOTbP59g0WSEggTXfauVw/S14S1Ir2+MA/glAYZXzFneKyM9fKiVbxMFrpJNwyurM5ODE72iDKJ5ejkYZLe9EpTRYbW/1p5/1yOK+s49cqGP2w=
+    on_success: change
+    on_failure: always
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
index 2eab506..54f1a6b 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -9,8 +9,17 @@ RUN apt-get update \
     && DEBIAN_FRONTEND=noninteractive apt-get install -y \
         git \
         zlib1g-dev \
-        librdkafka-dev \
         unzip \
+        python \
+    && ( \
+        cd /tmp \
+        && mkdir librdkafka \
+        && cd librdkafka \
+        && git clone https://github.com/edenhill/librdkafka.git . \
+        && ./configure \
+        && make \
+        && make install \
+    ) \
     && rm -r /var/lib/apt/lists/*
 
 # PHP Extensions
diff --git a/README.md b/README.md
index 35acfdc..f0f4459 100644
--- a/README.md
+++ b/README.md
@@ -44,24 +44,24 @@ A Kafka adapter for the [php-pubsub](https://github.com/Superbalist/php-pubsub)
 ## Usage
 
 ```php
-// use this topic config for both the producer and consumer
-$topicConfig = new \RdKafka\TopicConf();
-$topicConfig->set('auto.offset.reset', 'smallest');
-$topicConfig->set('auto.commit.interval.ms', 300);
+// create consumer
+$topicConf = new \RdKafka\TopicConf();
+$topicConf->set('auto.offset.reset', 'smallest');
+
+$conf = new \RdKafka\Conf();
+$conf->set('group.id', 'php-pubsub');
+$conf->set('metadata.broker.list', '127.0.0.1');
+$conf->set('enable.auto.commit', 'false');
+$conf->set('offset.store.method', 'broker');
+$conf->setDefaultTopicConf($topicConf);
+
+$consumer = new \RdKafka\KafkaConsumer($conf);
 
 // create producer
 $producer = new \RdKafka\Producer();
 $producer->addBrokers('127.0.0.1');
 
-// create consumer
-// see https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka.examples-high-level-consumer.html
-$config = new \RdKafka\Conf();
-$config->set('group.id', 'php-pubsub');
-
-$consumer = new \RdKafka\Consumer($config);
-$consumer->addBrokers('127.0.0.1');
-
-$adapter = new \Superbalist\PubSub\Kafka\KafkaPubSubAdapter($producer, $consumer, $topicConfig);
+$adapter = new \Superbalist\PubSub\Kafka\KafkaPubSubAdapter($producer, $consumer);
 
 // consume messages
 // note: this is a blocking call
diff --git a/docker-compose.yml b/docker-compose.yml
index 1859116..0e7b90c 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -8,7 +8,7 @@ services:
       - ./src:/opt/php-pubsub/src
       - ./examples:/opt/php-pubsub/examples
   kafka:
-    image: spotify/kafka
+    image: flozano/kafka
     environment:
      - ADVERTISED_HOST=HOSTIP
      - ADVERTISED_PORT=9092
diff --git a/examples/KafkaConsumerExample.php b/examples/KafkaConsumerExample.php
index ab0ba9a..57027c8 100644
--- a/examples/KafkaConsumerExample.php
+++ b/examples/KafkaConsumerExample.php
@@ -2,24 +2,24 @@
 
 include __DIR__ . '/../vendor/autoload.php';
 
-// use this topic config for both the producer and consumer
-$topicConfig = new \RdKafka\TopicConf();
-$topicConfig->set('auto.offset.reset', 'smallest');
-$topicConfig->set('auto.commit.interval.ms', 300);
+// create consumer
+$topicConf = new \RdKafka\TopicConf();
+$topicConf->set('auto.offset.reset', 'smallest');
+
+$conf = new \RdKafka\Conf();
+$conf->set('group.id', 'php-pubsub');
+$conf->set('metadata.broker.list', 'kafka');
+$conf->set('enable.auto.commit', 'false');
+$conf->set('offset.store.method', 'broker');
+$conf->setDefaultTopicConf($topicConf);
+
+$consumer = new \RdKafka\KafkaConsumer($conf);
 
 // create producer
 $producer = new \RdKafka\Producer();
 $producer->addBrokers('kafka');
 
-// create consumer
-// see https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka.examples-high-level-consumer.html
-$config = new \RdKafka\Conf();
-$config->set('group.id', 'php-pubsub');
-
-$consumer = new \RdKafka\Consumer($config);
-$consumer->addBrokers('kafka');
-
-$adapter = new \Superbalist\PubSub\Kafka\KafkaPubSubAdapter($producer, $consumer, $topicConfig);
+$adapter = new \Superbalist\PubSub\Kafka\KafkaPubSubAdapter($producer, $consumer);
 
 $adapter->subscribe('my_channel', function ($message) {
     var_dump($message);
diff --git a/examples/KafkaPublishExample.php b/examples/KafkaPublishExample.php
index 105bc12..82c48cc 100644
--- a/examples/KafkaPublishExample.php
+++ b/examples/KafkaPublishExample.php
@@ -2,24 +2,24 @@
 
 include __DIR__ . '/../vendor/autoload.php';
 
-// use this topic config for both the producer and consumer
-$topicConfig = new \RdKafka\TopicConf();
-$topicConfig->set('auto.offset.reset', 'smallest');
-$topicConfig->set('auto.commit.interval.ms', 300);
+// create consumer
+$topicConf = new \RdKafka\TopicConf();
+$topicConf->set('auto.offset.reset', 'smallest');
+
+$conf = new \RdKafka\Conf();
+$conf->set('group.id', 'php-pubsub');
+$conf->set('metadata.broker.list', 'kafka');
+$conf->set('enable.auto.commit', 'false');
+$conf->set('offset.store.method', 'broker');
+$conf->setDefaultTopicConf($topicConf);
+
+$consumer = new \RdKafka\KafkaConsumer($conf);
 
 // create producer
 $producer = new \RdKafka\Producer();
 $producer->addBrokers('kafka');
 
-// create consumer
-// see https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka.examples-high-level-consumer.html
-$config = new \RdKafka\Conf();
-$config->set('group.id', 'php-pubsub');
-
-$consumer = new \RdKafka\Consumer($config);
-$consumer->addBrokers('kafka');
-
-$adapter = new \Superbalist\PubSub\Kafka\KafkaPubSubAdapter($producer, $consumer, $topicConfig);
+$adapter = new \Superbalist\PubSub\Kafka\KafkaPubSubAdapter($producer, $consumer);
 
 $adapter->publish('my_channel', 'HELLO WORLD');
 $adapter->publish('my_channel', json_encode(['hello' => 'world']));
diff --git a/src/KafkaPubSubAdapter.php b/src/KafkaPubSubAdapter.php
index a7f98ef..b69041f 100644
--- a/src/KafkaPubSubAdapter.php
+++ b/src/KafkaPubSubAdapter.php
@@ -13,37 +13,18 @@ class KafkaPubSubAdapter implements PubSubAdapterInterface
     protected $producer;
 
     /**
-     * @var \RdKafka\Consumer
+     * @var \RdKafka\KafkaConsumer
      */
     protected $consumer;
 
-    /**
-     * @var \RdKafka\TopicConf
-     */
-    protected $topicConfig;
-
-    /**
-     * @var mixed
-     */
-    protected $consumerOffset;
-
     /**
      * @param \RdKafka\Producer $producer
-     * @param \RdKafka\Consumer $consumer
-     * @param \RdKafka\TopicConf $topicConfig
-     * @param mixed $consumerOffset The offset at which to start consumption
-     *        (RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED)
+     * @param \RdKafka\KafkaConsumer $consumer
      */
-    public function __construct(
-        \RdKafka\Producer $producer,
-        \RdKafka\Consumer $consumer,
-        \RdKafka\TopicConf $topicConfig,
-        $consumerOffset = RD_KAFKA_OFFSET_END
-    ) {
+    public function __construct(\RdKafka\Producer $producer, \RdKafka\KafkaConsumer $consumer)
+    {
         $this->producer = $producer;
         $this->consumer = $consumer;
-        $this->topicConfig = $topicConfig;
-        $this->consumerOffset = $consumerOffset;
     }
 
     /**
@@ -59,45 +40,13 @@ public function getProducer()
     /**
      * Return the Kafka consumer.
      *
-     * @return \RdKafka\Consumer
+     * @return \RdKafka\KafkaConsumer
      */
     public function getConsumer()
     {
         return $this->consumer;
     }
 
-    /**
-     * Return the Kafka TopicConfig.
-     *
-     * @return \RdKafka\TopicConf
-     */
-    public function getTopicConfig()
-    {
-        return $this->topicConfig;
-    }
-
-    /**
-     * Return the Kafka consumer offset at which `subscribe()` calls begin consumption.
-     *
-     * @return mixed
-     */
-    public function getConsumerOffset()
-    {
-        return $this->consumerOffset;
-    }
-
-    /**
-     * Set the Kafka consumer offset at which `subscribe()` calls begin consumption.
-     *
-     * This can be one of `RD_KAFKA_OFFSET_BEGINNING`, `RD_KAFKA_OFFSET_END` or `RD_KAFKA_OFFSET_STORED`
-     *
-     * @param mixed $consumerOffset
-     */
-    public function setConsumerOffset($consumerOffset)
-    {
-        $this->consumerOffset = $consumerOffset;
-    }
-
     /**
      * Subscribe a handler to a channel.
      *
@@ -107,14 +56,12 @@ public function setConsumerOffset($consumerOffset)
      */
     public function subscribe($channel, callable $handler)
     {
-        $topic = $this->consumer->newTopic($channel, $this->topicConfig);
-
-        $topic->consumeStart(0, $this->consumerOffset);
+        $this->consumer->subscribe([$channel]);
 
         $isSubscriptionLoopActive = true;
 
         while ($isSubscriptionLoopActive) {
-            $message = $topic->consume(0, 1000);
+            $message = $this->consumer->consume(300);
 
             if ($message === null) {
                 continue;
@@ -126,10 +73,12 @@ public function subscribe($channel, callable $handler)
 
                     if ($payload === 'unsubscribe') {
                         $isSubscriptionLoopActive = false;
-                        break;
+                    } else {
+                        call_user_func($handler, $payload);
                     }
 
-                    call_user_func($handler, $payload);
+                    $this->consumer->commitAsync($message);
+
                     break;
                 case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                 case RD_KAFKA_RESP_ERR__TIMED_OUT:
@@ -148,7 +97,7 @@ public function subscribe($channel, callable $handler)
      */
     public function publish($channel, $message)
     {
-        $topic = $this->producer->newTopic($channel, $this->topicConfig);
+        $topic = $this->producer->newTopic($channel);
 
         $topic->produce(RD_KAFKA_PARTITION_UA, 0, Utils::serializeMessage($message));
     }
 }
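The reworked `subscribe()` above blocks on `KafkaConsumer::consume(300)`, hands each successfully received payload to the handler, commits the message with `commitAsync()`, and exits its loop when a payload equal to the string `unsubscribe` arrives. A subscriber can therefore be stopped remotely by publishing that sentinel. The following is a minimal sketch, not part of the patch, assuming the producer/consumer wiring from the README diff above; the broker address, channel name and autoload path are illustrative:

```php
<?php

include __DIR__ . '/vendor/autoload.php';

// Build the adapter exactly as in the README above; '127.0.0.1' is illustrative.
$topicConf = new \RdKafka\TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');

$conf = new \RdKafka\Conf();
$conf->set('group.id', 'php-pubsub');
$conf->set('metadata.broker.list', '127.0.0.1');
$conf->set('enable.auto.commit', 'false');
$conf->set('offset.store.method', 'broker');
$conf->setDefaultTopicConf($topicConf);

$consumer = new \RdKafka\KafkaConsumer($conf);

$producer = new \RdKafka\Producer();
$producer->addBrokers('127.0.0.1');

$adapter = new \Superbalist\PubSub\Kafka\KafkaPubSubAdapter($producer, $consumer);

// Tell a subscriber that is blocked in $adapter->subscribe('my_channel', ...)
// to stop: the adapter treats the payload 'unsubscribe' as the exit sentinel
// and commits it like any other message.
$adapter->publish('my_channel', 'unsubscribe');
```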
diff --git a/tests/KafkaPubSubAdapterTest.php b/tests/KafkaPubSubAdapterTest.php
index 11dcc43..0672f80 100644
--- a/tests/KafkaPubSubAdapterTest.php
+++ b/tests/KafkaPubSubAdapterTest.php
@@ -8,10 +8,6 @@
 use Tests\Mocks\MockKafkaErrorMessage;
 
 if (!extension_loaded('rdkafka')) {
-    define('RD_KAFKA_OFFSET_BEGINNING', 0);
-    define('RD_KAFKA_OFFSET_END', 1);
-    define('RD_KAFKA_OFFSET_STORED', 2);
-
     define('RD_KAFKA_PARTITION_UA', 0);
 
     define('RD_KAFKA_RESP_ERR_NO_ERROR', 0);
@@ -24,95 +20,44 @@ class KafkaPubSubAdapterTest extends TestCase
     public function testGetProducer()
     {
         $producer = Mockery::mock(\RdKafka\Producer::class);
-        $consumer = Mockery::mock(\RdKafka\Consumer::class);
-        $topicConfig = Mockery::mock(\RdKafka\TopicConf::class);
-        $adapter = new KafkaPubSubAdapter($producer, $consumer, $topicConfig);
+        $consumer = Mockery::mock(\RdKafka\KafkaConsumer::class);
+        $adapter = new KafkaPubSubAdapter($producer, $consumer);
 
         $this->assertSame($producer, $adapter->getProducer());
     }
 
     public function testGetConsumer()
     {
         $producer = Mockery::mock(\RdKafka\Producer::class);
-        $consumer = Mockery::mock(\RdKafka\Consumer::class);
-        $topicConfig = Mockery::mock(\RdKafka\TopicConf::class);
-        $adapter = new KafkaPubSubAdapter($producer, $consumer, $topicConfig);
+        $consumer = Mockery::mock(\RdKafka\KafkaConsumer::class);
+        $adapter = new KafkaPubSubAdapter($producer, $consumer);
 
         $this->assertSame($consumer, $adapter->getConsumer());
     }
 
-    public function testGetTopicConfig()
-    {
-        $producer = Mockery::mock(\RdKafka\Producer::class);
-        $consumer = Mockery::mock(\RdKafka\Consumer::class);
-        $topicConfig = Mockery::mock(\RdKafka\TopicConf::class);
-        $adapter = new KafkaPubSubAdapter($producer, $consumer, $topicConfig);
-        $this->assertSame($topicConfig, $adapter->getTopicConfig());
-    }
-
-    public function testGetConsumerOffsetDefaultIsEnd()
-    {
-        $producer = Mockery::mock(\RdKafka\Producer::class);
-        $consumer = Mockery::mock(\RdKafka\Consumer::class);
-        $topicConfig = Mockery::mock(\RdKafka\TopicConf::class);
-        $adapter = new KafkaPubSubAdapter($producer, $consumer, $topicConfig);
-        $this->assertEquals(RD_KAFKA_OFFSET_END, $adapter->getConsumerOffset());
-    }
-
-    public function testGetSetConsumerOffset()
-    {
-        $producer = Mockery::mock(\RdKafka\Producer::class);
-        $consumer = Mockery::mock(\RdKafka\Consumer::class);
-        $topicConfig = Mockery::mock(\RdKafka\TopicConf::class);
-        $adapter = new KafkaPubSubAdapter($producer, $consumer, $topicConfig, RD_KAFKA_OFFSET_STORED);
-        $this->assertSame(RD_KAFKA_OFFSET_STORED, $adapter->getConsumerOffset());
-
-        $adapter->setConsumerOffset(RD_KAFKA_OFFSET_BEGINNING);
-        $this->assertEquals(RD_KAFKA_OFFSET_BEGINNING, $adapter->getConsumerOffset());
-    }
-
     public function testSubscribeWithNullMessage()
     {
         $producer = Mockery::mock(\RdKafka\Producer::class);
 
-        $topic = Mockery::mock(\RdKafka\Topic::class);
-        $topic->shouldReceive('consumeStart')
-            ->withArgs([
-                0,
-                RD_KAFKA_OFFSET_END
-            ])
-            ->once();
+        $consumer = Mockery::mock(\RdKafka\KafkaConsumer::class);
 
-        $topic->shouldReceive('consume')
-            ->withArgs([
-                0,
-                1000
-            ])
-            ->andReturnNull()
+        $consumer->shouldReceive('subscribe')
+            ->with(['channel_name'])
             ->once();
 
         // we need this to kill the infinite loop so the test can finish
         $unsubscribeMessage = new \stdClass();
         $unsubscribeMessage->err = RD_KAFKA_RESP_ERR_NO_ERROR;
         $unsubscribeMessage->payload = 'unsubscribe';
 
-        $topic->shouldReceive('consume')
-            ->withArgs([
-                0,
-                1000
-            ])
-            ->andReturn($unsubscribeMessage)
-            ->once();
-
-        $topicConfig = Mockery::mock(\RdKafka\TopicConf::class);
-        $consumer = Mockery::mock(\RdKafka\Consumer::class);
-        $consumer->shouldReceive('newTopic')
-            ->withArgs([
-                'channel_name',
-                $topicConfig
-            ])
+        $consumer->shouldReceive('consume')
+            ->with(300)
            ->once()
-            ->andReturn($topic);
+            ->andReturn($unsubscribeMessage);
+
+        $consumer->shouldReceive('commitAsync')
+            ->with($unsubscribeMessage)
+            ->once();
 
-        $adapter = new KafkaPubSubAdapter($producer, $consumer, $topicConfig);
+        $adapter = new KafkaPubSubAdapter($producer, $consumer);
 
         $handler1 = Mockery::mock(\stdClass::class);
         $handler1->shouldNotReceive('handle');
@@ -124,49 +69,39 @@ public function testSubscribeWithPartitionEofErrorCode()
     {
         $producer = Mockery::mock(\RdKafka\Producer::class);
 
-        $topic = Mockery::mock(\RdKafka\Topic::class);
-        $topic->shouldReceive('consumeStart')
-            ->withArgs([
-                0,
-                RD_KAFKA_OFFSET_END
-            ])
+        $consumer = Mockery::mock(\RdKafka\KafkaConsumer::class);
+
+        $consumer->shouldReceive('subscribe')
+            ->with(['channel_name'])
             ->once();
 
         $message = new \stdClass();
         $message->err = RD_KAFKA_RESP_ERR__PARTITION_EOF;
         $message->payload = null;
-        $topic->shouldReceive('consume')
-            ->withArgs([
-                0,
-                1000
-            ])
-            ->andReturn($message)
-            ->once();
+
+        $consumer->shouldReceive('consume')
+            ->with(300)
+            ->once()
+            ->andReturn($message);
+
+        $consumer->shouldNotReceive('commitAsync')
+            ->with($message);
 
         // we need this to kill the infinite loop so the test can finish
         $unsubscribeMessage = new \stdClass();
         $unsubscribeMessage->err = RD_KAFKA_RESP_ERR_NO_ERROR;
         $unsubscribeMessage->payload = 'unsubscribe';
 
-        $topic->shouldReceive('consume')
-            ->withArgs([
-                0,
-                1000
-            ])
-            ->andReturn($unsubscribeMessage)
-            ->once();
-
-        $topicConfig = Mockery::mock(\RdKafka\TopicConf::class);
-        $consumer = Mockery::mock(\RdKafka\Consumer::class);
-        $consumer->shouldReceive('newTopic')
-            ->withArgs([
-                'channel_name',
-                $topicConfig
-            ])
+        $consumer->shouldReceive('consume')
+            ->with(300)
             ->once()
-            ->andReturn($topic);
+            ->andReturn($unsubscribeMessage);
 
-        $adapter = new KafkaPubSubAdapter($producer, $consumer, $topicConfig);
+        $consumer->shouldReceive('commitAsync')
+            ->with($unsubscribeMessage)
+            ->once();
+
+        $adapter = new KafkaPubSubAdapter($producer, $consumer);
 
         $handler1 = Mockery::mock(\stdClass::class);
         $handler1->shouldNotReceive('handle');
@@ -178,49 +113,39 @@ public function testSubscribeWithTimedOutErrorCode()
     {
         $producer = Mockery::mock(\RdKafka\Producer::class);
 
-        $topic = Mockery::mock(\RdKafka\Topic::class);
-        $topic->shouldReceive('consumeStart')
-            ->withArgs([
-                0,
-                RD_KAFKA_OFFSET_END
-            ])
+        $consumer = Mockery::mock(\RdKafka\KafkaConsumer::class);
+
+        $consumer->shouldReceive('subscribe')
+            ->with(['channel_name'])
             ->once();
 
         $message = new \stdClass();
         $message->err = RD_KAFKA_RESP_ERR__TIMED_OUT;
         $message->payload = null;
-        $topic->shouldReceive('consume')
-            ->withArgs([
-                0,
-                1000
-            ])
-            ->andReturn($message)
-            ->once();
+
+        $consumer->shouldReceive('consume')
+            ->with(300)
+            ->once()
+            ->andReturn($message);
+
+        $consumer->shouldNotReceive('commitAsync')
+            ->with($message);
 
         // we need this to kill the infinite loop so the test can finish
         $unsubscribeMessage = new \stdClass();
         $unsubscribeMessage->err = RD_KAFKA_RESP_ERR_NO_ERROR;
         $unsubscribeMessage->payload = 'unsubscribe';
 
-        $topic->shouldReceive('consume')
-            ->withArgs([
-                0,
-                1000
-            ])
-            ->andReturn($unsubscribeMessage)
-            ->once();
-        $topicConfig = Mockery::mock(\RdKafka\TopicConf::class);
-
-        $consumer = Mockery::mock(\RdKafka\Consumer::class);
-        $consumer->shouldReceive('newTopic')
-            ->withArgs([
-                'channel_name',
-                $topicConfig
-            ])
+        $consumer->shouldReceive('consume')
+            ->with(300)
             ->once()
-            ->andReturn($topic);
+            ->andReturn($unsubscribeMessage);
 
-        $adapter = new KafkaPubSubAdapter($producer, $consumer, $topicConfig);
+        $consumer->shouldReceive('commitAsync')
+            ->with($unsubscribeMessage)
+            ->once();
+
+        $adapter = new KafkaPubSubAdapter($producer, $consumer);
 
         $handler1 = Mockery::mock(\stdClass::class);
         $handler1->shouldNotReceive('handle');
@@ -232,49 +157,40 @@ public function testSubscribeWithMessagePayload()
     {
         $producer = Mockery::mock(\RdKafka\Producer::class);
 
-        $topic = Mockery::mock(\RdKafka\Topic::class);
-        $topic->shouldReceive('consumeStart')
-            ->withArgs([
-                0,
-                RD_KAFKA_OFFSET_END
-            ])
+        $consumer = Mockery::mock(\RdKafka\KafkaConsumer::class);
+
+        $consumer->shouldReceive('subscribe')
+            ->with(['channel_name'])
             ->once();
 
         $message = new \stdClass();
         $message->err = RD_KAFKA_RESP_ERR_NO_ERROR;
         $message->payload = 'a:1:{s:5:"hello";s:5:"world";}';
-        $topic->shouldReceive('consume')
-            ->withArgs([
-                0,
-                1000
-            ])
-            ->andReturn($message)
+
+        $consumer->shouldReceive('consume')
+            ->with(300)
+            ->once()
+            ->andReturn($message);
+
+        $consumer->shouldReceive('commitAsync')
+            ->with($message)
             ->once();
 
         // we need this to kill the infinite loop so the test can finish
         $unsubscribeMessage = new \stdClass();
         $unsubscribeMessage->err = RD_KAFKA_RESP_ERR_NO_ERROR;
         $unsubscribeMessage->payload = 'unsubscribe';
 
-        $topic->shouldReceive('consume')
-            ->withArgs([
-                0,
-                1000
-            ])
-            ->andReturn($unsubscribeMessage)
-            ->once();
-        $topicConfig = Mockery::mock(\RdKafka\TopicConf::class);
-
-        $consumer = Mockery::mock(\RdKafka\Consumer::class);
-        $consumer->shouldReceive('newTopic')
-            ->withArgs([
-                'channel_name',
-                $topicConfig
-            ])
+        $consumer->shouldReceive('consume')
+            ->with(300)
             ->once()
-            ->andReturn($topic);
+            ->andReturn($unsubscribeMessage);
 
-        $adapter = new KafkaPubSubAdapter($producer, $consumer, $topicConfig);
+        $consumer->shouldReceive('commitAsync')
+            ->with($unsubscribeMessage)
+            ->once();
+
+        $adapter = new KafkaPubSubAdapter($producer, $consumer);
 
         $handler1 = Mockery::mock(\stdClass::class);
         $handler1->shouldReceive('handle')
@@ -288,34 +204,23 @@ public function testSubscribeWithErrorThrowsException()
     {
         $producer = Mockery::mock(\RdKafka\Producer::class);
 
-        $topic = Mockery::mock(\RdKafka\Topic::class);
-        $topic->shouldReceive('consumeStart')
-            ->withArgs([
-                0,
-                RD_KAFKA_OFFSET_END
-            ])
-            ->once();
+        $consumer = Mockery::mock(\RdKafka\KafkaConsumer::class);
 
-        $topic->shouldReceive('consume')
-            ->withArgs([
-                0,
-                1000
-            ])
-            ->andReturn(new MockKafkaErrorMessage())
+        $consumer->shouldReceive('subscribe')
+            ->with(['channel_name'])
             ->once();
 
-        $topicConfig = Mockery::mock(\RdKafka\TopicConf::class);
+        $message = new MockKafkaErrorMessage();
 
-        $consumer = Mockery::mock(\RdKafka\Consumer::class);
-        $consumer->shouldReceive('newTopic')
-            ->withArgs([
-                'channel_name',
-                $topicConfig
-            ])
+        $consumer->shouldReceive('consume')
+            ->with(300)
             ->once()
-            ->andReturn($topic);
+            ->andReturn($message);
+
+        $consumer->shouldNotReceive('commitAsync')
+            ->with($message);
 
-        $adapter = new KafkaPubSubAdapter($producer, $consumer, $topicConfig);
+        $adapter = new KafkaPubSubAdapter($producer, $consumer);
 
         $handler1 = Mockery::mock(\stdClass::class);
         $handler1->shouldNotReceive('handle');
@@ -338,20 +243,15 @@ public function testPublish()
             ])
             ->once();
 
-        $topicConfig = Mockery::mock(\RdKafka\TopicConf::class);
-
         $producer = Mockery::mock(\RdKafka\Producer::class);
         $producer->shouldReceive('newTopic')
-            ->withArgs([
-                'channel_name',
-                $topicConfig
-            ])
+            ->with('channel_name')
             ->once()
             ->andReturn($topic);
 
-        $consumer = Mockery::mock(\RdKafka\Consumer::class);
+        $consumer = Mockery::mock(\RdKafka\KafkaConsumer::class);
 
-        $adapter = new KafkaPubSubAdapter($producer, $consumer, $topicConfig);
+        $adapter = new KafkaPubSubAdapter($producer, $consumer);
 
         $adapter->publish('channel_name', ['hello' => 'world']);
     }
diff --git a/travis-install.sh b/travis-install.sh
new file mode 100755
index 0000000..a2e028b
--- /dev/null
+++ b/travis-install.sh
@@ -0,0 +1,28 @@
+#!/bin/bash
+
+set -e
+
+cd /tmp \
+    && mkdir librdkafka \
+    && cd librdkafka \
+    && git clone https://github.com/edenhill/librdkafka.git . \
+    && ./configure \
+    && make \
+    && sudo make install
+
+if [[ "$TRAVIS_PHP_VERSION" =~ ^7.* ]]
+then
+    cd /tmp \
+        && mkdir php-rdkafka \
+        && cd php-rdkafka \
+        && git clone https://github.com/arnaud-lb/php-rdkafka.git . \
+        && git checkout php7 \
+        && phpize \
+        && ./configure \
+        && make \
+        && sudo make install
+else
+    pecl install channel://pecl.php.net/rdkafka-alpha
+fi
+
+echo "extension = rdkafka.so" >> ~/.phpenv/versions/$(phpenv version-name)/etc/php.ini
\ No newline at end of file
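Both the Dockerfile and the new `travis-install.sh` now build librdkafka (and, on PHP 7, the `php7` branch of php-rdkafka) from source instead of relying on the `librdkafka-dev` package or a stock PECL release, with `extension = rdkafka.so` appended to the active php.ini. The snippet below is not part of the patch; it is only a hedged sanity check one might run before the examples or the test suite to confirm the extension actually loaded:

```php
<?php

// Fail fast if the rdkafka extension did not build or was not enabled.
if (!extension_loaded('rdkafka')) {
    fwrite(STDERR, "The rdkafka extension is not loaded.\n");
    exit(1);
}

echo 'rdkafka extension version: ' . phpversion('rdkafka') . PHP_EOL;
```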