@@ -625,12 +625,20 @@ void rd_kafka_toppar_desired_del (rd_kafka_toppar_t *rktp) {
* Append message at tail of 'rktp' message queue.
*/
void rd_kafka_toppar_enq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm) {
int wakeup_fd, queue_len;
int wakeup_fd, queue_len;

rd_kafka_toppar_lock(rktp);

if (!rkm->rkm_u.producer.msgseq &&
rktp->rktp_partition != RD_KAFKA_PARTITION_UA)
rkm->rkm_u.producer.msgseq = ++rktp->rktp_msgseq;
/* No need for enq_sorted(), this is the oldest message. */

queue_len = rd_kafka_msgq_enq(&rktp->rktp_msgq, rkm);
wakeup_fd = rktp->rktp_msgq_wakeup_fd;

rd_kafka_toppar_lock(rktp);
wakeup_fd = rktp->rktp_msgq_wakeup_fd;
queue_len = rd_kafka_msgq_enq(&rktp->rktp_msgq, rkm);
rd_kafka_toppar_unlock(rktp);

#ifndef _MSC_VER
if (wakeup_fd != -1 && queue_len == 1) {
char one = 1;
@@ -659,15 +667,108 @@ void rd_kafka_toppar_deq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm) {
}

/**
* Inserts all messages from 'rkmq' at head of toppar 'rktp's queue.
* 'rkmq' will be cleared.
* @brief Inserts messages from \p rkmq according to their sorted position
* into the partition xmit queue (i.e., the broker xmit work queue).
*
* @param incr_retry Increment retry count for messages.
*
* @returns 1 if any messages were moved to the partition queue for retry,
*          else 0 (messages that exhausted their retries remain in \p rkmq).
*
* @locality Broker thread
*/
void rd_kafka_toppar_insert_msgq (rd_kafka_toppar_t *rktp,
rd_kafka_msgq_t *rkmq) {
rd_kafka_toppar_lock(rktp);
rd_kafka_msgq_concat(rkmq, &rktp->rktp_msgq);
rd_kafka_msgq_move(&rktp->rktp_msgq, rkmq);
rd_kafka_toppar_unlock(rktp);
int rd_kafka_toppar_retry_msgq (rd_kafka_toppar_t *rktp, rd_kafka_msgq_t *rkmq,
int incr_retry) {
rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
/* Temporary queue holding only the messages eligible for retry. */
rd_kafka_msgq_t retryable = RD_KAFKA_MSGQ_INITIALIZER(retryable);
/* Absolute backoff time: retried messages are held back until then. */
rd_ts_t backoff = rd_clock() + (rk->rk_conf.retry_backoff_ms * 1000);
int max_retries = rk->rk_conf.max_retries;
rd_kafka_msg_t *rkm, *tmp;
rd_kafka_msg_t *first, *last;
/* NOTE(review): 'last' is assigned below but never read afterwards
 * (the partition queue tail is re-read inline instead) — dead store. */

/* Scan through messages to see which ones are eligible for retry,
 * move the retryable ones to temporary queue and
 * set backoff time for first message and optionally
 * increase retry count for each message. */
TAILQ_FOREACH_SAFE(rkm, &rkmq->rkmq_msgs, rkm_link, tmp) {
if (rkm->rkm_u.producer.retries + incr_retry > max_retries)
continue; /* Retries exhausted: left on \p rkmq to fail. */

rd_kafka_msgq_deq(rkmq, rkm, 1);
rd_kafka_msgq_enq(&retryable, rkm);

rkm->rkm_u.producer.ts_backoff = backoff;
rkm->rkm_u.producer.retries += incr_retry;
}

/* No messages are retryable */
if (RD_KAFKA_MSGQ_EMPTY(&retryable))
return 0;

first = TAILQ_FIRST(&retryable.rkmq_msgs);
last = TAILQ_LAST(&retryable.rkmq_msgs, rd_kafka_msgs_head_s);

rd_kafka_toppar_lock(rktp);

/*
 * Try to optimize insertion of retryable list.
 */
if (unlikely(RD_KAFKA_MSGQ_EMPTY(&rktp->rktp_xmit_msgq))) {
/* Partition queue is empty, simply move the retryable. */
rd_kafka_msgq_move(&rktp->rktp_xmit_msgq, &retryable);

} else {
/* See if we can optimize the insertion by bulk-loading
 * the messages in place.
 * We know that:
 * - rktp_xmit_msgq is sorted
 * - retryable is sorted
 * - there is no overlap between the two.
 */
rd_kafka_msg_t *part_first;

part_first = TAILQ_FIRST(&rktp->rktp_xmit_msgq.rkmq_msgs);

if (first->rkm_u.producer.msgseq <
part_first->rkm_u.producer.msgseq) {
/* Prepend input to partition queue.
 * First append existing rktp queue to input queue,
 * then move input queue to now-empty rktp queue,
 * effectively prepending input queue to rktp queue. */
rd_kafka_msgq_concat(&retryable, &rktp->rktp_xmit_msgq);
rd_kafka_msgq_move(&rktp->rktp_xmit_msgq, &retryable);

} else if (first->rkm_u.producer.msgseq >
TAILQ_LAST(&rktp->rktp_xmit_msgq.rkmq_msgs,
rd_kafka_msgs_head_s)->
rkm_u.producer.msgseq) {
/* Append input to partition queue */
rd_kafka_msgq_concat(&rktp->rktp_xmit_msgq, &retryable);

} else {
/* Input queue messages reside somewhere
 * in the partition queue range. Find where to add it. */
rd_kafka_msg_t *at;

at = rd_kafka_msgq_find_msgseq_pos(&rktp->
rktp_xmit_msgq,
first->rkm_u.
producer.msgseq);
rd_assert(at);

/* Insert input queue after 'at' position.
 * We know that:
 * - at is non-NULL
 * - at is not the last element. */
/* NOTE(review): unlike msgq_concat()/msgq_move() above,
 * TAILQ_INSERT_LIST splices the raw TAILQ links only —
 * it does not update rktp_xmit_msgq's message
 * count/size accounting; verify the queue counters
 * remain consistent after this branch. */
TAILQ_INSERT_LIST(&rktp->rktp_xmit_msgq.rkmq_msgs,
at, &retryable.rkmq_msgs,
rd_kafka_msgs_head_s,
rd_kafka_msg_t *, rkm_link);
}
}

rd_kafka_toppar_unlock(rktp);

return 1; /* Some messages were scheduled for retry. */
}


@@ -84,7 +84,7 @@ struct rd_kafka_toppar_s { /* rd_kafka_toppar_t */

//LOCK: toppar_lock. Should move the lock inside the msgq instead
//LOCK: toppar_lock. toppar_insert_msg(), concat_msgq()
//LOCK: toppar_lock. toppar_enq_msg(), deq_msg(), insert_msgq()
//LOCK: toppar_lock. toppar_enq_msg(), deq_msg(), toppar_retry_msgq()
int rktp_msgq_wakeup_fd; /* Wake-up fd */
rd_kafka_msgq_t rktp_msgq; /* application->rdkafka queue.
* protected by rktp_lock */
@@ -98,6 +98,16 @@ struct rd_kafka_toppar_s { /* rd_kafka_toppar_t */
* Broker thread -> App */
rd_kafka_q_t *rktp_ops; /* * -> Main thread */

uint64_t rktp_msgseq; /* Current message sequence number.
* Each message enqueued on a
* non-UA partition will get a
* unique sequential number assigned.
* This number is used to
* re-enqueue the message
* on resends but making sure
* the input ordering is still
* maintained.
* Starts at 1. */

/**
* rktp version barriers
@@ -322,8 +332,9 @@ void rd_kafka_toppar_set_fetch_state (rd_kafka_toppar_t *rktp,
void rd_kafka_toppar_insert_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm);
void rd_kafka_toppar_enq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm);
void rd_kafka_toppar_deq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm);
void rd_kafka_toppar_insert_msgq (rd_kafka_toppar_t *rktp,
rd_kafka_msgq_t *rkmq);
int rd_kafka_toppar_retry_msgq (rd_kafka_toppar_t *rktp,
rd_kafka_msgq_t *rkmq,
int incr_retry);
void rd_kafka_toppar_concat_msgq (rd_kafka_toppar_t *rktp,
rd_kafka_msgq_t *rkmq);
void rd_kafka_toppar_enq_error (rd_kafka_toppar_t *rktp,
@@ -122,9 +122,13 @@ int rd_kafka_err_action (rd_kafka_broker_t *rkb,
actions |= RD_KAFKA_ERR_ACTION_REFRESH;
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
case RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE:
/* Client-side wait-response/in-queue timeout */
case RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT:
/* Broker-side request handling timeout */
case RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS:
case RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND:
/* Temporary broker-side problem */
case RD_KAFKA_RESP_ERR__TRANSPORT:
/* Broker connection down */
actions |= RD_KAFKA_ERR_ACTION_RETRY;
@@ -1793,6 +1797,7 @@ static void rd_kafka_handle_Produce (rd_kafka_t *rk,
} else {
/* Error */
int actions;
char actstr[64];

if (err == RD_KAFKA_RESP_ERR__DESTROY)
goto done; /* Terminating */
@@ -1806,55 +1811,98 @@ static void rd_kafka_handle_Produce (rd_kafka_t *rk,
RD_KAFKA_ERR_ACTION_REFRESH,
RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,

RD_KAFKA_ERR_ACTION_RETRY,
RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS,

RD_KAFKA_ERR_ACTION_RETRY,
RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND,

RD_KAFKA_ERR_ACTION_RETRY,
RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE,

RD_KAFKA_ERR_ACTION_RETRY,
RD_KAFKA_RESP_ERR__TIMED_OUT,

RD_KAFKA_ERR_ACTION_PERMANENT,
RD_KAFKA_RESP_ERR__MSG_TIMED_OUT,

RD_KAFKA_ERR_ACTION_END);

rd_rkb_dbg(rkb, MSG, "MSGSET",
"%s [%"PRId32"]: MessageSet with %i message(s) "
"encountered error: %s (actions 0x%x)",
"encountered error: %s (actions %s)",
rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
rd_atomic32_get(&request->rkbuf_msgq.rkmq_msg_cnt),
rd_kafka_err2str(err), actions);

/* NOTE: REFRESH implies a later retry, which does NOT affect
* the retry count since refresh-errors are considered
* to be stale metadata rather than temporary errors.
*
* This is somewhat problematic since it may cause
* duplicate messages even with retries=0 if the
* ProduceRequest made it to the broker but only the
* response was lost due to network connectivity issues.
* That problem will be sorted when EoS is implemented.
*/
if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
/* Request metadata information update */
rd_kafka_toppar_leader_unavailable(rktp,
"produce", err);

/* Move messages (in the rkbuf) back to the partition's
* queue head. They will be resent when a new leader
* is delegated. */
rd_kafka_toppar_insert_msgq(rktp, &request->rkbuf_msgq);

/* No need for fallthru here since the request
* no longer has any messages associated with it. */
goto done;
}
rd_kafka_err2str(err),
rd_flags2str(actstr, sizeof(actstr),
rd_kafka_actions_descs,
actions));

if ((actions & RD_KAFKA_ERR_ACTION_RETRY) &&
rd_kafka_buf_retry(rkb, request))
return; /* Scheduled for retry */

if (actions & (RD_KAFKA_ERR_ACTION_REFRESH |
RD_KAFKA_ERR_ACTION_RETRY)) {
/* Retry */
int incr_retry = 1; /* Increase per-message retry cnt */

if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
/* Request metadata information update.
* These errors imply that we have stale
* information and that the message was
* either rejected or not sent -
* we don't need to increment the retry count
* when we perform a retry since:
* - it is a temporary error (hopefully)
* - there is no chance of duplicate delivery
*/
rd_kafka_toppar_leader_unavailable(
rktp, "produce", err);

/* We can't be certain the request wasn't
* sent in case of transport failure,
* so the ERR__TRANSPORT case will need
* the retry count to be increased */
if (err != RD_KAFKA_RESP_ERR__TRANSPORT)
incr_retry = 0;
}

/* If the message timed out in queue, not in transit,
* we will retry at a later time but not increment
* the retry count since there is no risk
* of duplicates. */
if (err == RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE)
incr_retry = 0;

/* Since requests are specific to a broker
* we move the retryable messages from the request
* back to the partition queue (prepend) and then
* let the new broker construct a new request.
* While doing this we also make sure the retry count
* for each message is honoured, any messages that
* would exceed the retry count will not be
* moved but instead fail below. */
rd_kafka_toppar_retry_msgq(rktp, &request->rkbuf_msgq,
incr_retry);

if (rd_kafka_msgq_len(&request->rkbuf_msgq) == 0) {
/* No need do anything more with the request
* here since the request no longer has any
* messages associated with it. */
goto done;
}
}

/* Translate request-level timeout error code
* to message-level timeout error code. */
if (err == RD_KAFKA_RESP_ERR__TIMED_OUT)
if (err == RD_KAFKA_RESP_ERR__TIMED_OUT ||
err == RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE)
err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;

/* Fatal errors: no message transmission retries */
/* FALLTHRU */
}

/* Propagate assigned offset and timestamp back to app. */
if (likely(offset != RD_KAFKA_OFFSET_INVALID)) {
if (likely(!err && offset != RD_KAFKA_OFFSET_INVALID)) {
rd_kafka_msg_t *rkm;
if (rktp->rktp_rkt->rkt_conf.produce_offset_report) {
/* produce.offset.report: each message */
@@ -243,6 +243,22 @@
} while (0)
#endif

/* @brief Insert all elements of list \p shead after element \p listelm
 *        in \p dhead. \p shead is emptied (re-initialized) on return.
 *
 *        \p headname is the TAILQ_HEAD name, \p elmtype the element
 *        pointer type, and \p field the element's TAILQ_ENTRY field.
 *
 *        Fast path: when \p listelm is the tail of \p dhead this reduces
 *        to a plain TAILQ_CONCAT.
 *
 *        NOTE(review): in the else-branch an empty \p shead makes
 *        _elm and _last NULL and the link fixups below dereference
 *        them — callers must pass a non-empty \p shead. */
#define TAILQ_INSERT_LIST(dhead,listelm,shead,headname,elmtype,field) do { \
if (TAILQ_LAST(dhead, headname) == listelm) { \
TAILQ_CONCAT(dhead, shead, field); \
} else { \
elmtype _elm = TAILQ_FIRST(shead); \
elmtype _last = TAILQ_LAST(shead, headname); \
elmtype _aft = TAILQ_NEXT(listelm, field); \
(listelm)->field.tqe_next = _elm; \
_elm->field.tqe_prev = &(listelm)->field.tqe_next; \
_last->field.tqe_next = _aft; \
_aft->field.tqe_prev = &_last->field.tqe_next; \
TAILQ_INIT((shead)); \
} \
} while (0)

#ifndef SIMPLEQ_HEAD
#define SIMPLEQ_HEAD(name, type) \
struct name { \
@@ -75,10 +75,10 @@ static int ctrl_thrd_main (void *arg) {
ctrl.cmd.ack = 1;
printf(_C_CYA "## %s: sockem: "
"receieved command to set delay "
"to %d in %lldms\n" _C_CLR,
"to %d in %dms\n" _C_CLR,
__FILE__,
ctrl.next.delay,
(ctrl.next.ts_at - test_clock()) / 1000);
(int)(ctrl.next.ts_at - test_clock()) / 1000);

}