Skip to content

Commit

Permalink
Message Headers support
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Jan 2, 2018
1 parent b118271 commit 4a86230
Show file tree
Hide file tree
Showing 21 changed files with 1,818 additions and 40 deletions.
107 changes: 93 additions & 14 deletions examples/rdkafka_example.c
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ static void msg_consume (rd_kafka_message_t *rkmessage,
if (!quiet) {
rd_kafka_timestamp_type_t tstype;
int64_t timestamp;
rd_kafka_headers_t *hdrs;

fprintf(stdout, "%% Message (offset %"PRId64", %zd bytes):\n",
rkmessage->offset, rkmessage->len);

Expand All @@ -187,6 +189,27 @@ static void msg_consume (rd_kafka_message_t *rkmessage,
!timestamp ? 0 :
(int)time(NULL) - (int)(timestamp/1000));
}

if (!rd_kafka_message_headers(rkmessage, &hdrs)) {
size_t idx = 0;
const char *name;
const void *val;
size_t size;

fprintf(stdout, "%% Headers:");

while (!rd_kafka_header_get_all(hdrs, idx++,
&name, &val, &size)) {
fprintf(stdout, "%s%s=",
idx == 1 ? " " : ", ", name);
if (val)
fprintf(stdout, "\"%.*s\"",
(int)size, (const char *)val);
else
fprintf(stdout, "NULL");
}
fprintf(stdout, "\n");
}
}

