Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consumer facing higher latency after a preferred leader election #1927

Closed
ppatierno opened this issue Apr 28, 2021 · 17 comments
Closed

Consumer facing higher latency after a preferred leader election #1927

ppatierno opened this issue Apr 28, 2021 · 17 comments

Comments

@ppatierno
Copy link

Hi,
as part of the Strimzi project, we have a canary application sending and receiving messages to test that the Kafka cluster is working fine. This canary tool is using Sarama and we are facing the following "problem".
Imagine a cluster with 3 brokers, and the canary application creates a topic with 3 partition, one on each broker.
A normal canary flow shows the following recurring log:

Metadata for __strimzi_canary topic
	{ID:0 Leader:0 Replicas:[0 1 2] Isr:[0 1 2] OfflineReplicas:[]}
	{ID:2 Leader:2 Replicas:[2 0 1] Isr:[2 0 1] OfflineReplicas:[]}
	{ID:1 Leader:1 Replicas:[1 2 0] Isr:[1 2 0] OfflineReplicas:[]}
Sending message: value={""producerId"":""strimzi-canary-client"",""messageId"":6421,""timestamp"":1619525979038} on partition=0"
Message sent: partition=0, offset=2140, duration=157 ms"
Sending message: value={""producerId"":""strimzi-canary-client"",""messageId"":6422,""timestamp"":1619525979195} on partition=1"
Message received: value={ProducerID:strimzi-canary-client, MessageID:6421, Timestamp:1619525979038}, partition=0, offset=2140, duration=157 ms"
Message sent: partition=1, offset=2140, duration=220 ms"
Sending message: value={""producerId"":""strimzi-canary-client"",""messageId"":6423,""timestamp"":1619525979415} on partition=2"
Message received: value={ProducerID:strimzi-canary-client, MessageID:6422, Timestamp:1619525979195}, partition=1, offset=2140, duration=220 ms"
Message sent: partition=2, offset=2140, duration=126 ms"
... reconcile done
Message received: value={ProducerID:strimzi-canary-client, MessageID:6423, Timestamp:1619525979415}, partition=2, offset=2140, duration=127 ms"

It shows a latency that quite often is around 200 ms.
Restarting the broker 0 produces the following effect:

Metadata for __strimzi_canary topic
 	{ID:0 Leader:1 Replicas:[0 1 2] Isr:[1 2] OfflineReplicas:[0]}
 	{ID:2 Leader:2 Replicas:[2 0 1] Isr:[2 1] OfflineReplicas:[0]}
	{ID:1 Leader:1 Replicas:[1 2 0] Isr:[1 2] OfflineReplicas:[0]}
Sending message: value={""producerId"":""strimzi-canary-client"",""messageId"":6448,""timestamp"":1619526024136} on partition=0"
Message sent: partition=0, offset=2149, duration=88 ms"
Sending message: value={""producerId"":""strimzi-canary-client"",""messageId"":6449,""timestamp"":1619526024224} on partition=1"
Message received: value={ProducerID:strimzi-canary-client, MessageID:6448, Timestamp:1619526024136}, partition=0, offset=2149, duration=89 ms"
Message sent: partition=1, offset=2149, duration=3 ms"
Sending message: value={""producerId"":""strimzi-canary-client"",""messageId"":6450,""timestamp"":1619526024227} on partition=2"
Message received: value={ProducerID:strimzi-canary-client, MessageID:6449, Timestamp:1619526024224}, partition=1, offset=2149, duration=4 ms"
Message sent: partition=2, offset=2149, duration=162 ms"
... reconcile done
Message received: value={ProducerID:strimzi-canary-client, MessageID:6450, Timestamp:1619526024227}, partition=2, offset=2149, duration=163 ms"

The replica 0 is offline (because broker 0 is down) but canary keeps going to send messages to partition 0 that now has the leader on broker 1. The latency is still ok.

When broker 0 restarts, it doesn’t become the leader for partition 0 immediately and meanwhile the canary still sends to partition 0 (leader on broker 1) and latency is still ok.

