Skip to content

Commit

Permalink
Added offsets_for_times() support (KIP-79)
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Jan 7, 2017
1 parent dbc96ea commit d94be91
Show file tree
Hide file tree
Showing 6 changed files with 238 additions and 9 deletions.
2 changes: 1 addition & 1 deletion Makefile
Expand Up @@ -2,7 +2,7 @@ include Makefile.config

BIN= kafkacat

SRCS_y= kafkacat.c format.c
SRCS_y= kafkacat.c format.c tools.c
SRCS_$(ENABLE_JSON) += json.c
OBJS= $(SRCS_y:.c=.o)

Expand Down
5 changes: 5 additions & 0 deletions README.md
Expand Up @@ -140,3 +140,8 @@ JSON metadata listing
Pretty-printed JSON metadata listing

$ kafkacat -b mybroker -L -J | jq .


Query offset(s) by timestamp(s)

    $ kafkacat -b mybroker -Q -t mytopic:3:2389238523 -t mytopic2:0:18921841
68 changes: 68 additions & 0 deletions json.c
Expand Up @@ -205,6 +205,74 @@ void metadata_print_json (const struct rd_kafka_metadata *metadata) {
}


/**
 * @brief Emit a topic/partition/offset map as JSON.
 *
 * If \p json_gen is a valid yajl_gen the map is appended to it,
 * otherwise (NULL) a temporary generator is allocated, the map is
 * written to stdout, and the generator is freed again.
 *
 * Layout:
 *   { "<topic>": { "topic": "<topic>",
 *                  "<partition>": { "partition": <partition>, "offset": <o>,
 *                                   ["error": "..."] },
 *                  .. },
 *     .. }
 *
 * @remark Assumes \p parts is grouped by topic (the caller in tools.c
 *         sorts the list before calling).
 */
void partition_list_print_json (const rd_kafka_topic_partition_list_t *parts,
                                void *json_gen) {
        yajl_gen gen = (yajl_gen)json_gen;
        const char *prev_topic = "";
        int idx;

        /* Allocate a private generator when the caller did not pass one. */
        if (!gen)
                gen = yajl_gen_alloc(NULL);

        yajl_gen_map_open(gen);

        for (idx = 0 ; idx < parts->cnt ; idx++) {
                const rd_kafka_topic_partition_t *rktpar = &parts->elems[idx];
                char partkey[16];

                /* Topic changed: close the previous topic's map (if any)
                 * and open a new one keyed by the topic name. */
                if (strcmp(prev_topic, rktpar->topic)) {
                        if (*prev_topic)
                                yajl_gen_map_close(gen); /* topic */

                        JS_STR(gen, rktpar->topic);
                        yajl_gen_map_open(gen); /* topic */
                        JS_STR(gen, "topic");
                        JS_STR(gen, rktpar->topic);
                        prev_topic = rktpar->topic;
                }

                /* Per-partition sub-map, keyed by the partition number. */
                snprintf(partkey, sizeof(partkey), "%"PRId32, rktpar->partition);

                JS_STR(gen, partkey);
                yajl_gen_map_open(gen);
                JS_STR(gen, "partition");
                yajl_gen_integer(gen, rktpar->partition);
                JS_STR(gen, "offset");
                yajl_gen_integer(gen, rktpar->offset);
                if (rktpar->err) {
                        JS_STR(gen, "error");
                        JS_STR(gen, rd_kafka_err2str(rktpar->err));
                }
                yajl_gen_map_close(gen);
        }

        if (*prev_topic)
                yajl_gen_map_close(gen); /* topic */

        yajl_gen_map_close(gen);

        /* Private generator: flush the buffer to stdout and release it. */
        if (!json_gen) {
                const unsigned char *buf;
                size_t len;

                yajl_gen_get_buf(gen, &buf, &len);
                (void)fwrite(buf, len, 1, stdout);
                yajl_gen_free(gen);
        }
}



/* JSON formatter initialization hook: nothing to set up at present,
 * kept so callers can pair it with fmt_term_json(). */