if (rkmessage->key_len) {
Expand Down Expand Up @@ -287,6 +310,8 @@ int main (int argc, char **argv) {
int64_t seek_offset = 0;
int64_t tmp_offset = 0;
int get_wmarks = 0;
rd_kafka_headers_t *hdrs = NULL;
rd_kafka_resp_err_t err;

/* Kafka configuration */
conf = rd_kafka_conf_new();
Expand All @@ -301,7 +326,7 @@ int main (int argc, char **argv) {
/* Topic configuration */
topic_conf = rd_kafka_topic_conf_new();

while ((opt = getopt(argc, argv, "PCLt:p:b:z:qd:o:eX:As:")) != -1) {
while ((opt = getopt(argc, argv, "PCLt:p:b:z:qd:o:eX:As:H:")) != -1) {
switch (opt) {
case 'P':
case 'C':
Expand Down Expand Up @@ -370,6 +395,31 @@ int main (int argc, char **argv) {
case 'A':
output = OUTPUT_RAW;
break;
case 'H':
{
char *name, *val;
size_t name_sz = -1;

name = optarg;
val = strchr(name, '=');
if (val) {
name_sz = (size_t)(val-name);
val++; /* past the '=' */
}

if (!hdrs)
hdrs = rd_kafka_headers_new(8);

err = rd_kafka_header_add(hdrs, name, name_sz, val, -1);
if (err) {
fprintf(stderr,
"%% Failed to add header %s: %s\n",
name, rd_kafka_err2str(err));
exit(1);
}
}
break;

case 'X':
{
char *name, *val;
Expand Down Expand Up @@ -503,6 +553,7 @@ int main (int argc, char **argv) {
" %s\n"
" -q Be quiet\n"
" -A Raw payload output (consumer)\n"
" -H <name[=value]> Add header to message (producer)\n"
" -X <prop=name> Set arbitrary librdkafka "
"configuration property\n"
" Properties prefixed with \"topic.\" "
Expand Down Expand Up @@ -585,21 +636,46 @@ int main (int argc, char **argv) {
buf[--len] = '\0';

/* Send/Produce message. */
if (rd_kafka_produce(rkt, partition,
RD_KAFKA_MSG_F_COPY,
/* Payload and length */
buf, len,
/* Optional key and its length */
NULL, 0,
/* Message opaque, provided in
* delivery report callback as
* msg_opaque. */
NULL) == -1) {
fprintf(stderr,
"%% Failed to produce to topic %s "
if (hdrs) {
rd_kafka_headers_t *hdrs_copy;

hdrs_copy = rd_kafka_headers_copy(hdrs);

err = rd_kafka_producev(
rk,
RD_KAFKA_V_RKT(rkt),
RD_KAFKA_V_PARTITION(partition),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
RD_KAFKA_V_VALUE(buf, len),
RD_KAFKA_V_HEADERS(hdrs_copy),
RD_KAFKA_V_END);

if (err)
rd_kafka_headers_destroy(hdrs_copy);

} else {
if (rd_kafka_produce(
rkt, partition,
RD_KAFKA_MSG_F_COPY,
/* Payload and length */
buf, len,
/* Optional key and its length */
NULL, 0,
/* Message opaque, provided in
* delivery report callback as
* msg_opaque. */
NULL) == -1) {
err = rd_kafka_last_error();
}
}

if (err) {
fprintf(stderr,
"%% Failed to produce to topic %s "
"partition %i: %s\n",
rd_kafka_topic_name(rkt), partition,
rd_kafka_err2str(rd_kafka_last_error()));
rd_kafka_err2str(err));

/* Poll to handle delivery reports */
rd_kafka_poll(rk, 0);
continue;
Expand Down Expand Up @@ -792,6 +868,9 @@ int main (int argc, char **argv) {
exit(err ? 2 : 0);
}

if (hdrs)
rd_kafka_headers_destroy(hdrs);

if (topic_conf)
rd_kafka_topic_conf_destroy(topic_conf);

Expand Down
93 changes: 88 additions & 5 deletions examples/rdkafka_performance.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ static int incremental_mode = 0;
static int partition_cnt = 0;
static int eof_cnt = 0;
static int with_dr = 1;
static int read_hdrs = 0;


static void stop (int sig) {
if (!run)
Expand Down Expand Up @@ -315,6 +317,12 @@ static void msg_consume (rd_kafka_message_t *rkmessage, void *opaque) {

}

if (read_hdrs) {
rd_kafka_headers_t *hdrs;
/* Force parsing of headers but don't do anything with them. */
rd_kafka_message_headers(rkmessage, &hdrs);
}

if (msgcnt != -1 && (int)cnt.msgs >= msgcnt)
run = 0;
}
Expand Down Expand Up @@ -715,6 +723,44 @@ static int read_conf_file (rd_kafka_conf_t *conf,
}


static rd_kafka_resp_err_t do_produce (rd_kafka_t *rk,
rd_kafka_topic_t *rkt, int32_t partition,
int msgflags,
void *payload, size_t size,
const void *key, size_t key_size,
const rd_kafka_headers_t *hdrs) {

/* Send/Produce message. */
if (hdrs) {
rd_kafka_headers_t *hdrs_copy;
rd_kafka_resp_err_t err;

hdrs_copy = rd_kafka_headers_copy(hdrs);

err = rd_kafka_producev(
rk,
RD_KAFKA_V_RKT(rkt),
RD_KAFKA_V_PARTITION(partition),
RD_KAFKA_V_MSGFLAGS(msgflags),
RD_KAFKA_V_VALUE(payload, size),
RD_KAFKA_V_KEY(key, key_size),
RD_KAFKA_V_HEADERS(hdrs_copy),
RD_KAFKA_V_END);

if (err)
rd_kafka_headers_destroy(hdrs_copy);

return err;

} else {
if (rd_kafka_produce(rkt, partition, msgflags, payload, size,
key, key_size, NULL) == 1)
return rd_kafka_last_error();
}

return RD_KAFKA_RESP_ERR_NO_ERROR;
}


int main (int argc, char **argv) {
char *brokers = NULL;
Expand Down Expand Up @@ -749,6 +795,8 @@ int main (int argc, char **argv) {
int rate_sleep = 0;
rd_kafka_topic_partition_list_t *topics;
int exitcode = 0;
rd_kafka_headers_t *hdrs = NULL;
rd_kafka_resp_err_t err;

/* Kafka configuration */
conf = rd_kafka_conf_new();
Expand Down Expand Up @@ -789,7 +837,7 @@ int main (int argc, char **argv) {
while ((opt =
getopt(argc, argv,
"PCG:t:p:b:s:k:c:fi:MDd:m:S:x:"
"R:a:z:o:X:B:eT:Y:qvIur:lA:OwN")) != -1) {
"R:a:z:o:X:B:eT:Y:qvIur:lA:OwNHH:")) != -1) {
switch (opt) {
case 'G':
if (rd_kafka_conf_set(conf, "group.id", optarg,
Expand Down Expand Up @@ -888,6 +936,37 @@ int main (int argc, char **argv) {
case 'd':
debug = optarg;
break;
case 'H':
{
char *name, *val;
size_t name_sz = -1;

if (!optarg) {
read_hdrs = 1;
break;
}

name = optarg;
val = strchr(name, '=');
if (val) {
name_sz = (size_t)(val-name);
val++; /* past the '=' */
}

if (!hdrs)
hdrs = rd_kafka_headers_new(8);

err = rd_kafka_header_add(hdrs, name, name_sz, val, -1);
if (err) {
fprintf(stderr,
"%% Failed to add header %s: %s\n",
name, rd_kafka_err2str(err));
exit(1);
}

read_hdrs = 1;
}
break;
case 'X':
{
char *name, *val;
Expand Down Expand Up @@ -1033,6 +1112,8 @@ int main (int argc, char **argv) {
" -b <brokers> Broker address list (host[:port],..)\n"
" -s <size> Message size (producer)\n"
" -k <key> Message key (producer)\n"
" -H <name[=value]> Add header to message (producer)\n"
" -H Read message headers (consumer)\n"
" -c <cnt> Messages to transmit/receive\n"
" -x <cnt> Hard exit after transmitting <cnt> messages (producer)\n"
" -D Copy/Duplicate data buffer (producer)\n"
Expand Down Expand Up @@ -1261,10 +1342,9 @@ int main (int argc, char **argv) {

cnt.tx++;
while (run &&
rd_kafka_produce(rkt, partition,
sendflags, pbuf, msgsize,
key, keylen, NULL) == -1) {
rd_kafka_resp_err_t err = rd_kafka_last_error();
(err = do_produce(rk, rkt, partition, sendflags,
pbuf, msgsize,
key, keylen, hdrs))) {
if (err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)
printf("%% No such partition: "
"%"PRId32"\n", partition);
Expand Down Expand Up @@ -1540,6 +1620,9 @@ int main (int argc, char **argv) {
rd_kafka_destroy(rk);
}

if (hdrs)
rd_kafka_headers_destroy(hdrs);

print_stats(NULL, mode, otype|_OTYPE_FORCE, compression);

if (cnt.t_fetch_latency && cnt.msgs)
Expand Down
5 changes: 5 additions & 0 deletions src-cpp/rdkafkacpp.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,11 @@ enum ErrorCode {
ERR__VALUE_DESERIALIZATION = -159,
/** Partial response */
ERR__PARTIAL = -158,
/** Modification attempted on read-only object */
ERR__READ_ONLY = -157,
/** No such entry / item not found */
ERR__NOENT = -156,

/** End internal error codes */
ERR__END = -100,

Expand Down
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ set(
rdkafka_topic.c
rdkafka_transport.c
rdkafka_interceptor.c
rdkafka_header.c
rdlist.c
rdlog.c
rdmurmur2.c
Expand Down
1 change: 1 addition & 0 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ SRCS= rdkafka.c rdkafka_broker.c rdkafka_msg.c rdkafka_topic.c \
rdregex.c rdports.c rdkafka_metadata_cache.c rdavl.c \
rdkafka_sasl.c rdkafka_sasl_plain.c rdkafka_interceptor.c \
rdkafka_msgset_writer.c rdkafka_msgset_reader.c \
rdkafka_header.c \
rdvarint.c rdbuf.c rdunittest.c \
$(SRCS_y)

Expand Down
4 changes: 4 additions & 0 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,10 @@ static const struct rd_kafka_err_desc rd_kafka_err_descs[] = {
"Local: Value deserialization error"),
_ERR_DESC(RD_KAFKA_RESP_ERR__PARTIAL,
"Local: Partial response"),
_ERR_DESC(RD_KAFKA_RESP_ERR__READ_ONLY,
"Local: Read-only object"),
_ERR_DESC(RD_KAFKA_RESP_ERR__NOENT,
"Local: No such entry"),

_ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN,
"Unknown broker error"),
Expand Down
Loading

0 comments on commit 4a86230

Please sign in to comment.