Metadata for __strimzi_canary topic
 	{ID:0 Leader:1 Replicas:[0 1 2] Isr:[1 2 0] OfflineReplicas:[]}
 	{ID:2 Leader:2 Replicas:[2 0 1] Isr:[2 1 0] OfflineReplicas:[]}
 	{ID:1 Leader:1 Replicas:[1 2 0] Isr:[1 2 0] OfflineReplicas:[]}
Sending message: value={""producerId"":""strimzi-canary-client"",""messageId"":6463,""timestamp"":1619526049199} on partition=0"
Message sent: partition=0, offset=2154, duration=53 ms"
Sending message: value={""producerId"":""strimzi-canary-client"",""messageId"":6464,""timestamp"":1619526049252} on partition=1"
Message sent: partition=1, offset=2154, duration=47 ms"
Sending message: value={""producerId"":""strimzi-canary-client"",""messageId"":6465,""timestamp"":1619526049299} on partition=2"
Message received: value={ProducerID:strimzi-canary-client, MessageID:6463, Timestamp:1619526049199}, partition=0, offset=2154, duration=100 ms"
Message received: value={ProducerID:strimzi-canary-client, MessageID:6464, Timestamp:1619526049252}, partition=1, offset=2154, duration=48 ms"
Message sent: partition=2, offset=2154, duration=154 ms"
... reconcile done
Message received: value={ProducerID:strimzi-canary-client, MessageID:6465, Timestamp:1619526049299}, partition=2, offset=2154, duration=154 ms"

When finally the preferred leader election happens and broker 0 is again the leader for partition 0, the canary is affected by higher latency on partition 0. You can see from around 200 ms to 940 ms and this latency has an average of 700/800 ms with canary keep going to send/receive messages.

Metadata for __strimzi_canary topic
 	{ID:0 Leader:0 Replicas:[0 1 2] Isr:[1 2 0] OfflineReplicas:[]}
 	{ID:2 Leader:2 Replicas:[2 0 1] Isr:[2 1 0] OfflineReplicas:[]}
 	{ID:1 Leader:1 Replicas:[1 2 0] Isr:[1 2 0] OfflineReplicas:[]}
Sending message: value={""producerId"":""strimzi-canary-client"",""messageId"":6469,""timestamp"":1619526059062} on partition=0"
Message sent: partition=0, offset=2156, duration=352 ms"
Sending message: value={""producerId"":""strimzi-canary-client"",""messageId"":6470,""timestamp"":1619526059414} on partition=1"
Message sent: partition=1, offset=2156, duration=84 ms"
Sending message: value={""producerId"":""strimzi-canary-client"",""messageId"":6471,""timestamp"":1619526059498} on partition=2"
Message received: value={ProducerID:strimzi-canary-client, MessageID:6470, Timestamp:1619526059414}, partition=1, offset=2156, duration=85 ms"
Message sent: partition=2, offset=2156, duration=69 ms"
... reconcile done
Message received: value={ProducerID:strimzi-canary-client, MessageID:6471, Timestamp:1619526059498}, partition=2, offset=2156, duration=79 ms"
Message received: value={ProducerID:strimzi-canary-client, MessageID:6469, Timestamp:1619526059062}, partition=0, offset=2156, duration=940 ms"

Restarting the canary helps.

Tracing down some Sarama stuff, we noticed that when broker 0 goes off, something like this happens:

