diff --git a/kafkacat.c b/kafkacat.c
index 2a0c20b..00e3edc 100644
--- a/kafkacat.c
+++ b/kafkacat.c
@@ -351,7 +351,8 @@ static void producer_run (FILE *fp, char **paths, int pathcnt) {
 
 static void handle_partition_eof (rd_kafka_message_t *rkmessage) {
 
-        if (conf.mode == 'C') {
+        if (conf.mode == 'C' && !conf.group) {
+
                 /* Store EOF offset.
                  * If partition is empty and at offset 0,
                  * store future first message (0). */
@@ -371,7 +372,7 @@ static void handle_partition_eof (rd_kafka_message_t *rkmessage) {
                         }
                 }
 
-        } else if (conf.mode == 'G') {
+        } else if (conf.mode == 'C' && conf.group) {
                 /* FIXME: Not currently handled */
         }
 
@@ -736,8 +737,132 @@ static void metadata_print (const rd_kafka_metadata_t *metadata) {
                 }
         }
 }
+#if ENABLE_KAFKACONSUMER
+static void metadata_print_consumergroup(const rd_kafka_metadata_t *metadata, rd_kafka_t *rk);
+
+/**
+ * Lists cluster metadata together with the offsets stored for the
+ * consumer group selected with -g, then exits the process.
+ */
+static void list_metadata_for_consumergroup(void) {
+        char errstr[512];
+        rd_kafka_resp_err_t err;
+
+        const rd_kafka_metadata_t *metadata;
+        /* Create consumer */
+        if (!(conf.rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf.rk_conf, errstr, sizeof(errstr))))
+                FATAL("Failed to create consumer: %s", errstr);
+        conf.rk_conf = NULL;
+
+        /* Create topic, if specified */
+        if (conf.topic && !(conf.rkt = rd_kafka_topic_new(conf.rk, conf.topic, conf.rkt_conf)))
+                FATAL("Failed to create topic %s: %s", conf.topic,
+                      rd_kafka_err2str(rd_kafka_errno2err(errno)));
+
+        /* Fetch metadata */
+        err = rd_kafka_metadata(conf.rk, conf.rkt ? 0 : 1, conf.rkt, &metadata, 5000);
+        if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
+                FATAL("Failed to acquire metadata: %s", rd_kafka_err2str(err));
+
+        metadata_print_consumergroup(metadata, conf.rk);
+
+        rd_kafka_metadata_destroy(metadata);
+
+        /* The process ends here; conf.rk and conf.rkt are not destroyed. */
+        exit(0);
+}
+
+/**
+ * Prints brokers, topics and partitions from metadata. For every partition
+ * the low watermark, the stored offset and the high watermark obtained
+ * through rk are printed as well.
+ */
+static void metadata_print_consumergroup(const rd_kafka_metadata_t *metadata, rd_kafka_t *rk) {
+        int i, j;
+        printf("Metadata for %s (from broker %"PRId32": %s):\n",
+               conf.topic ? conf.topic : "all topics",
+               metadata->orig_broker_id, metadata->orig_broker_name);
+        /* Iterate brokers */
+        printf(" %i brokers:\n", metadata->broker_cnt);
+        for (i = 0; i < metadata->broker_cnt; i++)
+                printf(" broker %"PRId32" at %s:%i\n", metadata->brokers[i].id,
+                       metadata->brokers[i].host, metadata->brokers[i].port);
+
+        int32_t topar_cnt = 0;
+        for (i = 0; i < metadata->topic_cnt; i++) {
+                topar_cnt += metadata->topics[i].partition_cnt;
+        }
+        /* reordered[topic base + partition id] holds the partition's index in
+         * the topic's partitions[], so partitions are printed in id order. */
+        int32_t *reordered = calloc(topar_cnt ? topar_cnt : 1, sizeof(*reordered));
+        if (!reordered)
+                FATAL("Failed to allocate index for %"PRId32" partitions", topar_cnt);
+        rd_kafka_topic_partition_list_t *topars = rd_kafka_topic_partition_list_new(topar_cnt);
+        if (topar_cnt) {
+                int32_t runner = 0;
+                for (i = 0; i < metadata->topic_cnt; i++) {
+                        const rd_kafka_metadata_topic_t *t = &metadata->topics[i];
+                        for (j = 0; j < t->partition_cnt; j++) {
+                                const rd_kafka_metadata_partition_t *p = &t->partitions[j];
+                                /* p->id indexes reordered[]; keep it inside this topic's slots */
+                                if (p->id < 0 || p->id >= t->partition_cnt)
+                                        FATAL("Unexpected partition id %"PRId32" in topic %s",
+                                              p->id, t->topic);
+                                reordered[runner + p->id] = j;
+                                rd_kafka_topic_partition_list_add(topars, t->topic, p->id);
+                        }
+                        runner += t->partition_cnt;
+                }
+                /* NOTE(review): later librdkafka releases appear to expose the
+                 * committed-offset lookup as rd_kafka_committed() - confirm
+                 * against the version this is linked with. */
+                rd_kafka_resp_err_t err = rd_kafka_position(rk, topars, 5000);
+                if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
+                        FATAL("Failed to acquire stored offsets: %s", rd_kafka_err2str(err));
+        }
+
+        int32_t runner = 0;
+
+        /* Iterate topics */
+        printf(" %i topics:\n", metadata->topic_cnt);
+        printf(" topic partition leader #replicas #isrs lowwatermark storedoffset highwatermark\n");
+        for (i = 0; i < metadata->topic_cnt; i++) {
+                const rd_kafka_metadata_topic_t *t = &metadata->topics[i];
+                if (t->err) {
+                        printf(" %s", rd_kafka_err2str(t->err));
+                        if (t->err == RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE)
+                                printf(" (try again)");
+                }
+                /* Iterate topic's partitions */
+                for (int jj = 0; jj < t->partition_cnt; jj++) {
+                        j = reordered[runner + jj];
+                        const rd_kafka_metadata_partition_t *p;
+                        p = &t->partitions[j];
+
+                        int64_t low = -1, high = -1;
+                        rd_kafka_resp_err_t err = rd_kafka_query_watermark_offsets(rk, t->topic, p->id,
+                                        &low, &high, 5000);
+                        if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
+                                FATAL("Failed to acquire watermark: %s", rd_kafka_err2str(err));
+                        if (topars->elems[runner + j].err != RD_KAFKA_RESP_ERR_NO_ERROR)
+                                FATAL("Failed to acquire stored offset: %s",
+                                      rd_kafka_err2str(topars->elems[runner + j].err));
+                        /* Offsets are int64_t: print at full width instead of casting to int */
+                        printf(
+                                " %-25s %3"PRId32" %2"PRId32" %2i %2i %8"PRId64" %8"PRId64" %8"PRId64,
+                                t->topic, p->id, p->leader, p->replica_cnt, p->isr_cnt, low,
+                                topars->elems[runner + j].offset, high);
+                        if (p->err)
+                                printf(", %s\n", rd_kafka_err2str(p->err));
+                        else
+                                printf("\n");
+
+                }
+                runner += t->partition_cnt;
+        }
+        rd_kafka_topic_partition_list_destroy(topars);
+        free(reordered);
+}
 
 
+#endif
 /**
  * Lists metadata
  */
