KAFKA-15019: Improve handling of broker heartbeat timeouts #13759

Merged
merged 4 commits into apache:trunk from KAFKA-15019 on May 31, 2023

Conversation

cmccabe
Contributor

@cmccabe cmccabe commented May 24, 2023

When the active KRaft controller is overloaded, it will not be able to process broker heartbeat
requests. Instead, they will be timed out. When using the default configuration, this will happen
if the time needed to process a broker heartbeat climbs above a second for a sustained period.
This, in turn, could lead to brokers being improperly fenced when they are still alive.

With this PR, timed out heartbeats will still update the lastContactNs and metadataOffset of the
broker in the BrokerHeartbeatManager. While we don't generate any records, this should still be
adequate to prevent spurious fencing. We also log a message at ERROR level so that this condition
will be more obvious.

Other small changes in this PR: fix grammar issue in log4j of BrokerHeartbeatManager. Add JavaDoc
for ClusterControlManager#zkMigrationEnabled field. Add builder for ReplicationControlTestContext
to avoid having tons of constructors. Update ClusterControlManager.DEFAULT_SESSION_TIMEOUT_NS to
match the default in KafkaConfig.
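
To make the mechanism concrete, here is a minimal, self-contained sketch of the behavior the description above is talking about: a timed-out heartbeat still refreshes the broker's liveness data (last contact time and metadata offset) but triggers no state changes and generates no records. The class and method names below are illustrative stand-ins for this discussion, not the actual controller code.

import java.util.HashMap;
import java.util.Map;

// Illustrative model only: track per-broker liveness the way the PR description suggests.
class HeartbeatTrackerSketch {
    static final class BrokerState {
        long lastContactNs;
        long metadataOffset;
    }

    private final Map<Integer, BrokerState> brokers = new HashMap<>();

    // Normal path: refresh liveness and (in the real controller) possibly apply
    // state changes such as unfencing, which generate metadata records.
    void processHeartbeat(int brokerId, long nowNs, long metadataOffset) {
        touch(brokerId, nowNs, metadataOffset);
        // ... state changes and record generation would happen here ...
    }

    // Timed-out path: only refresh liveness so the broker's session does not
    // expire spuriously; no state changes, no records, just a loud log line.
    void processExpiredHeartbeat(int brokerId, long nowNs, long metadataOffset) {
        touch(brokerId, nowNs, metadataOffset);
        System.err.printf("controller event queue overloaded. timed out heartbeat from broker %d%n", brokerId);
    }

    private void touch(int brokerId, long nowNs, long metadataOffset) {
        BrokerState state = brokers.computeIfAbsent(brokerId, id -> new BrokerState());
        state.lastContactNs = nowNs;
        state.metadataOffset = metadataOffset;
    }
}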

Contributor

@rondagostino rondagostino left a comment


Thanks, Colin. I left some code comments, but the biggest thing overall is a concern about the fixed window-based approach. Take a look at my comment and let me know what you think.

Comment on lines 286 to 300
/**
* The maximum number of timed out heartbeats to count.
*/
static final int DEFAULT_TIMED_OUT_HEARTBEAT_COUNT_MAX = 1000;

/**
* The time period over which to track timed out heartbeats.
*/
static final long DEFAULT_TIMED_OUT_HEARTBEAT_COUNT_WINDOW_NS = TimeUnit.MINUTES.toNanos(5);

/**
* The number of heartbeats to notice missing before we go into overload.
*/
static final int DEFAULT_TIMED_OUT_HEARTBEAT_OVERLOAD_THRESHOLD = 3;

Contributor


Whatever we set this to (and currently we always take this default value) will determine how long it would take for a controller to leave the overloaded state, right? So if we see a series of missed heartbeats that puts us into the overloaded state, and then we don't see any more missed heartbeats, it would take this amount of time to leave that overloaded state?

Consider the following case. We are overloaded. And then some broker crashes. How long would we want it to take before we move leadership away for any partitions led by that suddenly-dead broker? Ideally the session timeout, of course, but that won't happen if we are overloaded -- will it take 5 minutes as currently coded? That seems like a long time.

I think the only thing we can say is that we should move leadership away as soon as we have a correct view of the cluster. The question then becomes: how do we know when we have good information?

A fixed amount of time like this with no missed heartbeats -- that seems too simple. We likely have a reasonable view of the cluster if we see enough heartbeats without missing any such that it is likely that all brokers have had a chance to communicate with us. So it seems like maybe the broker heartbeat interval should factor into it somehow?

Let's say the broker heartbeat interval is 2 seconds. Right now we consider a heartbeat missed if it is more than half the heartbeat interval (i.e. 1 second) old. Let's say there are 60 brokers in the cluster. If we see something on the order of 60 heartbeats in a row without missing any, then we probably have a reasonable view.

So maybe it isn't about having missed no more than N heartbeats in some fixed time window. Maybe we define "not overloaded" as having seen an uninterrupted string of heartbeats without a missed one? So every time we miss a heartbeat we go into the "overloaded" state and we reset a counter of contiguous successfully processed heartbeats to 0. Whenever we see a heartbeat and process it in time we increment that counter. We are no longer overloaded when the contiguousHeartbeatsProcessedInTime counter equals or exceeds the broker count.

I'm not sure I have it right, but I think this is a discussion worth pursuing.
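
For illustration, a rough sketch of the counter scheme proposed in this comment (hypothetical names, not code from the PR): a missed heartbeat flips us into the overloaded state and resets the counter; enough contiguous in-time heartbeats flip us back.

// Illustrative only: "overloaded" is entered on any missed heartbeat and left
// once we have seen at least brokerCount heartbeats in a row processed in time.
class OverloadDetectorSketch {
    private final int brokerCount;
    private int contiguousHeartbeatsProcessedInTime = 0;
    private boolean overloaded = false;

    OverloadDetectorSketch(int brokerCount) {
        this.brokerCount = brokerCount;
    }

    void onHeartbeatProcessedInTime() {
        contiguousHeartbeatsProcessedInTime++;
        if (contiguousHeartbeatsProcessedInTime >= brokerCount) {
            overloaded = false;
        }
    }

    void onHeartbeatMissed() {
        contiguousHeartbeatsProcessedInTime = 0;
        overloaded = true;
    }

    boolean isOverloaded() {
        return overloaded;
    }
}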

Contributor


Thinking about this some more, I believe the key requirement is to accurately understand when we do not have a correct view of the cluster. This allows us to handle 2 important cases: not fencing a broker if its session times out but we think we could have missed enough heartbeats to make fencing the wrong decision; and fencing a broker if its session times out whenever we think we have accurate information.

I think I addressed the second part (detect and fence a crashed broker as quickly as possible) in the comment above: I believe we have accurate information if we see a contiguous series of N successfully-processed heartbeats with no intervening timed-out heartbeats, where N is perhaps the cluster size.

For the first part (don't fence if we think the decision to do so could be wrong), assume the broker session is 18 seconds and the heartbeat interval is 2 seconds. That means we would need to miss 9 heartbeats for a broker in order to incorrectly fence it. Maybe we keep track of the number of contiguous successful heartbeats with no intervening misses (which, if we aren't missing any, would always be a very high number). But then as soon as we miss one we increment the missed count and reset the contiguous count to 0. When we successfully process a heartbeat we increment the contiguous count and, if it reaches the necessary threshold N (which is on the order of the cluster size), we reset the missed count to 0. We can fence brokers only while the missed count is less than the session/interval ratio (i.e. 18/2 = 9 by default).

We can tweak this to be a bit more conservative. Maybe we need N (the number of contiguous heartbeats seen to assure us we have good visibility) to be 1.5 or 2 times the broker count instead of just the broker count. Maybe the missed count only has to exceed half the session/interval ratio (so only missing 5 heartbeats without seeing N successfully-processed ones in a row instead of 9 by default) to prevent fencing.

WDYT?
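
A hypothetical sketch of this refinement, again with made-up names and purely for discussion: fencing is allowed only while the missed count stays below the session/interval ratio, and a long enough uninterrupted run of in-time heartbeats resets the missed count.

// Illustrative only: gate fencing decisions on how many heartbeats we have
// missed since we last had a trustworthy view of the cluster.
class FencingGateSketch {
    private final int clusterSize;                // N: run length that restores confidence
    private final long maxMissedBeforeUnsafe;     // e.g. 18000 ms / 2000 ms = 9 by default
    private int contiguousInTime = 0;
    private long missed = 0;

    FencingGateSketch(int clusterSize, long sessionTimeoutMs, long heartbeatIntervalMs) {
        this.clusterSize = clusterSize;
        this.maxMissedBeforeUnsafe = sessionTimeoutMs / heartbeatIntervalMs;
    }

    void onHeartbeatProcessedInTime() {
        contiguousInTime++;
        if (contiguousInTime >= clusterSize) {
            missed = 0;   // enough uninterrupted heartbeats: trust our view again
        }
    }

    void onHeartbeatMissed() {
        missed++;
        contiguousInTime = 0;
    }

    boolean safeToFence() {
        return missed < maxMissedBeforeUnsafe;
    }
}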

Contributor Author


I agree with you that 5 minutes is way too long to "mute" broker removals. That was a miss in the original PR. (As a side note, broker session is not 18 seconds. It is 9 seconds.)

I thought about this more and I think we may not need the overload state I originally wanted to introduce at all. We can simply do some basic processing on the heartbeat when we time it out. Specifically, we can update the "last seen time" of the broker. This will avoid the "congestion meltdown" behavior without introducing a new state.

Contributor


As a side note, broker session is not 18 seconds. It is 9 seconds.

I was confused by the existence of this, which sets 18 seconds:

final static long DEFAULT_SESSION_TIMEOUT_NS = NANOSECONDS.convert(18, TimeUnit.SECONDS);

But the above always gets overridden by the value of broker.session.timeout.ms in the config, which does default to 9 seconds (val BrokerSessionTimeoutMs = 9000).

So yes, it is 9 seconds as you state above. I wonder if the default in ClusterControlManager.java should be changed to match the config's default.
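
Assuming the same imports as the line quoted above, the suggested change would presumably be as simple as the following (hypothetical here, though the PR description does say the constant was updated to match KafkaConfig):

// Hypothetical: align the hard-coded fallback with the broker.session.timeout.ms default of 9000 ms.
final static long DEFAULT_SESSION_TIMEOUT_NS = NANOSECONDS.convert(9, TimeUnit.SECONDS);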

Contributor Author


I'll update it

@ijuma
Contributor

ijuma commented May 28, 2023

Thanks for the PR. Can you explain how this helps in a bit more detail? Let's use a small cluster with 3 or 4 brokers as an example. The 3 missed heartbeats in your example would result in 3 brokers being fenced by the time the overload state is entered? Or did I misunderstand? And what happens next?

Member

@soarez soarez left a comment


I'm not sure how this makes sense. Degrading functionality opens the door to new failure modes, such as the one Ron described. If the controller is overloaded, shouldn't that be addressed with reconfiguration? Perhaps the description of the issue can be improved?

@rondagostino
Contributor

The 3 missed heartbeats in your example would result in 3 brokers being fenced by the time the overload state is entered? Or did I misunderstand? And what happens next?

Using the default values of broker session timeout = 9 seconds and broker heartbeat interval = 2 seconds, we would need to miss at least 4 heartbeats before any single broker could potentially lose its session. And with 3 brokers there would be 3 heartbeats arriving every 2 seconds, so after 8 seconds we would typically receive 3 * 4 = 12 heartbeats. If we just missed 3 of them it is conceivable that they would all have been for the same broker.

But note that Colin states above: "I think we may not need the overload state I originally wanted to introduce at all. We can simply do some basic processing on the heartbeat when we time it out. Specifically, we can update the 'last seen time' of the broker." I believe what he is saying is that when we see a heartbeat request in the event queue and it is old enough such that we "missed" it, we can still say that the broker that sent it was alive and able to contact us at that time (whenever it was -- it could have been several seconds ago by the time we see it). So while we can't respond to it -- the broker has timed out the request and is no longer waiting for the response -- we can still note that the broker was alive at that point. At first glance this seems like a much better approach than introducing this "overload" state -- as the discussion shows, such a state isn't intuitive to reason about. And I see no reason why we can't take the approach of counting a heartbeat that we don't respond to as indicating the broker was alive...

@cmccabe cmccabe force-pushed the KAFKA-15019 branch 2 times, most recently from b9ae04e to 910c401 Compare May 30, 2023 18:09
@cmccabe cmccabe changed the title from "KAFKA-15019: Improve handling of overload situations in the kcontroller" to "KAFKA-15019: Improve handling of broker heartbeat timeouts" May 30, 2023
@cmccabe
Contributor Author

cmccabe commented May 30, 2023

Hi all,

I've updated the approach in the PR. The new approach simply adds basic heartbeat handling in the timeout case. We can't do everything we normally can here. Most notably, anything involving heartbeats causing a state change like fenced -> unfenced or entering controlled shutdown is off limits. But it's sufficient to prevent the congestion collapse scenario, and it's pretty simple.

I will also create a KIP to add a new metric for timed out broker heartbeats, since I think that's very worthwhile to do. We can do that in a follow-on PR. This is something we'll want to track pretty closely. If heartbeats start timing out, that indicates a critical performance problem that needs to be addressed by the operator.

Contributor

@rondagostino rondagostino left a comment


Thanks, yeah, this is a much better approach. Left a couple of questions.

Comment on lines 40 to 44
if (exception.getMessage() != null &&
        exception.getMessage().equals("The controller is shutting down.")) {
    return false;
}
return true;
Contributor


This seems brittle. Is there a way we can identify the specific thing we are interested in?

Contributor Author

@cmccabe cmccabe May 30, 2023


I removed the check for the exception text. (It wasn't actually doing anything here in any case!) This is something I'll revisit in a follow-on PR (there are a few corner cases around error handling and queue shutdown to fix up).

val latch = controller.controller.asInstanceOf[QuorumController].pause()
Thread.sleep(1001)
latch.countDown()
controller.controller.asInstanceOf[QuorumController].pause().countDown()
Contributor


What is the purpose of this line? Seems it is an unnecessary extra pause/resume, and the test passes without it. Can we remove it?

Contributor Author

@cmccabe cmccabe May 30, 2023


Removed

clusterControl.heartbeatManager().touch(brokerId,
    clusterControl.brokerRegistrations().get(brokerId).fenced(),
    request.currentMetadataOffset());
log.error("processExpiredBrokerHeartbeat: timed out heartbeat from broker {}.", brokerId);
Contributor


What is the operator meant to do when they see this error message? Perhaps we can explain the implications in the log message?

Contributor Author


I changed it to "controller event queue overloaded. timed out heartbeat from broker {}"

Contributor

@rondagostino rondagostino left a comment


LGTM, thanks for the fine-tuning of comments and log messages.

@cmccabe cmccabe merged commit 9b3db6d into apache:trunk May 31, 2023
1 check was pending
@cmccabe cmccabe deleted the KAFKA-15019 branch May 31, 2023 17:49
Contributor

@ijuma ijuma left a comment


This approach seems reasonable to me.

* heartbeats by updating the lastSeenNs of the broker, so that the broker won't get fenced
* incorrectly. However, we don't perform any state changes that we normally would, such as
* unfencing a fenced broker, etc.
*/
Contributor


Thanks for this comment, it's very helpful.

cmccabe added a commit that referenced this pull request Jun 1, 2023

Reviewers: Ismael Juma <ijuma@apache.org>, Ron Dagostino <rdagostino@confluent.io>
emissionnebula added a commit to confluentinc/kafka that referenced this pull request Jun 16, 2023
…tream-3.5

* commit 'c2f6f29ca6e1306ac77ec726bac4cd09bd1aa80b': (76 commits)
  KAFKA-15019: Improve handling of broker heartbeat timeouts (apache#13759)
  KAFKA-15003: Fix ZK sync logic for partition assignments (apache#13735)
  MINOR: Add 3.5 upgrade steps for ZK and KRaft (apache#13792)
  KAFKA-15010 ZK migration failover support (apache#13758)
  KAFKA-15017 Fix snapshot load in dual write mode for ClientQuotas and SCRAM  (apache#13757)
  MINOR: Update LICENSE-binary following snappy upgrade (apache#13791)
  Upgrade to snappy v1.1.10.0 (apache#13786)
  KAFKA-15004: Fix configuration dual-write during migration (apache#13767)
  KAFKA-8713: JsonConverter replace.null.with.default should prevent emitting default for Struct fields (apache#13781)
  KAFKA-14996: Handle overly large user operations on the kcontroller (apache#13742)
  ...