[Sarama] 2021/04/28 11:48:55 consumer/broker/0 disconnecting due to error processing FetchRequest: EOF
[Sarama] 2021/04/28 11:48:55 Closed connection to broker my-cluster-kafka-brokers.default.svc:9092
[Sarama] 2021/04/28 11:48:55 kafka: error while consuming __strimzi_canary/0: EOF
[Sarama] 2021/04/28 11:48:57 consumer/__strimzi_canary/0 finding new broker
[Sarama] 2021/04/28 11:48:57 client/metadata fetching metadata for [__strimzi_canary] from broker localhost:9092
[Sarama] 2021/04/28 11:48:57 client/metadata got error from broker -1 while fetching metadata: EOF
[Sarama] 2021/04/28 11:48:57 Closed connection to broker localhost:9092
[Sarama] 2021/04/28 11:48:57 client/metadata fetching metadata for [__strimzi_canary] from broker my-cluster-kafka-brokers.default.svc:9092
[Sarama] 2021/04/28 11:48:57 Failed to connect to broker my-cluster-kafka-brokers.default.svc:9092: dial tcp 127.0.0.1:9092: connect: connection refused
[Sarama] 2021/04/28 11:48:57 client/metadata got error from broker 0 while fetching metadata: dial tcp 127.0.0.1:9092: connect: connection refused
[Sarama] 2021/04/28 11:48:57 client/brokers deregistered broker #0 at my-cluster-kafka-brokers.default.svc:9092
[Sarama] 2021/04/28 11:48:57 client/metadata fetching metadata for [__strimzi_canary] from broker my-cluster-kafka-brokers.default.svc:9094
[Sarama] 2021/04/28 11:48:57 consumer/broker/1 added subscription to __strimzi_canary/0

so it's clear to me that now the consumer is consuming partition 0 from broker 1 (that is actually the leader).
But when the broker 0 comes back again and the new leader is elected, we see:

[Sarama] 2021/04/28 11:48:58 producer/broker/0 state change to [closing] because dial tcp 127.0.0.1:9092: connect: connection refused
[Sarama] 2021/04/28 11:48:58 producer/leader/__strimzi_canary/0 state change to [retrying-1]
[Sarama] 2021/04/28 11:48:58 producer/leader/__strimzi_canary/0 abandoning broker 0
[Sarama] 2021/04/28 11:48:58 producer/broker/0 input chan closed
[Sarama] 2021/04/28 11:48:58 producer/broker/0 shut down
[Sarama] 2021/04/28 11:48:58 client/metadata fetching metadata for [__strimzi_canary] from broker my-cluster-kafka-brokers.default.svc:9094
[Sarama] 2021/04/28 11:48:59 producer/leader/__strimzi_canary/0 selected broker 1
[Sarama] 2021/04/28 11:48:59 producer/broker/1 state change to [open] on __strimzi_canary/0
[Sarama] 2021/04/28 11:48:59 producer/leader/__strimzi_canary/0 state change to [flushing-1]
[Sarama] 2021/04/28 11:48:59 producer/leader/__strimzi_canary/0 state change to [normal]

So only the producer got that leader is changed and moves to send to broker 0 but there is no log from the consumer perspective to get that now the leader is 0 and should read from there.
Other than missing this log, I don't understand how the consumer is then consuming and why the higher latency from now on.

@dnwe
Copy link
Collaborator

dnwe commented Apr 28, 2021

@ppatierno a few questions:

  1. what version of Kafka are you running in your clusters, 2.6.x?
  2. what sarama.Config{Version: ...} have you opted-in to for the protocol?

I wonder if since KIP-392 and the ability of fetching from follower, perhaps we might have become sticky on the replica we're consuming on and perhaps we are not correctly handling the (default) LeaderSelector responses to change which broker we're consuming from

@ppatierno
Copy link
Author

@dnwe thanks for jumping into it so quickly!
So I have the same problem with Kafka 2.6.0 and 2.7.0 and configuring the Sarama library with both as well (I also tried Kafka.2.7.0 but Sarama 2.6.0).
I wasn't aware that Sarama was already supporting KIP-392 but at this point, I am suspicious it could be the problem then.

@ppatierno
Copy link
Author

@dnwe FYI I was still using Sarama 1.27.2 before but even switching to the new 1.28.0 (that implements KIP-392: Allow consumers to fetch from closest replica) I see the same problem.

@ppatierno
Copy link
Author

ppatierno commented Apr 28, 2021

@dnwe I dig into it more and it's now clear that the Sarama client is not connecting to broker 0 anymore even when it comes again and is the leader for partition 0.
I added a little bit of log in the broker.go and fetch_request.go to check each broker connection which partitions is asking for.

Initially everything is fine ...

