Skip to content

Commit

Permalink
Formalise and fix Producer retries and retry-ordering (#623, #1092, #…
Browse files Browse the repository at this point in the history
…1432, #1476, #1421)

ProduceRequest retries are reworked to not retry the request itself,
but put the messages back on the partition queue (while maintaining
input order) and then have an upcoming ProduceRequest include the messages again.

Retries are now calculated per message rather than ProduceRequest
and the retry backoff is also enforced on a per-message basis.

The input order of messages is retained during this whole process,
which should guarantee ordered delivery if max.in.flight=1 but with retries > 0.

The new behaviour is formalised through documentation (INTRODUCTION.md)
  • Loading branch information
edenhill committed Dec 12, 2017
1 parent 4a046fb commit 4e0c961
Show file tree
Hide file tree
Showing 8 changed files with 377 additions and 53 deletions.
107 changes: 104 additions & 3 deletions INTRODUCTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ for message commit acknowledgements from brokers (any value but 0, see
[`CONFIGURATION.md`](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)
for specifics) then librdkafka will hold on to the message until
all expected acks have been received, gracefully handling the following events:

* Broker connection failure
* Topic leader change
* Produce errors signaled by the broker
Expand All @@ -160,9 +160,110 @@ to report the status of message delivery:

See Producer API chapter for more details on delivery report callback usage.

The delivery report callback is optional.
The delivery report callback is optional but highly recommended.


### Producer message delivery success

When a ProduceRequest is successfully handled by the broker and a
ProduceResponse is received (also called the ack) without an error code
the messages from the ProduceRequest are enqueued on the delivery report
queue (if a delivery report callback has been set) and will be passed to
the application on the next invocation rd_kafka_poll().


### Producer message delivery failure

The following sub-chapters explains how different produce errors
are handled.


#### Error: Timed out in transmission queue

Internal error ERR__TIMED_OUT_QUEUE.

The connectivity to the broker may be stalled due to networking contention,
local or remote system issues, etc, and the request has not yet been sent.

The producer can be certain that the message has not been sent to the broker.

This is a retryable error, but is not counted as a retry attempt
since the message was never actually transmitted.

A retry at this point will not cause duplicate messages.


#### Error: Timed out in flight to/from broker

Internal error ERR__TIMED_OUT, ERR__TRANSPORT.

Same reasons as for "Timed out in transmission queue" above, with the
difference that the message may have been sent to the broker and might
be stalling waiting for broker replicas to ack the message, or the response
could be stalled due to networking issues.
At this point the producer can't know if the message reached the broker,
nor if the broker wrote the message to disk and replicas.

This is a retryable error.

A retry at this point may cause duplicate messages.


#### Error: Temporary broker-side error

Broker errors ERR_REQUEST_TIMED_OUT, ERR_NOT_ENOUGH_REPLICAS,
ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND.

These errors are considered temporary and a retry is warranted.


#### Error: Temporary errors due to stale metadata

Broker errors ERR_LEADER_NOT_AVAILABLE, ERR_NOT_LEADER_FOR_PARTITION.

These errors are considered temporary and a retry is warranted, a metadata
request is automatically sent to find a new leader for the partition.

A retry at this point will not cause duplicate messages.


#### Error: Local time out

Internal error ERR__MSG_TIMED_OUT.

The message could not be successfully transmitted before message.timeout.ms
expired, typically due to no leader being available or no broker connection.
The message may have been retried due to other errors but
those error messages are abstracted by the ERR__MSG_TIMED_OUT error code.

Since the message.timeout.ms has passed there will be no more retries.


#### Error: Permanent errors

Any other error is considered a permanent error and the message
will fail immediately, generating a delivery report event with the
distinctive error code.



### Producer retries

The ProduceRequest itself is not retried, instead the messages
are put back on the internal partition queue by an insert sort
that maintains their original position (the message order is defined
at the time a message is initially appended to a partition queue, i.e., after
partitioning).
A backoff time (retry.backoff.ms) is set on the retried messages which
effectively blocks retry attempts until the backoff time has expired.


### Reordering

As for all retries, if max.in.flight > 1 and retries > 0, messages may be
produced out of order, since a sub-sequent message in a sub-sequent
ProduceRequest may already be in-flight (and accepted by the broker)
by the time the retry for the failing message is sent.



Expand All @@ -173,7 +274,7 @@ The delivery report callback is optional.

The librdkafka API is documented in the
[`rdkafka.h`](https://github.com/edenhill/librdkafka/blob/master/src/rdkafka.h)
header file, the configuration properties are documented in
header file, the configuration properties are documented in
[`CONFIGURATION.md`](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)

### Initialization
Expand Down
22 changes: 13 additions & 9 deletions src/rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -530,11 +530,11 @@ static void rd_kafka_broker_timeout_scan (rd_kafka_broker_t *rkb, rd_ts_t now) {
/* Requests in retry queue */
retry_cnt = rd_kafka_broker_bufq_timeout_scan(
rkb, 0, &rkb->rkb_retrybufs, NULL,
RD_KAFKA_RESP_ERR__TIMED_OUT, now);
RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE, now);
/* Requests in local queue not sent yet. */
q_cnt = rd_kafka_broker_bufq_timeout_scan(
rkb, 0, &rkb->rkb_outbufs, &req_cnt,
RD_KAFKA_RESP_ERR__TIMED_OUT, now);
RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE, now);

if (req_cnt + retry_cnt + q_cnt > 0) {
rd_rkb_dbg(rkb, MSG|RD_KAFKA_DBG_BROKER,
Expand Down Expand Up @@ -2372,17 +2372,21 @@ static int rd_kafka_toppar_producer_serve (rd_kafka_broker_t *rkb,
* our waiting to queue.buffering.max.ms
* and batch.num.messages. */
if (r < rkb->rkb_rk->rk_conf.batch_num_messages) {
rd_kafka_msg_t *rkm_oldest;
rd_kafka_msg_t *rkm;
rd_ts_t wait_max;

rkm_oldest = TAILQ_FIRST(&rktp->rktp_xmit_msgq.rkmq_msgs);
if (unlikely(!rkm_oldest))
return 0;
rkm = TAILQ_FIRST(&rktp->rktp_xmit_msgq.rkmq_msgs);
rd_assert(rkm != NULL);

/* Calculate maximum wait-time to
* honour queue.buffering.max.ms contract. */
wait_max = rd_kafka_msg_enq_time(rkm_oldest) +
/* Calculate maximum wait-time to honour both
* queue.buffering.max.ms and retry.backoff.ms contracts. */
wait_max = rd_kafka_msg_enq_time(rkm) +
(rkb->rkb_rk->rk_conf.buffering_max_ms * 1000);

if (unlikely(rkm->rkm_u.producer.ts_backoff > now &&
rkm->rkm_u.producer.ts_backoff < wait_max))
wait_max = rkm->rkm_u.producer.ts_backoff;

if (wait_max > now) {
if (wait_max < *next_wakeup)
*next_wakeup = wait_max;
Expand Down
18 changes: 18 additions & 0 deletions src/rdkafka_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,25 @@ int rd_kafka_msgq_age_scan (rd_kafka_msgq_t *rkmq,
return rd_atomic32_get(&timedout->rkmq_msg_cnt) - cnt;
}

/**
* @brief Find the insert position (i.e., the previous element)
* for message sequence \p msgseq.
*
* @remark This needs to be true: rkmq.first < msgseq < rkmq.last
*/
rd_kafka_msg_t *rd_kafka_msgq_find_msgseq_pos (rd_kafka_msgq_t *rkmq,
uint64_t msgseq) {
rd_kafka_msg_t *rkm, *last = NULL;

TAILQ_FOREACH(rkm, &rkmq->rkmq_msgs, rkm_link) {
if (rkm->rkm_u.producer.msgseq > msgseq)
return last;
last = rkm;
}

rd_assert(!*"msgseq outside rkmq window");
return NULL; /* NOTREACHED */
}



Expand Down
36 changes: 35 additions & 1 deletion src/rdkafka_msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ typedef struct rd_kafka_msg_s {
struct {
rd_ts_t ts_timeout; /* Message timeout */
rd_ts_t ts_enq; /* Enqueue/Produce time */
rd_ts_t ts_backoff; /* Backoff next Produce until
* this time. */
uint64_t msgseq; /* Message sequence number,
* used to maintain ordering. */
int retries; /* Number of retries so far */
} producer;
#define rkm_ts_timeout rkm_u.producer.ts_timeout
#define rkm_ts_enq rkm_u.producer.ts_enq
Expand Down Expand Up @@ -131,8 +136,9 @@ rd_kafka_msg_t *rd_kafka_message2msg (rd_kafka_message_t *rkmessage) {



TAILQ_HEAD(rd_kafka_msgs_head_s, rd_kafka_msg_s);
typedef struct rd_kafka_msgq_s {
TAILQ_HEAD(, rd_kafka_msg_s) rkmq_msgs;
struct rd_kafka_msgs_head_s rkmq_msgs; /* TAILQ_HEAD */
rd_atomic32_t rkmq_msg_cnt;
rd_atomic64_t rkmq_msg_bytes;
} rd_kafka_msgq_t;
Expand All @@ -143,6 +149,9 @@ typedef struct rd_kafka_msgq_s {
#define RD_KAFKA_MSGQ_FOREACH(elm,head) \
TAILQ_FOREACH(elm, &(head)->rkmq_msgs, rkm_link)

/* @brief Check if queue is empty. Proper locks must be held. */
#define RD_KAFKA_MSGQ_EMPTY(rkmq) TAILQ_EMPTY(&(rkmq)->rkmq_msgs)

/**
* Returns the number of messages in the specified queue.
*/
Expand Down Expand Up @@ -247,6 +256,29 @@ rd_kafka_msg_t *rd_kafka_msgq_pop (rd_kafka_msgq_t *rkmq) {
return rkm;
}


static RD_INLINE
int rd_kafka_msg_cmp_msgseq (const void *_a, const void *_b) {
const rd_kafka_msg_t *a = _a, *b = _b;

return a->rkm_u.producer.msgseq - b->rkm_u.producer.msgseq;
}

/**
* @brief Insert message at its sorted position using the msgseq.
* @remark This is an O(n) operation.
* @warning The message must have a msgseq set.
*/
static RD_INLINE RD_UNUSED
void rd_kafka_msgq_enq_sorted (rd_kafka_msgq_t *rkmq,
rd_kafka_msg_t *rkm) {
rd_dassert(rkm->rkm_u.producer.msgseq != 0);
TAILQ_INSERT_SORTED(&rkmq->rkmq_msgs, rkm, rd_kafka_msg_t *,
rkm_link, rd_kafka_msg_cmp_msgseq);
rd_atomic32_add(&rkmq->rkmq_msg_cnt, 1);
rd_atomic64_add(&rkmq->rkmq_msg_bytes, rkm->rkm_len+rkm->rkm_key_len);
}

/**
* Insert message at head of message queue.
*/
Expand Down Expand Up @@ -278,6 +310,8 @@ int rd_kafka_msgq_age_scan (rd_kafka_msgq_t *rkmq,
rd_kafka_msgq_t *timedout,
rd_ts_t now);

rd_kafka_msg_t *rd_kafka_msgq_find_msgseq_pos (rd_kafka_msgq_t *rkmq,
uint64_t msgseq);

int rd_kafka_msg_partitioner (rd_kafka_itopic_t *rkt, rd_kafka_msg_t *rkm,
int do_lock);
Expand Down
113 changes: 105 additions & 8 deletions src/rdkafka_partition.c
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,10 @@ void rd_kafka_toppar_desired_del (rd_kafka_toppar_t *rktp) {
void rd_kafka_toppar_enq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm) {

rd_kafka_toppar_lock(rktp);
if (!rkm->rkm_u.producer.msgseq &&
rktp->rktp_partition != RD_KAFKA_PARTITION_UA)
rkm->rkm_u.producer.msgseq = ++rktp->rktp_msgseq;
/* No need for enq_sorted(), this is the oldest message. */
rd_kafka_msgq_enq(&rktp->rktp_msgq, rkm);
#ifndef _MSC_VER
if (rktp->rktp_msgq_wakeup_fd != -1 &&
Expand Down Expand Up @@ -648,15 +652,108 @@ void rd_kafka_toppar_deq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm) {
}

/**
* Inserts all messages from 'rkmq' at head of toppar 'rktp's queue.
* 'rkmq' will be cleared.
* @brief Inserts messages from \p rkmq according to their sorted position
* into the partition xmit queue (i.e., the broker xmit work queue).
*
* @param incr_retry Increment retry count for messages.
*
* @returns the number of messages that could not be retried.
*
* @locality Broker thread
*/
void rd_kafka_toppar_insert_msgq (rd_kafka_toppar_t *rktp,
rd_kafka_msgq_t *rkmq) {
rd_kafka_toppar_lock(rktp);
rd_kafka_msgq_concat(rkmq, &rktp->rktp_msgq);
rd_kafka_msgq_move(&rktp->rktp_msgq, rkmq);
rd_kafka_toppar_unlock(rktp);
int rd_kafka_toppar_retry_msgq (rd_kafka_toppar_t *rktp, rd_kafka_msgq_t *rkmq,
int incr_retry) {
rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
rd_kafka_msgq_t retryable = RD_KAFKA_MSGQ_INITIALIZER(retryable);
rd_ts_t backoff = rd_clock() + (rk->rk_conf.retry_backoff_ms * 1000);
int max_retries = rk->rk_conf.max_retries;
rd_kafka_msg_t *rkm, *tmp;
rd_kafka_msg_t *first, *last;

/* Scan through messages to see which ones are eligible for retry,
* move the retryable ones to temporary queue and
* set backoff time for first message and optionally
* increase retry count for each message. */
TAILQ_FOREACH_SAFE(rkm, &rkmq->rkmq_msgs, rkm_link, tmp) {
if (rkm->rkm_u.producer.retries + incr_retry > max_retries)
continue;

rd_kafka_msgq_deq(rkmq, rkm, 1);
rd_kafka_msgq_enq(&retryable, rkm);

rkm->rkm_u.producer.ts_backoff = backoff;
rkm->rkm_u.producer.retries += incr_retry;
}

/* No messages are retryable */
if (RD_KAFKA_MSGQ_EMPTY(&retryable))
return 0;

first = TAILQ_FIRST(&retryable.rkmq_msgs);
last = TAILQ_LAST(&retryable.rkmq_msgs, rd_kafka_msgs_head_s);

rd_kafka_toppar_lock(rktp);

/*
* Try to optimize insertion of retryable list.
*/
if (unlikely(RD_KAFKA_MSGQ_EMPTY(&rktp->rktp_xmit_msgq))) {
/* Partition queue is empty, simply move the retryable. */
rd_kafka_msgq_move(&rktp->rktp_xmit_msgq, &retryable);

} else {
/* See if we can optimize the insertion by bulk-loading
* the messages in place/
* We know that:
* - rktp_xmit_msgq is sorted
* - retryable is sorted
* - there is no overlap between the two.
*/
rd_kafka_msg_t *part_first;

part_first = TAILQ_FIRST(&rktp->rktp_xmit_msgq.rkmq_msgs);

if (first->rkm_u.producer.msgseq <
part_first->rkm_u.producer.msgseq) {
/* Prepend input to partition queue.
* First append existing rktp queue to input queue,
* then move input queue to now-empty rktp queue,
* effectively prepending input queue to rktp queue. */
rd_kafka_msgq_concat(&retryable, &rktp->rktp_xmit_msgq);
rd_kafka_msgq_move(&rktp->rktp_xmit_msgq, &retryable);

} else if (first->rkm_u.producer.msgseq >
TAILQ_LAST(&rktp->rktp_xmit_msgq.rkmq_msgs,
rd_kafka_msgs_head_s)->
rkm_u.producer.msgseq) {
/* Append input to partition queue */
rd_kafka_msgq_concat(&rktp->rktp_xmit_msgq, &retryable);

} else {
/* Input queue messages reside somewhere
* in the partition queue range. Find where to add it. */
rd_kafka_msg_t *at;

at = rd_kafka_msgq_find_msgseq_pos(&rktp->
rktp_xmit_msgq,
first->rkm_u.
producer.msgseq);
rd_assert(at);

/* Insert input queue after 'at' position.
* We know that:
* - at is non-NULL
* - at is not the last element. */
TAILQ_INSERT_LIST(&rktp->rktp_xmit_msgq.rkmq_msgs,
at, &retryable.rkmq_msgs,
rd_kafka_msgs_head_s,
rd_kafka_msg_t *, rkm_link);
}
}

rd_kafka_toppar_unlock(rktp);

return 1;
}


Expand Down

0 comments on commit 4e0c961

Please sign in to comment.