void fmt_init_json (void) {
}
Expand Down
83 changes: 76 additions & 7 deletions kafkacat.c
Expand Up @@ -863,7 +863,7 @@ static void RD_NORETURN usage (const char *argv0, int exitcode,

fprintf(out, "\n"
"General options:\n"
" -C | -P | -L Mode: Consume, Produce or metadata List\n"
" -C | -P | -L | -Q Mode: Consume, Produce, Metadata List, Query mode\n"
#if ENABLE_KAFKACONSUMER
" -G <group-id> Mode: High-level KafkaConsumer (Kafka 0.9 balanced consumer groups)\n"
" Expects a list of topics to subscribe to\n"
Expand Down Expand Up @@ -929,9 +929,17 @@ static void RD_NORETURN usage (const char *argv0, int exitcode,
"(instead of empty)\n"
" -u Unbuffered output\n"
"\n"
"Metadata options:\n"
"Metadata options (-L):\n"
" -t <topic> Topic to query (optional)\n"
"\n"
"Query options (-Q):\n"
" -t <t>:<p>:<ts> Get offset for topic <t>,\n"
" partition <p>, timestamp <ts>.\n"
" Timestamp is the number of milliseconds\n"
" since epoch UTC.\n"
" Requires broker >= 0.10.0.0 and librdkafka >= 0.9.3.\n"
" Multiple -t .. are allowed but a partition\n"
" must only occur once.\n"
"\n"
"Format string tokens:\n"
" %%s Message payload\n"
Expand Down Expand Up @@ -969,6 +977,9 @@ static void RD_NORETURN usage (const char *argv0, int exitcode,
"\n"
"Metadata listing:\n"
" kafkacat -L -b <broker> [-t <topic>]\n"
"\n"
"Query offset by timestamp:\n"
" kafkacat -Q -b broker -t <topic>:<partition>:<timestamp>\n"
"\n",
conf.null_str
);
Expand Down Expand Up @@ -1015,19 +1026,55 @@ static int parse_delim (const char *str) {
return delim;
}

/**
* @brief Add topic+partition+offset to list, from :-separated string.
*
* "<t>:<p>:<o>"
*
* @remark Will modify \p str
*/
/**
 * @brief Add topic+partition+offset to list, from :-separated string.
 *
 * "<t>:<p>:<o>"
 *
 * The partition must be fully numeric up to the second ':', and the
 * offset/timestamp fully numeric to the end of the string; anything
 * else is a fatal usage error.
 *
 * @remark Will modify \p str (the first ':' is overwritten with '\0'
 *         so \p str itself becomes the topic name).
 */
static void add_topparoff (const char *what,
                           rd_kafka_topic_partition_list_t *rktparlist,
                           char *str) {
        char *s, *t, *e;
        char *topic;
        int partition;
        int64_t offset;

        if (!(s = strchr(str, ':')) ||
            !(t = strchr(s+1, ':')))
                FATAL("%s: expected \"topic:partition:offset_or_timestamp\"", what);

        topic = str;
        *s = '\0';

        partition = strtoul(s+1, &e, 0);
        /* The digits must extend exactly to the second ':' so that
         * e.g. "t:1x:10" is rejected rather than silently parsed as
         * partition 1. */
        if (e == s+1 || e != t)
                FATAL("%s: expected \"topic:partition:offset_or_timestamp\"", what);

        offset = strtoll(t+1, &e, 0);
        /* Likewise the offset/timestamp must consume the whole rest of
         * the string: reject empty or trailing-garbage values. */
        if (e == t+1 || *e != '\0')
                FATAL("%s: expected \"topic:partition:offset_or_timestamp\"", what);

        rd_kafka_topic_partition_list_add(rktparlist, topic, partition)->offset = offset;
}


/**
* Parse command line arguments
*/
static void argparse (int argc, char **argv) {
static void argparse (int argc, char **argv,
rd_kafka_topic_partition_list_t **rktparlistp) {
char errstr[512];
int opt;
const char *fmt = NULL;
const char *delim = "\n";
const char *key_delim = NULL;
char tmp_fmt[64];


while ((opt = getopt(argc, argv,
"PCG:Lt:p:b:z:o:eD:K:Od:qvX:c:Tuf:ZlVh"
"PCG:LQt:p:b:z:o:eD:K:Od:qvX:c:Tuf:ZlVh"
#if ENABLE_JSON
"J"
#endif
Expand All @@ -1036,6 +1083,7 @@ static void argparse (int argc, char **argv) {
case 'P':
case 'C':
case 'L':
case 'Q':
conf.mode = opt;
break;
#if ENABLE_KAFKACONSUMER
Expand All @@ -1049,7 +1097,14 @@ static void argparse (int argc, char **argv) {
break;
#endif
case 't':
conf.topic = optarg;
if (conf.mode == 'Q') {
if (!*rktparlistp)
*rktparlistp = rd_kafka_topic_partition_list_new(1);
add_topparoff("-t", *rktparlistp, optarg);

} else
conf.topic = optarg;

break;
case 'p':
conf.partition = atoi(optarg);
Expand Down Expand Up @@ -1219,8 +1274,10 @@ static void argparse (int argc, char **argv) {
}


if (!strchr("GL", conf.mode) && !conf.topic)
if (!strchr("GLQ", conf.mode) && !conf.topic)
usage(argv[0], 1, "-t <topic> missing", 0);
else if (conf.mode == 'Q' && !*rktparlistp)
usage(argv[0], 1, "-t <topic>:<partition>:<offset_or_timestamp> missing", 0);

if (rd_kafka_conf_set(conf.rk_conf, "metadata.broker.list",
conf.brokers, errstr, sizeof(errstr)) !=
Expand Down Expand Up @@ -1305,6 +1362,7 @@ int main (int argc, char **argv) {
#endif
FILE *in = stdin;
struct timeval tv;
rd_kafka_topic_partition_list_t *rktparlist = NULL;

signal(SIGINT, term);
signal(SIGTERM, term);
Expand Down Expand Up @@ -1334,7 +1392,7 @@ int main (int argc, char **argv) {
rd_kafka_conf_set_log_cb(conf.rk_conf, rd_kafka_log_print);

/* Parse command line arguments */
argparse(argc, argv);
argparse(argc, argv, &rktparlist);

/* Dump configuration and exit, if so desired. */
if (conf.conf_dump) {
Expand Down Expand Up @@ -1378,6 +1436,17 @@ int main (int argc, char **argv) {
metadata_list();
break;

case 'Q':
if (!rktparlist)
usage(argv[0], 1,
"-Q requires one or more "
"-t <topic>:<partition>:<timestamp>", 0);

query_offsets_by_time(rktparlist);

rd_kafka_topic_partition_list_destroy(rktparlist);
break;

default:
usage(argv[0], 0, NULL, 0);
break;
Expand Down
9 changes: 8 additions & 1 deletion kafkacat.h
Expand Up @@ -137,8 +137,15 @@ void fmt_term (void);
*/
void fmt_msg_output_json (FILE *fp, const rd_kafka_message_t *rkmessage);
void metadata_print_json (const struct rd_kafka_metadata *metadata);

void partition_list_print_json (const rd_kafka_topic_partition_list_t *parts,
void *json_gen);
void fmt_init_json (void);
void fmt_term_json (void);

#endif


/*
* tools.c
*/
int query_offsets_by_time (rd_kafka_topic_partition_list_t *offsets);
80 changes: 80 additions & 0 deletions tools.c
@@ -0,0 +1,80 @@
/*
* kafkacat - Apache Kafka consumer and producer
*
* Copyright (c) 2016, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#include "kafkacat.h"

/**
 * @brief Print a topic+partition+offset list, either as JSON (when JSON
 *        output is enabled) or as plain text to stdout.
 *
 * @param json_gen Opaque yajl generator passed through to the JSON
 *                 printer; may be NULL (the JSON printer then prints
 *                 directly to stdout).
 */
void partition_list_print (rd_kafka_topic_partition_list_t *parts,
                           void *json_gen) {
        int idx;

        /* Emit in deterministic topic+partition order */
        rd_kafka_topic_partition_list_sort(parts, NULL, NULL);

#if ENABLE_JSON
        if (conf.flags & CONF_F_FMT_JSON) {
                partition_list_print_json(parts, json_gen);
                return;
        }
#endif

        for (idx = 0 ; idx < parts->cnt ; idx++) {
                const rd_kafka_topic_partition_t *rktpar = &parts->elems[idx];

                /* Defer the newline when there is a per-partition error,
                 * so the error string can be appended on the same line. */
                printf("%s [%"PRId32"] offset %"PRId64"%s",
                       rktpar->topic, rktpar->partition, rktpar->offset,
                       rktpar->err ? "": "\n");
                if (rktpar->err)
                        printf(": %s\n", rd_kafka_err2str(rktpar->err));
        }
}

/**
 * @brief Resolve per-partition timestamps to log offsets and print the
 *        result (implements the -Q mode, KIP-79).
 *
 * @param offsets List of topic+partition entries whose .offset field
 *                holds the timestamp (ms since epoch) to query for;
 *                per librdkafka's offsets_for_times() contract the
 *                .offset fields are rewritten with the resolved offsets.
 *
 * @returns 0 on success; any failure terminates the process via FATAL().
 */
int query_offsets_by_time (rd_kafka_topic_partition_list_t *offsets) {
        rd_kafka_resp_err_t err;
#if RD_KAFKA_VERSION >= 0x00090300
        char errstr[512];

        /* OffsetsForTimes needs a broker >= 0.10.0.0 (see usage text),
         * so enable ApiVersion negotiation explicitly. */
        if (rd_kafka_conf_set(conf.rk_conf, "api.version.request", "true",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
                FATAL("Failed to enable api.version.request: %s", errstr);

        /* A producer handle is used for this metadata-style query.
         * NOTE: rd_kafka_new() takes ownership of conf.rk_conf. */
        if (!(conf.rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf.rk_conf,
                                     errstr, sizeof(errstr))))
                FATAL("Failed to create producer: %s", errstr);

        /* Resolve timestamps to offsets with a 10 second timeout. */
        err = rd_kafka_offsets_for_times(conf.rk, offsets, 10*1000);
#else
        /* Built against a librdkafka older than 0.9.3, which predates
         * rd_kafka_offsets_for_times(). */
        err = RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
#endif
        if (err)
                FATAL("offsets_for_times failed: %s", rd_kafka_err2str(err));

        partition_list_print(offsets, NULL);

        rd_kafka_destroy(conf.rk);

        return 0;
}

0 comments on commit d94be91

Please sign in to comment.