Skip to content

Commit

Permalink
Prioritize all relevant user-facing ops (callbacks) (#1088)
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Mar 10, 2017
1 parent 6ed3997 commit a4d1af0
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 14 deletions.
2 changes: 2 additions & 0 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ void rd_kafka_log_buf (const rd_kafka_t *rk, int level, const char *fac,
return; /* Terminating */

rko = rd_kafka_op_new(RD_KAFKA_OP_LOG);
rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_MEDIUM);
rko->rko_u.log.level = level;
strncpy(rko->rko_u.log.fac, fac,
sizeof(rko->rko_u.log.fac) - 1);
Expand Down Expand Up @@ -1080,6 +1081,7 @@ static void rd_kafka_stats_emit_all (rd_kafka_t *rk) {

/* Enqueue op for application */
rko = rd_kafka_op_new(RD_KAFKA_OP_STATS);
rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_HIGH);
rko->rko_u.stats.json = buf;
rko->rko_u.stats.json_len = of;
rd_kafka_q_enq(rk->rk_rep, rko);
Expand Down
4 changes: 3 additions & 1 deletion src/rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -5292,7 +5292,9 @@ const char *rd_kafka_broker_name (rd_kafka_broker_t *rkb) {
* @locks none
*/
void rd_kafka_broker_wakeup (rd_kafka_broker_t *rkb) {
rd_kafka_q_enq(rkb->rkb_ops, rd_kafka_op_new(RD_KAFKA_OP_WAKEUP));
rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_WAKEUP);
rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_FLASH);
rd_kafka_q_enq(rkb->rkb_ops, rko);
rd_rkb_dbg(rkb, QUEUE, "WAKEUP", "Wake-up");
}

