Skip to content

Commit

Permalink
Added op priority to queues (for #1088)
Browse files Browse the repository at this point in the history
(cherry picked from commit 3e4ccaf)
  • Loading branch information
edenhill committed Apr 7, 2017
1 parent c7875d9 commit 9e01c1d
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 11 deletions.
3 changes: 3 additions & 0 deletions src/rdkafka_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ struct rd_kafka_op_s {
rd_kafka_resp_err_t rko_err;
int32_t rko_len; /* Depends on type, typically the
* message length. */
int rko_prio; /* In-queue priority.
* Higher value means higher prio. */

shptr_rd_kafka_toppar_t *rko_rktp;

Expand Down Expand Up @@ -281,6 +283,7 @@ rd_kafka_op_t *rd_kafka_op_new_cb (rd_kafka_t *rk,
int rd_kafka_op_reply (rd_kafka_op_t *rko, rd_kafka_resp_err_t err);


#define rd_kafka_op_set_prio(rko,prio) ((rko)->rko_prio = prio)


#define rd_kafka_op_err(rk,err,...) do { \
Expand Down
19 changes: 11 additions & 8 deletions src/rdkafka_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -211,18 +211,21 @@ int rd_kafka_q_move_cnt (rd_kafka_q_t *dstq, rd_kafka_q_t *srcq,
* items of 'srcq' we can move the entire queue. */
if (cnt == -1 ||
cnt >= (int)srcq->rkq_qlen) {
rd_dassert(TAILQ_EMPTY(&srcq->rkq_q) ||
srcq->rkq_qlen > 0);
TAILQ_CONCAT(&dstq->rkq_q, &srcq->rkq_q, rko_link);
mcnt = srcq->rkq_qlen;
dstq->rkq_qlen += srcq->rkq_qlen;
dstq->rkq_qsize += srcq->rkq_qsize;
rd_kafka_q_reset(srcq);
mcnt = srcq->rkq_qlen;
rd_kafka_q_concat0(dstq, srcq, 0/*no-lock*/);
} else {
while (mcnt < cnt &&
(rko = TAILQ_FIRST(&srcq->rkq_q))) {
TAILQ_REMOVE(&srcq->rkq_q, rko, rko_link);
TAILQ_INSERT_TAIL(&dstq->rkq_q, rko, rko_link);
if (likely(!rko->rko_prio))
TAILQ_INSERT_TAIL(&dstq->rkq_q, rko,
rko_link);
else
TAILQ_INSERT_SORTED(
&dstq->rkq_q, rko,
rd_kafka_op_t *, rko_link,
rd_kafka_op_cmp_prio);

srcq->rkq_qlen--;
dstq->rkq_qlen++;
srcq->rkq_qsize -= rko->rko_len;
Expand Down
36 changes: 33 additions & 3 deletions src/rdkafka_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,17 @@ void rd_kafka_q_io_event (rd_kafka_q_t *rkq) {
}


/**
* @brief rko->rko_prio comparator
* @remark: descending order: higher priority takes preceedence.
*/
static RD_INLINE RD_UNUSED
int rd_kafka_op_cmp_prio (const void *_a, const void *_b) {
const rd_kafka_op_t *a = _a, *b = _b;

return b->rko_prio - a->rko_prio;
}

/**
* @brief Enqueue the 'rko' op at the tail of the queue 'rkq'.
*
Expand Down Expand Up @@ -273,7 +284,11 @@ int rd_kafka_q_enq (rd_kafka_q_t *rkq, rd_kafka_op_t *rko) {
}

if (!(fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
TAILQ_INSERT_TAIL(&rkq->rkq_q, rko, rko_link);
if (likely(!rko->rko_prio))
TAILQ_INSERT_TAIL(&rkq->rkq_q, rko, rko_link);
else
TAILQ_INSERT_SORTED(&rkq->rkq_q, rko, rd_kafka_op_t *,
rko_link, rd_kafka_op_cmp_prio);
rkq->rkq_qlen++;
rkq->rkq_qsize += rko->rko_len;
cnd_signal(&rkq->rkq_cond);
Expand Down Expand Up @@ -322,13 +337,24 @@ int rd_kafka_q_concat0 (rd_kafka_q_t *rkq, rd_kafka_q_t *srcq, int do_lock) {
if (do_lock)
mtx_lock(&rkq->rkq_lock);
if (!rkq->rkq_fwdq && !srcq->rkq_fwdq) {
rd_kafka_op_t *rko;

rd_dassert(TAILQ_EMPTY(&srcq->rkq_q) ||
srcq->rkq_qlen > 0);
if (unlikely(!(rkq->rkq_flags & RD_KAFKA_Q_F_READY))) {
if (do_lock)
mtx_unlock(&rkq->rkq_lock);
return -1;
}
/* First insert any prioritized ops from srcq
* in the right position in rkq. */
while ((rko = TAILQ_FIRST(&srcq->rkq_q)) && rko->rko_prio > 0) {
TAILQ_REMOVE(&srcq->rkq_q, rko, rko_link);
TAILQ_INSERT_SORTED(&rkq->rkq_q, rko,
rd_kafka_op_t *, rko_link,
rd_kafka_op_cmp_prio);
}

TAILQ_CONCAT(&rkq->rkq_q, &srcq->rkq_q, rko_link);
if (rkq->rkq_qlen == 0 && srcq->rkq_qlen > 0)
rd_kafka_q_io_event(rkq);
Expand All @@ -351,18 +377,22 @@ int rd_kafka_q_concat0 (rd_kafka_q_t *rkq, rd_kafka_q_t *srcq, int do_lock) {


/**
* Prepend all elements of 'srcq' onto head of 'rkq'.
* @brief Prepend all elements of 'srcq' onto head of 'rkq'.
* 'rkq' will be be locked (if 'do_lock'==1), but 'srcq' will not.
* 'srcq' will be reset.
*
* Locality: any thread.
* @remark Will not respect priority of ops, srcq will be prepended in its
* original form to rkq.
*
* @locality any thread.
*/
static RD_INLINE RD_UNUSED
void rd_kafka_q_prepend0 (rd_kafka_q_t *rkq, rd_kafka_q_t *srcq,
int do_lock) {
if (do_lock)
mtx_lock(&rkq->rkq_lock);
if (!rkq->rkq_fwdq && !srcq->rkq_fwdq) {
/* FIXME: prio-aware */
/* Concat rkq on srcq */
TAILQ_CONCAT(&srcq->rkq_q, &rkq->rkq_q, rko_link);
/* Move srcq to rkq */
Expand Down

0 comments on commit 9e01c1d

Please sign in to comment.