-
Notifications
You must be signed in to change notification settings - Fork 4
/
create-topic.php
64 lines (57 loc) · 1.58 KB
/
create-topic.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
<?php
declare(strict_types=1);
use RdKafka\Admin\Client;
use RdKafka\Admin\NewTopic;
use RdKafka\Conf;
require_once dirname(__DIR__) . '/vendor/autoload.php';
$options = array_merge(
[
'p' => 1,
'r' => 1,
'b' => getenv('KAFKA_BROKERS') ?: 'kafka:9092',
'w' => 10000,
],
getopt('t:p::r::b::w::')
);
if (empty($options['t'])) {
echo sprintf(
'Usage: %s -t{topicname} [-p{numberOfPartitions:1}] [-r{replicationFactor:1} [-b{brokerList:kafka:9092} [-w{waitForResultMs:10000}]' . PHP_EOL,
basename(__FILE__)
);
exit();
}
$conf = new Conf();
$conf->set('bootstrap.servers', $options['b']);
$client = Client::fromConf($conf);
$client->setWaitForResultEventTimeout((int) $options['w']);
$partitions = $options['p'];
$replicationFactor = $options['r'];
$results = $client->createTopics(
[
new NewTopic(
(string) $options['t'],
(int) $partitions,
(int) $replicationFactor
),
]
);
foreach ($results as $result) {
if ($result->error === RD_KAFKA_RESP_ERR_NO_ERROR) {
echo sprintf(
'Created topic %s with %s partitions and replication factor %s.',
$result->name,
$partitions,
$replicationFactor
);
} else {
echo sprintf(
'Failed to created topic %s with %s partitions and replication factor %s. Reason: %s (%s)',
$result->name,
$partitions,
$replicationFactor,
$result->errorString,
$result->error
);
}
echo PHP_EOL;
}