Expand Down
4 changes: 2 additions & 2 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -1815,7 +1815,7 @@ static void rd_kafka_cgrp_op_handle_OffsetCommit (rd_kafka_t *rk,
if (!rko_orig->rko_u.offset_commit.cb && rk->rk_conf.offset_commit_cb) {
rd_kafka_op_t *rko_reply = rd_kafka_op_new_reply(rko_orig, err);

rd_kafka_op_set_prio(rko_reply, 1);
rd_kafka_op_set_prio(rko_reply, RD_KAFKA_PRIO_HIGH);

if (offsets)
rko_reply->rko_u.offset_commit.partitions =
Expand All @@ -1834,7 +1834,7 @@ static void rd_kafka_cgrp_op_handle_OffsetCommit (rd_kafka_t *rk,
if (rko_orig->rko_replyq.q) {
rd_kafka_op_t *rko_reply = rd_kafka_op_new_reply(rko_orig, err);

rd_kafka_op_set_prio(rko_reply, 1);
rd_kafka_op_set_prio(rko_reply, RD_KAFKA_PRIO_HIGH);

/* Copy offset & partitions & callbacks to reply op */
rko_reply->rko_u.offset_commit = rko_orig->rko_u.offset_commit;
Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_offset.c
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ void rd_kafka_offset_commit_cb_op (rd_kafka_t *rk,
return;

rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_COMMIT|RD_KAFKA_OP_REPLY);
rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_HIGH);
rko->rko_err = err;
rko->rko_u.offset_commit.cb = rk->rk_conf.offset_commit_cb;/*maybe NULL*/
rko->rko_u.offset_commit.opaque = rk->rk_conf.opaque;
Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_op.c
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,7 @@ void rd_kafka_op_throttle_time (rd_kafka_broker_t *rkb,
rd_atomic32_set(&rkb->rkb_rk->rk_last_throttle, throttle_time);

rko = rd_kafka_op_new(RD_KAFKA_OP_THROTTLE);
rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_HIGH);
rko->rko_u.throttle.nodename = rd_strdup(rkb->rkb_nodename);
rko->rko_u.throttle.nodeid = rkb->rkb_nodeid;
rko->rko_u.throttle.throttle_time = throttle_time;
Expand Down
19 changes: 17 additions & 2 deletions src/rdkafka_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,22 @@ typedef enum {
#define RD_KAFKA_OP_FLAGMASK (RD_KAFKA_OP_CB | RD_KAFKA_OP_REPLY)


/**
* @brief Op/queue priority levels.
* @remark Since priority levels alter the FIFO order, pay extra attention
* to preserve ordering as deemed necessary.
* @remark Priority should only be set on ops destined for application
* facing queues (rk_rep, rkcg_q, etc).
*/
typedef enum {
RD_KAFKA_PRIO_NORMAL = 0, /* Normal bulk, messages, DRs, etc. */
RD_KAFKA_PRIO_MEDIUM, /* Prioritize in front of bulk,
* still at some scale. e.g. logs, .. */
RD_KAFKA_PRIO_HIGH, /* Small scale high priority */
RD_KAFKA_PRIO_FLASH /* Micro scale, immediate delivery. */
} rd_kafka_op_prio_t;


#define RD_KAFKA_OP_TYPE_ASSERT(rko,type) \
rd_kafka_assert(NULL, (rko)->rko_type == (type) && # type)

Expand All @@ -129,7 +145,7 @@ struct rd_kafka_op_s {
rd_kafka_resp_err_t rko_err;
int32_t rko_len; /* Depends on type, typically the
* message length. */
int rko_prio; /* In-queue priority.
rd_kafka_op_prio_t rko_prio; /* In-queue priority.
* Higher value means higher prio. */

shptr_rd_kafka_toppar_t *rko_rktp;
Expand Down Expand Up @@ -283,7 +299,6 @@ rd_kafka_op_t *rd_kafka_op_new_cb (rd_kafka_t *rk,
rd_kafka_op_t *rko));
int rd_kafka_op_reply (rd_kafka_op_t *rko, rd_kafka_resp_err_t err);


#define rd_kafka_op_set_prio(rko,prio) ((rko)->rko_prio = prio)


Expand Down
42 changes: 33 additions & 9 deletions tests/0060-op_prio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,30 +34,47 @@
* MO:
*
* - Seed topic with 1000 messages
* - Start consumer with auto offset commit disabled, but a commit callback
* - Start consumer with auto offset commit disabled,
* but with commit and stats callbacks registered,
* - Consume one message
* - Commit that message manually
* - Consume one message per second
* - The commit callback should be fired within reasonable time, long before
* - The stats callback should behave the same.
* all messages are consumed.
*/



class MyCommitCb : public RdKafka::OffsetCommitCb {
class MyCbs : public RdKafka::OffsetCommitCb, public RdKafka::EventCb {
public:
int seen;
int seen_commit;
int seen_stats;

void offset_commit_cb (RdKafka::ErrorCode err,
std::vector<RdKafka::TopicPartition*>&offsets) {
if (err)
Test::Fail("Offset commit failed: " + RdKafka::err2str(err));

seen++;
seen_commit++;
Test::Say("Got commit callback!\n");
}

void event_cb (RdKafka::Event &event) {
switch (event.type())
{
case RdKafka::Event::EVENT_STATS:
Test::Say("Got stats callback!\n");
seen_stats++;
break;
default:
break;
}
}
};



static void do_test_commit_cb (void) {
const int msgcnt = 1000;
std::string errstr;
Expand All @@ -78,12 +95,17 @@ static void do_test_commit_cb (void) {
Test::conf_set(conf, "enable.auto.commit", "false");
Test::conf_set(conf, "enable.partition.eof", "false");
Test::conf_set(conf, "auto.offset.reset", "earliest");
Test::conf_set(conf, "statistics.interval.ms", "1000");

MyCommitCb commit_cb;
commit_cb.seen = 0;
if (conf->set("offset_commit_cb", &commit_cb, errstr) !=
MyCbs cbs;
cbs.seen_commit = 0;
cbs.seen_stats = 0;
if (conf->set("offset_commit_cb", (RdKafka::OffsetCommitCb *)&cbs, errstr) !=
RdKafka::Conf::CONF_OK)
Test::Fail("Failed to set commit callback: " + errstr);
if (conf->set("event_cb", (RdKafka::EventCb *)&cbs, errstr) !=
RdKafka::Conf::CONF_OK)
Test::Fail("Failed to set event callback: " + errstr);

RdKafka::KafkaConsumer *c = RdKafka::KafkaConsumer::create(conf, errstr);
if (!c)
Expand All @@ -99,13 +121,15 @@ static void do_test_commit_cb (void) {
/* Wait for messages and commit callback. */
Test::Say("Consuming topic " + topic + "\n");
int cnt = 0;
while (!commit_cb.seen) {
while (!cbs.seen_commit || !cbs.seen_stats) {
RdKafka::Message *msg = c->consume(1000);
if (!msg->err()) {
cnt++;
Test::Say(tostr() << "Received message #" << cnt << "\n");
if (cnt > 10)
Test::Fail("Should've seen the offset commit callback by now");
Test::Fail(tostr() << "Should've seen the "
"offset commit (" << cbs.seen_commit << ") and "
"stats callbacks (" << cbs.seen_stats << ") by now");

/* Commit the first message to trigger the offset commit_cb */
if (cnt == 1) {
Expand Down

0 comments on commit a4d1af0

Please sign in to comment.