
KAFKA-4089: KafkaProducer expires batch when metadata is stale #1791

Closed
wants to merge 19 commits

Conversation

@sutambe (Contributor) commented Aug 26, 2016

No description provided.

* The metadata has expired if an update is explicitly requested or an update is due now.
*/
public synchronized boolean hasExpired(long nowMs) {
return this.needUpdate || this.timeToNextUpdate(nowMs) == 0;
@lindong28 (Member), Aug 26, 2016

It seems that the body of this method is equivalent to the following, which is already used in timeToNextUpdate(..). This duplicates the same logic in two methods.

long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
return timeToExpire == 0;
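
One way to avoid that duplication, sketched purely as an illustration (the helper name is mine, not part of the patch), is to factor the shared computation into a single private method that both hasExpired(..) and timeToNextUpdate(..) call:

// Illustrative sketch, not the actual patch: one place computes the time to expiry.
private synchronized long timeToExpire(long nowMs) {
    return this.needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
}

public synchronized boolean hasExpired(long nowMs) {
    return timeToExpire(nowMs) == 0;
}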

@becketqin (Contributor)

ping @junrao. Will you be able to take a look?

Some context on the patch. The current expiration mechanism is based on the muted partitions in the RecordAccumulator: if a batch has sat in the accumulator for some time and there is no in-flight batch for that partition, we will expire the batch. This behavior was introduced in KAFKA-3388 (#1056). The issue is that when a metadata update happens, we will not send any other request in that send loop, so no partitions will be muted, and thus batches are exposed to expiration in that send loop even though they can still make progress. This caused our mirror maker to shut down from time to time. The fix is to base the expiration check on metadata freshness: if the metadata is fresh and the leader is known, we don't expire a batch.
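
As a rough sketch of that freshness check (my own reading, mirroring the isMetadataStale(now, metadata, metadataStaleMs) call that appears later in this diff; the lastSuccessfulUpdate() accessor is used here for illustration):

// Sketch only: metadata is treated as "stale" for batch-expiry purposes when the last
// successful refresh is older than metadataStaleMs, which is deliberately larger than
// metadata.max.age.ms so that a slow-but-working refresh does not trigger expiration.
static boolean isMetadataStale(long nowMs, Metadata metadata, long metadataStaleMs) {
    return nowMs - metadata.lastSuccessfulUpdate() > metadataStaleMs;
}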

@jjkoshy (Contributor) left a comment

Overall I think this change makes sense after a discussion with @becketqin @MayureshGharat @sutambe

It is interesting to summarize the evolution of the effect of request timeout on batches in the accumulator. Let me know if you see any errors. cc others who were involved in those patches @junrao @hachikuji @ijuma

  • KIP-19: if partition metadata is missing, then can expire if batch has been ready for longer than request timeout
  • KAFKA-2805: removed the check on missing metadata. i.e., can expire any batch that has been ready for longer than request timeout. (This was to handle the case of a cluster being down and thus no metadata refresh)
  • KAFKA-3388: Expire only if we think we cannot send; and we infer that we can send based on fact that there is an in-flight request that has not yet timed out.
  • KAFKA-4089: Now adds the additional check that metadata isn't too old/stale before expiring: i.e., metadata exists and is within metadata-max-age + (request timeout + backoff) * 3

I left a couple of minor comments in the patch and may have a few more as I'd like to do another pass. In the meantime, can you address these and think about updating our html docs? I think the request timeout documentation is woefully inadequate, especially with these nuances at play. It may also be reasonable to add a one-liner to the original KIP-19 wiki stating that there have been a few changes to its interpretation and that the user should refer to the current html docs as the source of truth. In fact there are a few statements in that KIP that I find are no longer true (even prior to this patch).

@@ -47,6 +47,8 @@

public static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000;
private static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L;
private static final long DEFAULT_RETRY_BACKOFF_MS = 100L;
private static final long DEFAULT_METADATA_MAX_AGE_MS = 60 * 60 * 1000L;
Contributor

Pre-existing, but this is slightly inconsistent with the max age default in the producer config (which is five minutes). One hour seems too high for this.

@sutambe (Contributor, Author), Sep 20, 2016

Restored DEFAULT_METADATA_MAX_AGE_MS to 5 mins.

@@ -223,17 +223,29 @@ private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, C
* Abort the batches that have been sitting in RecordAccumulator for more than the configured requestTimeout
* due to metadata being unavailable
Contributor

This comment needs to be updated.

Contributor Author

fixed.

// (1) the partition does not have a batch in flight.
// (2) Either the metadata is too stale or we don't have a leader for a partition.
//
// The first condition prevents later batches from being expired while an earlier batch is
@jjkoshy (Contributor), Sep 16, 2016

The current statement seems to apply more generally and only later narrows things down to the strict ordering case.

So I would make this a little clearer by stating at the beginning that this applies to max.in.flight=1. Something like:
In the case where the user wishes to achieve strict ordering (i.e., max.in.flight.request.per.connection=1), the first condition helps ensure that batches also expire in order. (Partitions are only muted if strict ordering is enabled and there are in-flight batches.)

// Note that `muted` is only ever populated if `max.in.flight.request.per.connection=1` so this protection
// We check if the batch should be expired if we know that we can't make progress on a given
// topic-partition. Specifically, we check if
// (1) the partition does not have a batch in flight.
@jjkoshy (Contributor), Sep 16, 2016

the partition does not have an in-flight batch

// is only active in this case. Otherwise the expiration order is not guaranteed.
if (!muted.contains(tp)) {
//
// The second condition allows expiration of lingering batches if we don't have a leader for
@jjkoshy (Contributor), Sep 16, 2016

Do you mean ready (as opposed to lingering)?

//
// Finally, we expire batches if the last metadata refresh was too long ago. We might run in to
// this situation when the producer is disconnected from all the brokers. Note that stale metadata
// is significantly longer than metadata.max.age.
@jjkoshy (Contributor), Sep 16, 2016

"Significantly longer" needs to be made more clear/precise - see my overall comment.

@@ -375,6 +382,19 @@ public void wakeup() {
this.client.wakeup();
}

/* metadata becomes "stale" for batch expiry purpose when the time since the last successful update exceeds
* the metadataStaleMs value. This value must be greater than the metadata.max.age and some delta to account
* for a few retries and transient network disconnections. A small number of retries (3) are chosen because
@jjkoshy (Contributor), Sep 16, 2016

I'm not terribly thrilled about a hard-coded retry limit here. Let me think a little more on this but I understand that there may not be a much better way.

Contributor

Discussed offline. This can instead be the producer's retry count. If the retry count is zero, then we will have to allow for at least one metadata request past its max age. So the staleness threshold will be: retryCount * (backoff + requestTimeout) + maxAge
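
For a concrete sense of scale, here is that threshold evaluated with assumed defaults (metadata.max.age.ms = 300000, request.timeout.ms = 30000, retry.backoff.ms = 100); the numbers are illustrative only:

// Illustrative computation of the staleness threshold discussed above.
// With retries == 0 we still allow for at least one metadata request past max age.
long metadataMaxAgeMs = 300_000L;  // assumed default
int requestTimeoutMs = 30_000;     // assumed default
long retryBackoffMs = 100L;        // assumed default
int retries = 0;

long metadataStaleMs = metadataMaxAgeMs
        + Math.max(retries, 1) * (requestTimeoutMs + retryBackoffMs);  // 330,100 ms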

@jjkoshy (Contributor) commented Sep 22, 2016

Two more points:

  • In the overall comment above, under KAFKA-3388 - that only applies when maxInFlightRequests is one. i.e., if it is greater than one we don't pay attention to whether there is an inflight request or not when expiring messages. In KAFKA-3388 we pay attention to whether there is an inflight request (via the !muted check) to preserve expiration order.
  • I don't think it is necessary to document the nuances under the request.timeout configuration. The details of timeouts in the accumulator are currently hidden from the user and that is probably a reasonable simplification. I.e., the request timeout can be understood as applying primarily to requests written out on the wire. The fact that it is also used in the accumulator is an internal detail. Our documentation could simply say that the producer will time out requests that are yet to be sent out if it determines that it is unable to make progress.

//
// Finally, we expire batches if the last metadata refresh was too long ago. I.e., > {@link Sender#metadataStaleMs}.
// We might run in to this situation when the producer is disconnected from all the brokers.
if (!muted.contains(tp) && (isMetadataStale || cluster.leaderFor(tp) == null)) {
Contributor

Rather than using !muted I think it would be much clearer to alias this to a boolean named guaranteeExpirationOrder, similar to the guaranteeMessageOrder boolean in Sender.
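
One possible reading of that suggestion, sketched here purely for illustration (the flag name comes from the comment above and is not in the patch):

// Sketch: name the !muted check so its role is explicit.
// `muted` is only ever populated when max.in.flight.requests.per.connection == 1,
// so this flag is what preserves expiration order in that mode.
boolean guaranteeExpirationOrder = !muted.contains(tp);
if (guaranteeExpirationOrder && (isMetadataStale || cluster.leaderFor(tp) == null)) {
    // the batch is a candidate for expiration
}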

@ijuma (Contributor) left a comment

A couple of minor comments while I review the more interesting parts. :)

* The max allowable age of metadata.
*/
public long maxAge() {
return this.metadataExpireMs;
Contributor

Why are we using two different names here and elsewhere? It would be good to be consistent. Also, whatever we choose, we generally include the unit at the end (e.g. maxAgeMs).

Contributor Author

metadata.maxAgeMs makes sense to me. However, metadata.maxMetadataAgeMs sounds redundant.

@@ -334,6 +334,7 @@ private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serial
} catch (Throwable t) {
// call close methods if internal objects are already constructed
// this is to prevent resource leak. see KAFKA-2121
t.printStackTrace();
Contributor

Left by mistake?

Contributor Author

right. zapped.

* the backoff, staleness determination is delayed by that factor.
*/
static long getMetadataStaleMs(long metadataMaxAgeMs, int requestTimeoutMs, long refreshBackoffMs, int retries) {
return metadataMaxAgeMs + Math.max(retries, 1) * (requestTimeoutMs + refreshBackoffMs);
Contributor

Is 1 correct here? The comment says 3 retries.

Contributor Author

fixed.

@@ -49,9 +49,11 @@

public static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000;
private static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L;
private static final long DEFAULT_RETRY_BACKOFF_MS = 100L;
private static final long DEFAULT_METADATA_MAX_AGE_MS = 300 * 1000L;
Contributor

Would it be better to put that in ConsumerConfig so that we can reference the constant when defining METADATA_MAX_AGE_CONFIG in ConsumerConfig?

Contributor Author

fixed.

this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

boolean stale = isMetadataStale(now, metadata, this.metadataStaleMs);
if (stale || !result.unknownLeaderTopics.isEmpty()) {
Contributor

Hmm, is the logic here correct? Let's say there is a network issue and the producer can't talk to any broker. The leaders for all partitions are still known (not empty) in the cached metadata. It will take at least 300 seconds before the metadata becomes stale. Then some of the records will have to sit in the buffer pool much longer than the default 30-second request timeout before being expired. This seems like a big change in behavior.

Also, would it be better to consolidate the check here into accumulator.abortExpiredBatches()?

Contributor Author

The condition in Sender.java is a quick check to avoid expensive iteration in abortExpiredBatches when we can. I.e., when metadata is fresh and leaders for all partitions are known there's no reason to drop into the iteration in abortExpiredBatches. We'll not expire any batches in that case.

@junrao (Contributor) commented Sep 23, 2016

@becketqin @sutambe: I am not sure that I fully understand the motivation for the change. For the MirrorMaker issue that you mentioned, is it because the metadata update request takes long? Couldn't you just configure a larger request timeout for the producer in MirrorMaker?

@jjkoshy (Contributor) commented Sep 23, 2016

@junrao @ijuma thanks for taking a look. Initially, I had also wondered about bumping up the request timeout to a much higher value (depending on how many batches are pending in the partition) - it could work, but I think there are shortcomings in the original request timeout proposal (KIP-19), its implementation and follow-ups that are worth summarizing.

First off, with KIP-19 the primary objective was to add a request timeout to requests that were out on the wire. Without that, sudden broker death (i.e., killed/disk failure, etc.) could cause clients to hang and the accumulator to fill up even with a new leader being available. KIP-19’s scope broadened quite a bit to include the application-level send timeout (max.block.ms), and since we did not want to add separate timeouts for the accumulator phase we felt it would be reasonable to roll the request timeout into the accumulator phase as well. I think we reasoned that everything after the send call returns can be assumed to be “on the wire” from the user’s POV (even though the batch is sitting in the accumulator).

However, that is a gross oversimplification of what really goes on in the accumulator phase: for requests that are in the accumulator, the clock should start ticking after the batch becomes ready. We also added a further condition to timing out ready batches in the accumulator: i.e., we would expire batches only if the leader is unknown (i.e., metadata unavailable) and the batch has been ready for longer than the request timeout. The intent there was really that if we know that we are able to make progress (by virtue of metadata being available) then we should not expire the batch even if it has been sitting in the accumulator longer than the timeout.

That condition did not work well with the scenario of a cluster being down which is why in KAFKA-2805 the metadata check was removed.

… and in KAFKA-3388 we sort of went back to the metadata check (specifically, see #1056 (diff))

Bumping up the request timeout works but is an artificial way to get around this. It would be clearer to have a separate accumulator timeout but that is leaking too much of the producer’s internals to the user IMO and there are still various nuances to consider.

So yes there is a change in behavior but I think we have already been making similar changes in the referenced jiras. So I think its effect is not as significant as it seems. This is especially so because KIP-19 made gross oversimplifications on accumulator timeouts as I described above; it is reasonable to believe that users are for the most part unaware of these nuances. In fact one only has to read the current javadoc for RecordAccumulator.ready to decide that it may be too difficult and not worthwhile to attempt to precisely compute the time from the call to send() to the actual transmission on the wire.


// Test batches not in retry
for (int i = 0; i < appends; i++) {
accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
}
// Make the batches ready due to batch full
// subtest1: Make the batches ready due to batch full
Contributor

Rather than having one giant test which includes these subtests, could we split them into separate test cases? It's easier to debug failures that way since each test case is more isolated. It also lets you provide a meaningful name to the test, which makes the nature of the failure clearer just by looking at the output from the build.

@becketqin (Contributor)

@junrao We can bump up the request timeout to avoid expiration of the batches in the accumulator. But it is really a workaround as Joel said. The following is my take on KIP-19 and the timeouts.

The request timeout really serves the purpose of preventing the clients from waiting for a response forever. The reason the clients time out a request is that they have no idea whether the broker can still make progress. It is something like: my laptop does not respond to anything and I don't know what happened, so I reboot it.

Ideally we should have a separate timeout configuration for batches in the accumulator to prevent the batches from sitting in the accumulator forever. If the accumulator were a completely independent module, it would be similar to something like a batch.draining.timeout, i.e. if a batch cannot be drained within some timeout, the accumulator assumes it does not know whether the partition can still make progress and expires the batches. However, the difference here is that the accumulator actually has a reasonably good idea about whether the batches can still make progress or not. In that case, expiring the batches naively is overkill and also brings problems to the users unnecessarily. This is more like the case where I am running some crazy job on my machine and I don't know how long it will take, but as long as it is reporting progress, I don't reboot my laptop.

Another important reason we do not have a separate timeout config for the batches in the accumulator is that this timeout is not an SLA, i.e. we do not guarantee a strict batch timeout in the accumulator. This is different from the request timeout, which is both an SLA and a "panic then reboot". That is a compromise we made in KIP-19 because it is difficult to do and sometimes unreasonable.

So my understanding for KIP-19 is that:

  1. We have a strict request timeout for requests on the wire.
  2. We reused this request timeout in the accumulator for "panic and reboot"; that is why we added the metadata check in the first place, i.e. "no panic, no reboot".
  3. From the user's perspective, all the API calls have a strict timeout, max.block.ms. After that, there is no strong guarantee of how long it will take for the messages to actually be sent or failed. The producer will try its best to make progress and expire the batch if it cannot.

As for the past changes:

  • The initial patch had an issue: the producer may not panic even if it cannot make progress, because it relies on metadata but does not check whether the metadata is stale.
  • KAFKA-2805 removed the metadata check completely. This was not desired because it changes the "panic then reboot" to "always reboot".
  • KAFKA-3388 tried to fix KAFKA-2805 by adding back the "panic" check, inferring whether the producer should "panic" from whether there is a batch in flight. This way of determining whether the producer should panic failed because we may "false panic".
  • This patch tries to fix the way we trigger a panic by going back to relying on metadata, but fixes the issue in the original patch where the metadata may no longer be trustworthy.

@jjkoshy (Contributor) commented Sep 26, 2016

@junrao - thanks again for taking the time to think through this. I agree with the theme of these comments that we should ideally provide guaranteed timeouts to users. I’m also certain that most users currently have the wrong perception that there is such a thing as a guaranteed timeout in the producer today. The reality is that there are so many if’s and buts in the current code that they will be surprised to learn that we actually do not have a guaranteed timeout from when the send returns to when the callback fires (or future is ready).

I think we are all in agreement that increasing request timeout kind of works. The problem with this approach is that it forces exposing users to the intricacies of where the request timeout is applicable. As I mentioned earlier, I think KIP-19 (which was originally for wire timeouts) became overloaded with other timeouts and the end-state is inadequate. Even in its end state the intent was that if we can make progress then do not timeout batches in the accumulator, but over time the implementation diverged from that intent. The root problem seems to be that we are using request timeout for the time spent in the accumulator as well (and doing a poor job of that to boot). The clock starts ticking when the request is added to the accumulator. However, it is reset when the request is written out on the wire. So the code clearly admits guilt :) - that it is overloading what was intended to be a wire timeout for the accumulator. So in order to accommodate a high volume producer (such as a mirror maker that is catching up) in which batches may spend considerable time in the accumulator, it forces the user to set an artificially high request timeout. Again, this works but at the expense of increasing the time-to-detect a broker failure. Also, reducing the buffer size is precisely the opposite of what should be done in a high-volume producer.

As suggested earlier in this conversation, we could have an explicit accumulator timeout to avoid these hidden effects. However, that exposes users to internals of the producer. I think @becketqin had a better idea, which is to add an explicit callback/future timeout. While I know that many are loath to add more timeouts, I think this addresses the problem and gives us the opportunity to actually provide intuitive guarantees to users. So the timeouts would end up being:

  • max.block.ms: maximum time the initial send blocks
  • request.timeout: only applies to the request timeout on the wire
  • send.timeout or callback.timeout: maximum time from the point the send returns to the callback/future. This timeout would be much larger than the request.timeout.

Any thoughts? We can put together a KIP if this sounds reasonable.
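
Purely as an illustration of how the split might look to a user (send.timeout.ms is a hypothetical key for the proposed setting; only max.block.ms and request.timeout.ms exist today, and the values are made up):

// Hypothetical configuration sketch of the proposed timeout split (java.util.Properties).
Properties props = new Properties();
props.put("max.block.ms", "60000");        // bounds the initial send() call
props.put("request.timeout.ms", "30000");  // would apply only to requests on the wire
props.put("send.timeout.ms", "120000");    // proposed: bounds send() return -> callback/future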

@jjkoshy (Contributor) commented Sep 26, 2016

btw, this was touched upon in KIP-19 and we wanted to avoid the separate timeout due to its interaction with retries. So if we do this then we would document that send.timeout takes precedence over any remaining retries.

@junrao (Contributor) commented Sep 27, 2016

@jjkoshy , @becketqin : Yes, having a separate callback.timeout seems easier to understand than piggybacking on metadata.age.ms.

About the buffer pool size in the producer in MM: I am curious how large you set it to. Batching is definitely the most effective way of getting higher throughput, but increasing the buffer beyond a certain point won't help any more and only increases the amount of time a message spends in the accumulator. For example, the default 32MB can still allow the producer to batch about 3KB per partition with 10K partitions.

@sutambe (Contributor, Author) commented Sep 27, 2016

@jjkoshy I prefer record.send.timeout over send.timeout or callback.timeout because the latter can be confused with max.block.ms. I.e., we want to avoid questions like whether the send timeout is measured from the beginning or the end of producer.send. Secondly, this patch is about when a record times out as opposed to when send times out.

Assuming we'll set a large default value for record.send.timeout, one way to set this value might be Math.max(record.timeout.ms, linger.ms + (retries + 1) * (request.timeout.ms + retry.backoff.ms)). Those who need success/failure notifications "quickly" after send returns will have to set low values for both record.timeout.ms and retries.
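
To make that concrete, here is the suggested default evaluated with assumed values (linger.ms = 0, retries = 0, request.timeout.ms = 30000, retry.backoff.ms = 100; record.timeout.ms is part of the proposal, not an existing config):

// Illustrative only: evaluating the suggested default for the proposed record.send.timeout.
long recordTimeoutMs = 30_000L;   // hypothetical record.timeout.ms value
long lingerMs = 0L;               // assumed default
int retries = 0;                  // assumed default
int requestTimeoutMs = 30_000;    // assumed default
long retryBackoffMs = 100L;       // assumed default

long recordSendTimeoutMs = Math.max(recordTimeoutMs,
        lingerMs + (retries + 1) * (requestTimeoutMs + retryBackoffMs));  // 30,100 ms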

Thoughts?

@becketqin (Contributor)

@junrao I found it is a little difficult to tune the buffer pool size because it is shared by all the partitions. For example, in a bootstrap case, we may have topic T with a lot of data to copy while the other topics do not have much to catch up on. At a certain point, what could happen is that all the 32 MB of memory is taken by topic T, and that would be 2K batches for that topic assuming a 16 KB batch size. In this case, the producer may still expire the batches even though the buffer pool is already small.

Another thing is that we actually allocate batches at batch.size granularity: even if the actual size of each batch is 3 KB, with the default setting it will still take 16 KB from the buffer pool's standpoint. So the default setting will only support 2K batches, which is usually too small for a high-volume mirror maker.

So it is kind of hard to tune the buffer pool size: on one hand 32 MB may still not be small enough to keep batches from sitting too long, but on the other hand 32 MB is too small for a high-volume mirror maker.
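
The arithmetic behind those numbers, spelled out with the defaults mentioned above (illustrative only):

// Back-of-the-envelope math from the discussion above.
long bufferMemoryBytes = 32L * 1024 * 1024;               // default buffer.memory (32 MB)
int batchSizeBytes = 16 * 1024;                           // default batch.size (16 KB)
long maxBatches = bufferMemoryBytes / batchSizeBytes;     // 2048 batches ("2K")

int partitions = 10_000;
long bytesPerPartition = bufferMemoryBytes / partitions;  // ~3355 bytes (~3 KB per partition)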

@MayureshGharat (Contributor)

@jjkoshy adding the callback timeout seems nice and makes things clearer. On the other hand, can we have a single timeout which Kafka internally splits up into these three, and also factor in retries? That way the user has to provide a single timeout, and the request can block during the initial phase or get back the results within that time period. I understand having a single timeout might not be possible, but IMO it would be good to give it a thought since we are planning to add one more timeout config.

@jjkoshy (Contributor) commented Sep 30, 2016

@MayureshGharat I may have misunderstood, but exposing a single timeout and splitting internally sounds like it would make matters much worse. In fact the suggestion above was to add an additional timeout to fix the issue that we are overloading one timeout for both the request timeout and the time spent in the accumulator.

@ijuma (Contributor) commented Nov 15, 2016

Is it worth closing this PR until we have the KIP?

@sutambe (Contributor, Author) commented Nov 16, 2016

Closing the PR in anticipation of KIP-91.

@sutambe closed this Nov 16, 2016