2021/04/28 17:18:17 The canary topic __strimzi_canary already exists
2021/04/28 17:18:17 Metadata for __strimzi_canary topic
2021/04/28 17:18:17 	{ID:0 Leader:0 Replicas:[0 1 2] Isr:[0 1 2] OfflineReplicas:[]}
2021/04/28 17:18:17 	{ID:2 Leader:2 Replicas:[2 0 1] Isr:[2 0 1] OfflineReplicas:[]}
2021/04/28 17:18:17 	{ID:1 Leader:1 Replicas:[1 2 0] Isr:[1 2 0] OfflineReplicas:[]}
2021/04/28 17:18:17 Sending message: value={"producerId":"strimzi-canary-client","messageId":4,"timestamp":1619623097756} on partition=0
[Sarama] 2021/04/28 17:18:17 ***** FetchRequest for broker 2
[Sarama] 2021/04/28 17:18:17 ***** 	partition 2
[Sarama] 2021/04/28 17:18:17 ***** 	partition 2
[Sarama] 2021/04/28 17:18:18 ***** FetchRequest for broker 1
[Sarama] 2021/04/28 17:18:18 ***** 	partition 1
[Sarama] 2021/04/28 17:18:18 ***** 	partition 1
[Sarama] 2021/04/28 17:18:18 ***** FetchRequest for broker 0
[Sarama] 2021/04/28 17:18:18 ***** 	partition 0
[Sarama] 2021/04/28 17:18:18 ***** 	partition 0

then when broker 0 goes down ... broker handler 1 is fetching partition 0 and 1 (broker 1 is leader for both now)

2021/04/28 17:19:27 	{ID:0 Leader:1 Replicas:[0 1 2] Isr:[1 2 0] OfflineReplicas:[]}
2021/04/28 17:19:27 	{ID:2 Leader:2 Replicas:[2 0 1] Isr:[2 1 0] OfflineReplicas:[]}
2021/04/28 17:19:27 	{ID:1 Leader:1 Replicas:[1 2 0] Isr:[1 2 0] OfflineReplicas:[]}
2021/04/28 17:19:27 Sending message: value={"producerId":"strimzi-canary-client","messageId":43,"timestamp":1619623167869} on partition=0
[Sarama] 2021/04/28 17:19:27 ***** FetchRequest for broker 2
[Sarama] 2021/04/28 17:19:27 ***** 	partition 2
[Sarama] 2021/04/28 17:19:27 ***** 	partition 2
[Sarama] 2021/04/28 17:19:28 ***** FetchRequest for broker 1
[Sarama] 2021/04/28 17:19:28 ***** 	partition 1
[Sarama] 2021/04/28 17:19:28 ***** 	partition 0
[Sarama] 2021/04/28 17:19:28 ***** 	partition 0
[Sarama] 2021/04/28 17:19:28 ***** 	partition 1

finally broker 0 comes back again and it becomes leader but broker handler 1 is fetching partition 0 and 1 so it's actually getting messages for partition 0 from a follower replica

2021/04/28 17:21:12 The canary topic __strimzi_canary already exists
2021/04/28 17:21:12 Metadata for __strimzi_canary topic
2021/04/28 17:21:12 	{ID:0 Leader:0 Replicas:[0 1 2] Isr:[1 2 0] OfflineReplicas:[]}
2021/04/28 17:21:12 	{ID:2 Leader:2 Replicas:[2 0 1] Isr:[2 1 0] OfflineReplicas:[]}
2021/04/28 17:21:12 	{ID:1 Leader:1 Replicas:[1 2 0] Isr:[1 2 0] OfflineReplicas:[]}
2021/04/28 17:21:12 Sending message: value={"producerId":"strimzi-canary-client","messageId":106,"timestamp":1619623272755} on partition=0
2021/04/28 17:21:12 Message sent: partition=0, offset=35, duration=9 ms
2021/04/28 17:21:12 Sending message: value={"producerId":"strimzi-canary-client","messageId":107,"timestamp":1619623272764} on partition=1
[Sarama] 2021/04/28 17:21:12 ***** FetchRequest for broker 2
[Sarama] 2021/04/28 17:21:12 ***** 	partition 2
[Sarama] 2021/04/28 17:21:12 ***** 	partition 2
[Sarama] 2021/04/28 17:21:13 ***** FetchRequest for broker 1
[Sarama] 2021/04/28 17:21:13 ***** 	partition 1
[Sarama] 2021/04/28 17:21:13 ***** 	partition 0
[Sarama] 2021/04/28 17:21:13 ***** 	partition 1
[Sarama] 2021/04/28 17:21:13 ***** 	partition 0

