diff --git a/.gitignore b/.gitignore index 8e01f2e..5199d2b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ # directories vendor/ build/ +dev/ # files composer.phar diff --git a/src/ClusterMetadata/Zookeeper.php b/src/ClusterMetadata/Zookeeper.php index f094ae7..9c50503 100644 --- a/src/ClusterMetadata/Zookeeper.php +++ b/src/ClusterMetadata/Zookeeper.php @@ -166,7 +166,7 @@ public function getZookeeper() public function getBrokers() { $result = []; - $lists = $this->zookeeper->getChildren(self::BROKER_PATH); + $lists = $this->getZookeeper()->getChildren(self::BROKER_PATH); if (!empty($lists)) { foreach ($lists as $brokerId) { @@ -194,8 +194,8 @@ public function getBrokerDetail($brokerId) $result = []; $path = sprintf(self::BROKER_DETAIL_PATH, (int) $brokerId); - if ($this->zookeeper->exists($path)) { - $result = $this->zookeeper->get($path); + if ($this->getZookeeper()->exists($path)) { + $result = $this->getZookeeper()->get($path); if (empty($result)) { return []; diff --git a/src/Consumer.php b/src/Consumer.php index ecba3d8..86f7f55 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -31,7 +31,12 @@ public function __construct(ClusterMetadataInterface $clusterMetadata = null, \R } if (!empty($clusterMetadata)) { - $config->set('metadata.broker.list', $clusterMetadata->getBrokers()); + $brokers = $clusterMetadata->getBrokers(); + $brokers = array_map(function($broker) { + return sprintf('%s:%d', $broker['host'], $broker['port']); + }, $brokers); + + $config->set('metadata.broker.list', implode(',', $brokers)); } parent::__construct($config); diff --git a/src/Producer.php b/src/Producer.php index 842917d..11b93df 100644 --- a/src/Producer.php +++ b/src/Producer.php @@ -31,7 +31,12 @@ public function __construct(ClusterMetadataInterface $clusterMetadata = null, \R } if (!empty($clusterMetadata)) { - $config->set('metadata.broker.list', $clusterMetadata->getBrokers()); + $brokers = $clusterMetadata->getBrokers(); + $brokers = array_map(function($broker) { + return sprintf('%s:%d', $broker['host'], $broker['port']); + }, $brokers); + + $config->set('metadata.broker.list', implode(',', $brokers)); } parent::__construct($config); diff --git a/tests/ConsumerTest.php b/tests/ConsumerTest.php index 65cad9a..d769210 100644 --- a/tests/ConsumerTest.php +++ b/tests/ConsumerTest.php @@ -25,7 +25,12 @@ public function testConsumerWithClusterMetadata() $zookeeperMock->expects($this->once()) ->method('getBrokers') - ->will($this->returnValue('127.0.0.1:9092')); + ->will($this->returnValue([ + [ + 'host' => '127.0.0.1', + 'port' => 9092 + ] + ])); (new Consumer($zookeeperMock)); } @@ -36,7 +41,12 @@ public function testConsumerWithClusterMetadataAndConf() $zookeeperMock->expects($this->once()) ->method('getBrokers') - ->will($this->returnValue('127.0.0.1:9092')); + ->will($this->returnValue([ + [ + 'host' => '127.0.0.1', + 'port' => 9092 + ] + ])); $confMock = $this->getMock('\RdKafka\Conf'); diff --git a/tests/ProducerTest.php b/tests/ProducerTest.php index 60050c5..5ffb206 100644 --- a/tests/ProducerTest.php +++ b/tests/ProducerTest.php @@ -25,7 +25,12 @@ public function testProducerWithClusterMetadata() $zookeeperMock->expects($this->once()) ->method('getBrokers') - ->will($this->returnValue('127.0.0.1:9092')); + ->will($this->returnValue([ + [ + 'host' => '127.0.0.1', + 'port' => 9092 + ] + ])); (new Producer($zookeeperMock)); } @@ -36,7 +41,12 @@ public function testProducerWithClusterMetadataAndConf() $zookeeperMock->expects($this->once()) ->method('getBrokers') - ->will($this->returnValue('127.0.0.1:9092')); + ->will($this->returnValue([ + [ + 'host' => '127.0.0.1', + 'port' => 9092 + ] + ])); $confMock = $this->getMock('\RdKafka\Conf');