Skip to content

Commit

Permalink
Honour per-message partition in produce_batch() if MSG_F_PARTITION set (
Browse files Browse the repository at this point in the history
  • Loading branch information
barrotsteindev authored and edenhill committed Jan 21, 2018
1 parent 9d738e4 commit b8d0b84
Show file tree
Hide file tree
Showing 3 changed files with 286 additions and 14 deletions.
8 changes: 6 additions & 2 deletions src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -2928,8 +2928,9 @@ rd_kafka_position (rd_kafka_t *rk,
* Failure to do so will result
* in indefinately blocking on
* the produce() call when the
* message queue is full.
*/
* message queue is full. */
#define RD_KAFKA_MSG_F_PARTITION 0x8 /**< produce_batch() will honor
* per-message partition. */



Expand Down Expand Up @@ -2963,6 +2964,9 @@ rd_kafka_position (rd_kafka_t *rk,
* RD_KAFKA_MSG_F_COPY - the \p payload data will be copied and the
* \p payload pointer will not be used by rdkafka
* after the call returns.
* RD_KAFKA_MSG_F_PARTITION - produce_batch() will honour per-message
* partition, either set manually or by the
* configured partitioner.
*
* .._F_FREE and .._F_COPY are mutually exclusive.
*
Expand Down
42 changes: 31 additions & 11 deletions src/rdkafka_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -448,15 +448,17 @@ int rd_kafka_produce_batch (rd_kafka_topic_t *app_rkt, int32_t partition,
int64_t utc_now = rd_uclock() / 1000;
rd_ts_t now = rd_clock();
int good = 0;
int multiple_partitions = (partition == RD_KAFKA_PARTITION_UA ||
(msgflags & RD_KAFKA_MSG_F_PARTITION));
rd_kafka_resp_err_t all_err = 0;
rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
rd_kafka_toppar_t *rktp = NULL;
shptr_rd_kafka_toppar_t *s_rktp = NULL;

/* For partitioner; hold lock for entire run,
/* For multiple partitions; hold lock for entire run,
* for one partition: only acquire for now. */
rd_kafka_topic_rdlock(rkt);
if (partition != RD_KAFKA_PARTITION_UA) {
if (!multiple_partitions) {
s_rktp = rd_kafka_toppar_get_avail(rkt, partition,
1/*ua on miss*/, &all_err);
rktp = rd_kafka_toppar_s2i(s_rktp);
Expand All @@ -474,7 +476,9 @@ int rd_kafka_produce_batch (rd_kafka_topic_t *app_rkt, int32_t partition,

/* Create message */
rkm = rd_kafka_msg_new0(rkt,
partition , msgflags,
(msgflags & RD_KAFKA_MSG_F_PARTITION) ?
rkmessages[i].partition : partition,
msgflags,
rkmessages[i].payload,
rkmessages[i].len,
rkmessages[i].key,
Expand All @@ -488,14 +492,30 @@ int rd_kafka_produce_batch (rd_kafka_topic_t *app_rkt, int32_t partition,
continue;
}

/* Two cases here:
* partition==UA: run the partitioner (slow)
* fixed partition: simply concatenate the queue to partit */
if (partition == RD_KAFKA_PARTITION_UA) {
/* Partition the message */
rkmessages[i].err =
/* Three cases here:
* partition==UA: run the partitioner (slow)
* RD_KAFKA_MSG_F_PARTITION: produce message to specified
* partition
* fixed partition: simply concatenate the queue
* to partit */
if (multiple_partitions) {
if (partition == RD_KAFKA_PARTITION_UA) {
/* Partition the message */
rkmessages[i].err =
rd_kafka_msg_partitioner(rkt, rkm,
0/*already locked*/);
} else {
if (s_rktp == NULL
|| rkmessages[i].partition
!= s_rktp->rktp_partition) {
if (s_rktp != NULL)
rd_kafka_toppar_destroy(s_rktp);
s_rktp = rd_kafka_toppar_get_avail(rkt, rkmessages[i].partition,
1/*ua on miss*/, &all_err);
}
rktp = rd_kafka_toppar_s2i(s_rktp);
rd_kafka_toppar_enq_msg(rktp, rkm);
}

if (unlikely(rkmessages[i].err)) {
/* Interceptors: Unroll on_send by on_ack.. */
Expand All @@ -516,9 +536,9 @@ int rd_kafka_produce_batch (rd_kafka_topic_t *app_rkt, int32_t partition,
good++;
}

if (partition == RD_KAFKA_PARTITION_UA)
if (multiple_partitions)
rd_kafka_topic_rdunlock(rkt);
else
if (s_rktp != NULL)
rd_kafka_toppar_destroy(s_rktp);

return good;
Expand Down
250 changes: 249 additions & 1 deletion tests/0011-produce_batch.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@
static int msgid_next = 0;
static int fails = 0;
static int msgcounter = 0;
static int *dr_partition_count = NULL;
static const int topic_num_partitions = 4;
static int msg_partition_wo_flag = 2;
static int msg_partition_wo_flag_success = 0;

/**
* Delivery reported callback.
Expand Down Expand Up @@ -111,6 +115,7 @@ static void test_single_partition (void) {
rkmessages[i].payload = rd_strdup(msg);
rkmessages[i].len = strlen(msg);
rkmessages[i]._private = msgidp;
rkmessages[i].partition = 2;
}

r = rd_kafka_produce_batch(rkt, partition, RD_KAFKA_MSG_F_FREE,
Expand Down Expand Up @@ -280,8 +285,251 @@ static void test_partitioner (void) {
return;
}

static void dr_per_message_partition_cb (rd_kafka_t *rk,
const rd_kafka_message_t *rkmessage, void *opaque) {
if (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR)
TEST_FAIL("Message delivery failed: %s\n",
rd_kafka_err2str(rkmessage->err));

if (msgcounter <= 0)
TEST_FAIL("Too many message dr_cb callback calls "
"(at msg offset #%lld)\n", rkmessage->offset);

TEST_ASSERT(rkmessage->partition < topic_num_partitions);
msgcounter--;

dr_partition_count[rkmessage->partition]++;
}

/* Produce a batch of messages using with per message partition flag */
static void test_per_message_partition_flag (void) {
int partition = 0;
int r;
rd_kafka_t *rk;
rd_kafka_topic_t *rkt;
rd_kafka_conf_t *conf;
rd_kafka_topic_conf_t *topic_conf;
char msg[128];
int msgcnt = 1000;
int failcnt = 0;
int i;
int *rkpartition_counts;
rd_kafka_message_t *rkmessages;
const char *topic_name;

test_conf_init(&conf, &topic_conf, 30);

/* Set delivery report callback */
rd_kafka_conf_set_dr_msg_cb(conf, dr_per_message_partition_cb);

/* Create kafka instance */
rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

TEST_SAY("test_per_message_partition_flag: Created kafka instance %s\n",
rd_kafka_name(rk));
topic_name = test_mk_topic_name("0011_per_message_flag", 1);
test_create_topic(topic_name, topic_num_partitions, 1);

rkt = rd_kafka_topic_new(rk, topic_name,
topic_conf);
if (!rkt)
TEST_FAIL("Failed to create topic: %s\n",
rd_strerror(errno));

/* Create messages */
rkpartition_counts = calloc(sizeof(int), topic_num_partitions);
dr_partition_count = calloc(sizeof(int), topic_num_partitions);
rkmessages = calloc(sizeof(*rkmessages), msgcnt);
for (i = 0 ; i < msgcnt ; i++) {
int *msgidp = malloc(sizeof(*msgidp));
*msgidp = i;
rd_snprintf(msg, sizeof(msg), "%s:%s test message #%i",
__FILE__, __FUNCTION__, i);

rkmessages[i].payload = rd_strdup(msg);
rkmessages[i].len = strlen(msg);
rkmessages[i]._private = msgidp;
rkmessages[i].partition = jitter(0, topic_num_partitions - 1);
rkpartition_counts[rkmessages[i].partition]++;
}

r = rd_kafka_produce_batch(rkt, partition, RD_KAFKA_MSG_F_PARTITION | RD_KAFKA_MSG_F_FREE,
rkmessages, msgcnt);

/* Scan through messages to check for errors. */
for (i = 0 ; i < msgcnt ; i++) {
if (rkmessages[i].err) {
failcnt++;
if (failcnt < 100)
TEST_SAY("Message #%i failed: %s\n",
i,
rd_kafka_err2str(rkmessages[i].err));
}
}

/* All messages should've been produced. */
if (r < msgcnt) {
TEST_SAY("Not all messages were accepted "
"by produce_batch(): %i < %i\n", r, msgcnt);
if (msgcnt - r != failcnt)
TEST_SAY("Discrepency between failed messages (%i) "
"and return value %i (%i - %i)\n",
failcnt, msgcnt - r, msgcnt, r);
TEST_FAIL("%i/%i messages failed\n", msgcnt - r, msgcnt);
}

free(rkmessages);
TEST_SAY("Per-message partition: "
"Produced %i messages, waiting for deliveries\n", r);

msgcounter = msgcnt;
/* Wait for messages to be delivered */
test_wait_delivery(rk, &msgcounter);

if (msgcounter != 0)
TEST_FAIL("Still waiting for %i/%i messages\n",
msgcounter, msgcnt);

for (i = 0; i < topic_num_partitions; i++) {
if (dr_partition_count[i] != rkpartition_counts[i]) {
TEST_FAIL("messages were not sent to designated partitions"
"expected messages %i in partition %i, but only "
"%i messages were sent", rkpartition_counts[i],
i, dr_partition_count[i]);
}
}

free(rkpartition_counts);
free(dr_partition_count);

/* Destroy topic */
rd_kafka_topic_destroy(rkt);

/* Destroy rdkafka instance */
TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk));
rd_kafka_destroy(rk);

return;
}

static void dr_partitioner_wo_per_message_flag_cb (rd_kafka_t *rk,
const rd_kafka_message_t *rkmessage, void *opaque) {
if (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR)
TEST_FAIL("Message delivery failed: %s\n",
rd_kafka_err2str(rkmessage->err));

if (msgcounter <= 0)
TEST_FAIL("Too many message dr_cb callback calls "
"(at msg offset #%lld)\n", rkmessage->offset);
if (rkmessage->partition != msg_partition_wo_flag)
msg_partition_wo_flag_success = 1;
msgcounter--;
}

/* Produce a batch of messages using partitioner without per message partition flag */
static void test_message_partitioner_wo_per_message_flag (void) {
int partition = RD_KAFKA_PARTITION_UA;
int r;
rd_kafka_t *rk;
rd_kafka_topic_t *rkt;
rd_kafka_conf_t *conf;
rd_kafka_topic_conf_t *topic_conf;
char msg[128];
int msgcnt = 1000;
int failcnt = 0;
int i;
rd_kafka_message_t *rkmessages;

test_conf_init(&conf, &topic_conf, 30);

/* Set delivery report callback */
rd_kafka_conf_set_dr_msg_cb(conf, dr_partitioner_wo_per_message_flag_cb);

/* Create kafka instance */
rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

TEST_SAY("test_partitioner: Created kafka instance %s\n",
rd_kafka_name(rk));

rkt = rd_kafka_topic_new(rk, test_mk_topic_name("0011", 0),
topic_conf);
if (!rkt)
TEST_FAIL("Failed to create topic: %s\n",
rd_strerror(errno));

/* Create messages */
rkmessages = calloc(sizeof(*rkmessages), msgcnt);
for (i = 0 ; i < msgcnt ; i++) {
int *msgidp = malloc(sizeof(*msgidp));
*msgidp = i;
rd_snprintf(msg, sizeof(msg), "%s:%s test message #%i",
__FILE__, __FUNCTION__, i);

rkmessages[i].payload = rd_strdup(msg);
rkmessages[i].len = strlen(msg);
rkmessages[i]._private = msgidp;
rkmessages[i].partition = msg_partition_wo_flag;
}

r = rd_kafka_produce_batch(rkt, partition, RD_KAFKA_MSG_F_FREE,
rkmessages, msgcnt);

/* Scan through messages to check for errors. */
for (i = 0 ; i < msgcnt ; i++) {
if (rkmessages[i].err) {
failcnt++;
if (failcnt < 100)
TEST_SAY("Message #%i failed: %s\n",
i,
rd_kafka_err2str(rkmessages[i].err));
}
}

/* All messages should've been produced. */
if (r < msgcnt) {
TEST_SAY("Not all messages were accepted "
"by produce_batch(): %i < %i\n", r, msgcnt);
if (msgcnt - r != failcnt)
TEST_SAY("Discrepency between failed messages (%i) "
"and return value %i (%i - %i)\n",
failcnt, msgcnt - r, msgcnt, r);
TEST_FAIL("%i/%i messages failed\n", msgcnt - r, msgcnt);
}

free(rkmessages);
TEST_SAY("Partitioner: "
"Produced %i messages, waiting for deliveries\n", r);

msgcounter = msgcnt;
/* Wait for messages to be delivered */
test_wait_delivery(rk, &msgcounter);

if (fails)
TEST_FAIL("%i failures, see previous errors", fails);

if (msgcounter != 0)
TEST_FAIL("Still waiting for %i/%i messages\n",
msgcounter, msgcnt);
if (msg_partition_wo_flag_success == 0) {
TEST_FAIL("partitioner was not used, all messages were sent to"
"message specified partition %i", i);
}

/* Destroy topic */
rd_kafka_topic_destroy(rkt);

/* Destroy rdkafka instance */
TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk));
rd_kafka_destroy(rk);

return;
}


int main_0011_produce_batch (int argc, char **argv) {
test_message_partitioner_wo_per_message_flag();
test_single_partition();
test_partitioner();
return 0;
test_per_message_partition_flag();
return 0;
}

0 comments on commit b8d0b84

Please sign in to comment.