Referring KIP-392 and checked that:

  • the RackID in Sarama is empty
  • the ReplicaID in the fetch request is -1 (it's actually hardcoded as it should be always -1 for clients)

Do we should expect the broker returning an error when the client tries to fetch from broker 1 that is not leader for partition 0 anymore?

@ppatierno
Copy link
Author

Going further we saw that on the broker side there is this condition:

val fetchOnlyFromLeader = isFromFollower || (isFromConsumer && clientMetadata.isEmpty)

https://github.com/apache/kafka/blob/40f001cc537d6ff2efa71e609c2f84c6b934994d/core/src/main/scala/kafka/server/ReplicaManager.scala#L1037

So the logic is about fetching from the leader if it’s a follower (so another broker) or a consumer but that clientMetadata is Empty where it’s defined here:

https://github.com/apache/kafka/blob/5964401bf9aab611bd4a072941bd1c927e044258/core/src/main/scala/kafka/server/KafkaApis.scala#L671

And as you can see for version >= 11 so Kafka 2.6 and above, it’s never empty but returned Some.

The broker allows continuing to fetch from a follower even if the leader came back again.
The only way to switch again to leader is for the client refreshing metadata and check that now the leader is changed.
Maybe Sarama doesn't do a periodic metadata refresh as it happens with the Java client or I have misconfigured it.

@dnwe
Copy link
Collaborator

dnwe commented Apr 28, 2021

We definitely do periodically refresh metadata, but I imagine we don't stop and change a partition consumer if it's working. Thanks for all the good debug!

@ppatierno
Copy link
Author

@dnwe yes you are right (of course! ;-)) even after a metadata refresh nothing is changed.
In our canary, we already have a logic in a specific situation that when new metadata are important to us, we cancel the context of the Consume on consumer group and call it again, in order to rejoin the group and using the new metadata.
Of course, it would be great having this logic into Sarama itself in case of a preferred leader is now back again.

@ppatierno
Copy link
Author

@dnwe as follow up to this, taking a look at the Sarma source code, I see there is a kind of mechanism to exit the session when the number of topic partitions change on metedata refresh (see loopCheckPartitionNumbers function).
Do you think that it could be a good place to set there a logic for checking that leaders changed as well? I could propose a change via a PR.

@dnwe
Copy link
Collaborator

dnwe commented May 5, 2021

Yeah...so that was added under #1525 and then corrected under #1578 though it probably still needs revisiting as the "pause" ticker matching the refresh frequency isn't really ideal, this would ideally be event-driven from MetadataResponses arriving.

So yes, you could add some code there to look at the client.Leader(...) for each topicpartition and take action if that has changed between ticks, as you mention that would match what we're currently doing for changes in topicpartitions numbers.

However, I wonder if a simpler solution would be to modify the Sarama KIP-392 code slightly to handle this. As we already have a preferredBroker func that will fallback to the (cached) partition leader if the preferredReadReplica is not set, then we could use the return value of that (rather than only an explict preferredReadReplica in the FetchResponse) to compare against the 'current' broker being used to consume the partition from and electing to move to consuming from that

i.e.,