@@ -828,8 +953,9 @@ static void __attribute__((noreturn)) usage (const char *argv0, int exitcode,
                 "General options:\n"
                 " -C | -P | -L Mode: Consume, Produce or metadata List\n"
 #if ENABLE_KAFKACONSUMER
-                " -G Mode: High-level KafkaConsumer (Kafka 0.9 balanced consumer groups)\n"
-                " Expects a list of topics to subscribe to\n"
+                " -g <group-id> Use high-level KafkaConsumer (Kafka 0.9 balanced consumer groups)\n"
+                " When in consumer mode: Expects a list of topics to subscribe to.\n"
+                " When in metadata list mode: Prints stored offsets for this group and watermarks.\n"
 #endif
                 " -t Topic to consume from, produce to, "
                 "or list\n"
@@ -984,9 +1110,9 @@ static void argparse (int argc, char **argv) {
         const char *delim = "\n";
         const char *key_delim = NULL;
         char tmp_fmt[64];
-
+        conf.group = NULL;
         while ((opt = getopt(argc, argv,
-                             "PCG:Lt:p:b:z:o:eD:K:Od:qvX:c:Tuf:ZlV"
+                             "PCg:Lt:p:b:z:o:eD:K:Od:qvX:c:Tuf:ZlV"
 #if ENABLE_JSON
                              "J"
 #endif
@@ -998,8 +1124,8 @@ static void argparse (int argc, char **argv) {
                         conf.mode = opt;
                         break;
 #if ENABLE_KAFKACONSUMER
-                case 'G':
-                        conf.mode = opt;
+                case 'g':
+                        /* -g only selects the group; the mode stays -C or -L */
                         conf.group = optarg;
                         if (rd_kafka_conf_set(conf.rk_conf, "group.id", optarg,
                                               errstr, sizeof(errstr)) !=
@@ -1291,20 +1417,25 @@ int main (int argc, char **argv) {
 
         switch (conf.mode) {
         case 'C':
-                consumer_run(stdout);
-                break;
-
 #if ENABLE_KAFKACONSUMER
-        case 'G':
-                kafkaconsumer_run(stdout, &argv[optind], argc-optind);
-                break;
+                if (conf.group) {
+                        kafkaconsumer_run(stdout, &argv[optind], argc - optind);
+                        break;
+                }
 #endif
-
+                consumer_run(stdout);
+                break;
         case 'P':
                 producer_run(in, &argv[optind], argc-optind);
                 break;
 
         case 'L':
+#if ENABLE_KAFKACONSUMER
+                if (conf.group) {
+                        list_metadata_for_consumergroup();
+                        break;
+                }
+#endif
                 metadata_list();
                 break;
 