diff --git a/consumer.go b/consumer.go
index 8681f35..46c2613 100644
--- a/consumer.go
+++ b/consumer.go
@@ -838,10 +838,12 @@ func (bc *brokerConsumer) handleResponses() {
 		child.responseResult = nil
 
 		if result == nil {
-			if child.preferredReadReplica >= 0 && bc.broker.ID() != child.preferredReadReplica {
-				// not an error but needs redispatching to consume from prefered replica
-				child.trigger <- none{}
-				delete(bc.subscriptions, child)
+			if preferredBroker, err := child.preferredBroker(); err == nil {
+				if bc.broker.ID() != preferredBroker.ID() {
+					// not an error but needs redispatching to consume from prefered replica
+					child.trigger <- none{}
+					delete(bc.subscriptions, child)
+				}
 			}
 			continue
 		}

I'll push this up to a branch and perhaps you could give it a spin?

dnwe added a commit that referenced this issue May 5, 2021
Historically (before protocol version 11) if we attempted to consume
from a follower, we would get a NotLeaderForPartition response and move
our consumer to the new leader. However, since v11 the Kafka broker
treats us just like any other follower and permits us to consume from
any replica and it is up to us to monitor metadata to determine when the
leadership has changed.

Modifying the handleResponse func to check the topic partition
leadership against the current broker (in the absence of a
preferredReadReplica) and trigger a re-create of the consumer for that
partition

Contributes-to: #1927
@dnwe
Copy link
Collaborator

dnwe commented May 5, 2021

@ppatierno please can you let me know if go get github.com/Shopify/sarama@1a3f5b3951fd5d94aa9b8706d1e14c9480fd0483 improves the situation for you?

@ppatierno
Copy link
Author

@dnwe that was a simpler solution, even because the -1 returned in the preferred_read_replica should mean "use the leader" which is exactly what the preferredBroker function does.
I tried and it worked as expected. Is there an ETA for a new release with this fix after got it merged? :-)

@dnwe
Copy link
Collaborator

dnwe commented May 6, 2021

Excellent, thanks for confirming. I think we're due a new release imminently due to some other important bugfixes and new KIPs. I'd like to add a unittest to cover this usecase too, but I imagine we can tag a new release within the next few days (cc @bai)

dnwe added a commit that referenced this issue May 7, 2021
Historically (before protocol version 11) if we attempted to consume
from a follower, we would get a NotLeaderForPartition response and move
our consumer to the new leader. However, since v11 the Kafka broker
treats us just like any other follower and permits us to consume from
any replica and it is up to us to monitor metadata to determine when the
leadership has changed.

Modifying the handleResponse func to check the topic partition
leadership against the current broker (in the absence of a
preferredReadReplica) and trigger a re-create of the consumer for that
partition

Contributes-to: #1927
dnwe added a commit that referenced this issue May 7, 2021
TestConsumeMessagesTrackLeader ensures that in the event that leadership
of a topicPartition changes and no preferredReadReplica is specified,
the consumer connects back to the new leader to resume consumption and
doesn't continue consuming from the follower.

See #1927
dnwe added a commit that referenced this issue May 7, 2021
TestConsumeMessagesTrackLeader ensures that in the event that leadership
of a topicPartition changes and no preferredReadReplica is specified,
the consumer connects back to the new leader to resume consumption and
doesn't continue consuming from the follower.

See #1927
@dnwe
Copy link
Collaborator

dnwe commented May 7, 2021

@ppatierno thanks for the bug report — should be fixed in v1.29.0

@ssorren
Copy link

ssorren commented Sep 15, 2021

@dnwe
I believe this fix may have broke KIP-392 support, (fetching from closest replica). I observed on a consumer group, utilizing rack awareness, that the consume latency increased to about 4.5 seconds and logs were flooded with meta data requests. I'm presuming this switch to leader logic is racing with consume from replica logic.

In my case, I had rack awareness on for testing purposes only. When I disabled this in the consumer group, the issue went away.

@lizthegrey
Copy link
Contributor

also seeing issues after bisecting to 1aac8e5 -- throughput drops to approximately 0 on consumers who are rack-aware, but not on consumers with rack awareness disabled.

@dnwe
Copy link
Collaborator

dnwe commented Nov 26, 2021

😱 please can you raise an issue, we should fix asap

@lizthegrey
Copy link
Contributor

Opening issue.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants