
KAFKA-14190: Update Zk TopicId from locally stored cache in controller #13111

Open · wants to merge 2 commits into trunk

Conversation

@divijvaidya (Contributor) commented Jan 13, 2023

Change

Controller should update Zk with locally cached TopicId (when available) instead of assigning a new one when Zk doesn't have a TopicId.

Motivation for this change

This problem was highlighted in KAFKA-14190 and, since then, multiple users have complained about it HERE (mailing list), HERE (mailing list), and HERE (ASF Slack channel).

Description of the problem

In certain situations, the TopicId stored locally on a broker for a topic can differ from the TopicId stored for that topic in Zk. Currently, such a situation arises when users use a <2.8 client to alter partitions for a topic on >=2.8 (including the latest 3.4) brokers AND they use the --zookeeper flag from the client. Note that --zookeeper has been deprecated for a long time and has been replaced by --bootstrap-server, which does not face this problem.

This TopicId discrepancy leads to availability loss for the topic until the user performs the mitigation steps listed in KAFKA-14190.

The exact sequence of steps is as follows (a minimal simulation of the sequence follows the list):

  1. The user uses a pre-2.8 client to create a new topic in Zookeeper directly.
  2. No TopicId is generated in Zookeeper.
  3. KafkaController listens to the ZNode and a TopicChange event is created. While handling this event, the controller notices that there is no TopicId, generates a new one, and updates Zk.
  4. At this stage, Zk has a TopicId.
  5. The user uses the pre-2.8 client to increase the number of partitions for this topic.
  6. The client replaces/overwrites the entire existing ZNode with new placement information. This deletes the existing TopicId in Zk (the one created by the controller in step 3).
  7. The next time KafkaController interacts with this ZNode, it generates a new TopicId.
  8. Note that we now have two different TopicIds for this topic name.
  9. Brokers may have a different (older) TopicId in their metadata file and will complain about the mismatch when they encounter the new TopicId.
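
Below is a minimal, self-contained simulation of this sequence (illustrative only, not Kafka code; TopicZNodeData and controllerPass are hypothetical names). It models the topic ZNode as an optional topic ID plus a partition assignment, shows the pre-2.8 overwrite dropping the ID, and shows the controller then minting a second ID for the same topic name:

import java.util.UUID

object TopicIdLossSimulation extends App {
  // The topic ZNode, reduced to the two fields that matter here.
  final case class TopicZNodeData(topicId: Option[UUID], partitions: Map[Int, Seq[Int]])

  // Steps 1-2: a pre-2.8 client writes the znode directly, with no topic ID field.
  var znode = TopicZNodeData(topicId = None, partitions = Map(0 -> Seq(1, 2, 3)))

  // Steps 3-4 and 7: the controller assigns a TopicId whenever the znode has none.
  def controllerPass(data: TopicZNodeData): TopicZNodeData =
    if (data.topicId.isEmpty) data.copy(topicId = Some(UUID.randomUUID())) else data

  znode = controllerPass(znode)
  val firstId = znode.topicId.get

  // Steps 5-6: the pre-2.8 client rewrites the whole znode, silently dropping the TopicId.
  znode = TopicZNodeData(topicId = None, partitions = znode.partitions + (1 -> Seq(2, 3, 1)))

  // Steps 7-8: the controller now mints a second, different TopicId for the same topic.
  znode = controllerPass(znode)
  val secondId = znode.topicId.get

  println(s"first=$firstId second=$secondId same=${firstId == secondId}") // prints same=false
}

Running it prints two different IDs for the same topic name, which is exactly the mismatch that brokers later complain about.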

Testing

  1. I have added a test with this change which asserts that the TopicId for a topic is immutable, i.e. once assigned, it does not change. This test fails before this change and passes after it.

  2. All integration tests and unit tests pass for me locally.

Side effects of this fix

There are no additional side effects of this change and no additional calls to Zk; we are only updating the TopicId from a locally cached value instead of assigning a new one.
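
As a hedged sketch of that idea (simplified, not the actual KafkaController code; resolveTopicId and localTopicIdCache are hypothetical names), the resolution order becomes: keep the ID from Zk if present, otherwise reuse the locally cached ID, and only mint a new one when neither source has an ID:

import java.util.UUID

object TopicIdResolution {
  // Order of preference when (re)writing the topic znode during partition modification.
  def resolveTopicId(topic: String,
                     idFromZk: Option[UUID],
                     localTopicIdCache: Map[String, UUID]): UUID =
    idFromZk                                 // Zk already has an ID: keep it untouched
      .orElse(localTopicIdCache.get(topic))  // otherwise reuse the controller's cached ID (this PR)
      .getOrElse(UUID.randomUUID())          // only mint a fresh ID when neither source has one
}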

Caveats

This code change does not fix the problem completely. It assumes that the controller has the TopicId locally so that it can update Zk, but in situations such as controller failover that may not be true. More specifically, we will still end up with two different topic Ids when a controller failover takes place between the time the Zk TopicId was overwritten/removed and the time the controller could update it with the local value.

However, this code change should fix the majority of the scenarios impacted by this bug, and a separate PR will be filed to fix the remaining minority of controller-failover scenarios.

Release

Due to the simple nature of the fix and the number of users who are impacted, I would request that we consider adding this to 3.4.0 and backporting it to as many previous versions as we can.

@divijvaidya (Contributor, Author)

@jolshan, since you are our topic Id expert, please take a look at this change when you get a chance. I would request some urgency (if possible) since I would prefer to have this bug fix added in 3.4, given the growing number of users facing this problem with each passing day.

@ijuma requested review from jolshan and junrao on January 15, 2023 07:00
@clolov (Collaborator) left a comment

This obviously needs to also be reviewed by people with more knowledge of the codebase, but the current changes make sense to me.

@@ -1698,7 +1708,7 @@ class KafkaController(val config: KafkaConfig,
   private def processPartitionModifications(topic: String): Unit = {
     def restorePartitionReplicaAssignment(
       topic: String,
-      newPartitionReplicaAssignment: Map[TopicPartition, ReplicaAssignment]
+      newPartitionReplicaAssignment: Set[(TopicPartition, ReplicaAssignment)]
Contributor:

Why did we make this a set?

@divijvaidya (Contributor, Author):

we changed the call to Zk from zkClient.getFullReplicaAssignmentForTopics to zkClient.getReplicaAssignmentAndTopicIdForTopics since the former did not provide the topic Id. As a result of this change, we needed to use flatMap at line 1730, and hence it results in a Set.

We can work on improving the readability as you suggested once we have a consensus on whether we even want to make this change at all.
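
To illustrate why the shape changes (simplified stand-in types below, not the real zkClient API): the newer call returns one record per topic, and expanding each record into per-partition pairs goes through flatMap on a Set, which naturally yields a Set of tuples rather than a Map:

object FlatMapShape {
  // Simplified stand-in for the topic-level record returned by the newer Zk call.
  final case class TopicIdReplicaAssignment(topic: String,
                                            topicId: Option[String],
                                            assignment: Map[Int, Seq[Int]])

  // Expanding topic-level records into per-partition pairs via flatMap produces a Set of tuples.
  def perPartition(byTopic: Set[TopicIdReplicaAssignment]): Set[((String, Int), Seq[Int])] =
    byTopic.flatMap { tira =>
      tira.assignment.map { case (partition, replicas) => ((tira.topic, partition), replicas) }
    }
}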

}
onNewPartitionCreation(partitionsToBeAdded.keySet)
processTopicIds(partitions)
partitionsToBeAdded.foreach(tuple => controllerContext.updatePartitionFullReplicaAssignment(tuple._1, tuple._2))
Contributor:

I think switching this to "tuple" makes it harder to read and less clear what the variables are.

  }.toSet

  val setDataRequests = updatedAssignments.map { case TopicIdReplicaAssignment(topic, topicIdOpt, assignments) =>
    SetDataRequest(TopicZNode.path(topic), TopicZNode.encode(topicIdOpt, assignments), ZkVersion.MatchAnyVersion)
  }.toSeq

Contributor:

nit: spacing

  // Otherwise, maintain what we have in the topicZNode
  val updatedTopicIdAssignments = if (config.usesTopicId) {
-   val (withTopicIds, withoutTopicIds) = topicIdAssignments.partition(_.topicId.isDefined)
-   withTopicIds ++ zkClient.setTopicIds(withoutTopicIds, controllerContext.epochZkVersion)
+   val (withTopicIds, withoutTopicIds, withLocalTopicIds) = topicIdAssignments.foldLeft((Set.empty[TopicIdReplicaAssignment], Set.empty[TopicIdReplicaAssignment], Set.empty[TopicIdReplicaAssignment])) {
Contributor:

readability is a bit tricky here. I'm also wondering if there was an operation you could have done besides making singleton sets and unioning them.

Contributor:

Since we go through all the partitions again to check the topic ID and add to the replica assignment I wonder if we could have kept the partition and then a separate part for the ones with local IDs. 🤔

@divijvaidya (Contributor, Author):

I'm also wondering if there was an operation you could have done besides making singleton sets and unioning them.

I chose this approach because it was the only one where we are able to 3-way partition the topicIdAssignments in a single iteration; all other approaches require multiple iterations (a self-contained sketch of this follows below).

We can work on improving this and favouring readability once we have a consensus on whether we will proceed with this change or not.
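
For reference, here is a hedged, self-contained sketch of the single-pass 3-way split being discussed (simplified types and hypothetical names, not the actual controller code): one foldLeft classifies each assignment as already having an ID in Zk, having no ID anywhere, or having no ID in Zk but one in the local cache, without iterating the collection three times:

object ThreeWayPartition {
  // Simplified stand-in for a topic's replica assignment plus the topic ID read from Zk.
  final case class Assignment(topic: String, topicIdInZk: Option[String])

  def split(assignments: Set[Assignment],
            localIds: Map[String, String]): (Set[Assignment], Set[Assignment], Set[Assignment]) =
    assignments.foldLeft((Set.empty[Assignment], Set.empty[Assignment], Set.empty[Assignment])) {
      case ((withIds, withoutIds, withLocalIds), a) =>
        if (a.topicIdInZk.isDefined)         (withIds + a, withoutIds, withLocalIds)  // Zk already has an ID
        else if (localIds.contains(a.topic)) (withIds, withoutIds, withLocalIds + a)  // reuse the cached ID
        else                                 (withIds, withoutIds + a, withLocalIds)  // genuinely new: mint one
    }
}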

@jolshan (Contributor) commented Jan 16, 2023

More specifically, we will still end up having two different topic Ids in cases when controller failover takes place between the time when Zk TopicID was overwritten/removed and time when controller could update the TopicId with local value.

I'm a bit concerned about cases like this. I'm also wondering if there are any cases where we actually assign the wrong ID to a topic. I think as it stands now, we isolated the issue to older clients, but once we start making kafka changes (and releasing them into new releases) we may open up the problem further. I'm also not sure if it's worth adding this complexity to the code path to handle this edge case.

Finally, I'm wondering about the implications for upgrades and downgrades.

@ijuma (Contributor) commented Jan 16, 2023

It's too late for 3.4 unless it's a regression or a low risk critical fix to something new in 3.4.

Have we tried encouraging users to move away from the command line switches deprecated a long time ago?

@cmccabe (Contributor) commented Jan 17, 2023

I hate to say this, but I wonder if this should be WONTFIX. The rule for admin tools that used the --zookeeper flag was always that you needed to match the admin tool version and the software version. Even using the admin tool for version 2.8 on version 2.7 would have been considered a major operator error. The scenario here we're discussing is using the 2.8 tool on a 3.x version. So a different major version (not even minor), and version that was released several years later. That's way outside of what we ever supported with the --zookeeper flag.

Let me give an example of the general kind of problem you could have here. If you created a prefix ACL using an admin tool that had the --zookeeper flag, but your kafka version didn't support prefix ACLs, what would happen? I'm not completely sure, but nothing good.

In this specific case you have old software overwriting whatever the new software has put in the znode. So it's not just topic IDs, but in general anything new that we add, that will get overwritten. We can't really support this -- we never promised to keep that znode the same forever.

Stuff like this is why the --zookeeper flags were removed. We do support cross-version compatibility when using the --bootstrap-server option.

I wonder if we could somehow move the ZK paths so that the old tools would fail rather than doing the wrong thing. Another option is to set ZK-level ACLs to prevent these tools from going behind Kafka's back. We did talk about this but due to the accelerated timeline for 3.4 we never implemented it. Obviously in a case where we're upgrading from ZK to KRaft we would want to avoid users doing this.

@jolshan (Contributor) commented Jan 17, 2023

What you say makes sense, Colin. I do think it's a bit tricky to make such a big code change to support folks using older and deprecated tools.

I also understand the point of view of the pain this causes though. (It's caused me quite a bit of pain!) I am interested to see if there are any other options here.

@divijvaidya (Contributor, Author) commented Jan 18, 2023

Thanks for the comments, folks. I would like to break the conversation into multiple FAQs, which will hopefully address the questions and points you have raised above.

What is the motivation? Is it to make the latest versions compatible with pre 2.8 clients (for the scope of this bug) OR is it to protect the server when older clients are used?

It’s the latter. Currently, the bug manifests in availability loss for the impacted topic since the topic stops replication. This is recoverable by deleting the metadata file and broker will recreate it from Zk.

However, when KIP-405 (Tiered Storage) is merged in, this will begin to impact data integrity. This is because the metadata for a segment uses the topicId as a key. When the same segment for the same topic is uploaded with different topic Ids, it leads to an unrecoverable situation.


I would be happy to discuss a different solution from what has been proposed in this PR that can protect the server against the above two cases.



Does this bug impact client/server in the same major version as well?

Yes. A <2.8 client with a >=2.8 server will have this bug if the <2.8 client uses the --zookeeper command line flag to alter the partitions for a topic. Note that usage of --zookeeper has been deprecated since v2.4, and ideally customers should not be using it.


Can the users migrate to the newer versions?

That would be ideal. But practically, there are many cases where users rely on 3P libraries which haven't updated their client version. We have been observing multiple cases where customers are facing this bug.
As a community, we can push the problem back to the users and ask them to upgrade their software, OR we can empathise with their situation and try to find a path forward which doesn't have side effects and doesn't burden the newer clients/servers.
In some cases, the former is the right thing to do, but I would argue that in this particular case we have a simple and safe fix that prevents the majority of cases. Hence, we can strive to improve the experience of the users and go with the latter option.



Why is this PR safe to merge?
The change in this PR breaks the premise that Zk is the source of truth, since it updates Zk with a value that is stored locally in the controller. This is not ideal, but it is a safe change to make, primarily because topic IDs are immutable and the controller context is either empty or consistent with the latest state of the system. More specifically:

  1. We update Zk only when it doesn't have a topic Id during alter partition, which is not possible (since create topic would have allocated a topic Id) unless it hits this bug.

Hence, we won't encounter a scenario where we "overwrite" an existing topic Id.

  2. Topic IDs are immutable. They only change for a topic when it is deleted and re-created. In cases where a topic is deleted and re-created, the controller context removes the topic Id from the local cache on deletion. Hence, the topic Id in the local cache of a controller is always the one which should correctly be associated with a particular topic.

  3. The zkClient.setTopicIds() call ensures that Zk is only updated by the latest controller (by verifying the controller epoch), hence eliminating the possibility of a stale controller updating Zk with a stale topic Id. A minimal sketch of this fencing idea follows the list.
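
As a hedged illustration of that fencing idea (illustrative only; the real KafkaZkClient wraps this differently, and the path and names below are assumptions), a ZooKeeper multi() can bundle a version check on the controller-epoch znode with the topic-znode write, so the whole operation fails atomically if a newer controller has since taken over:

import org.apache.zookeeper.{Op, ZooKeeper}
import scala.jdk.CollectionConverters._

object FencedTopicIdWrite {
  // Writes the topic znode only if the controller-epoch znode still has the expected version,
  // i.e. only if no newer controller has been elected since that version was cached.
  def writeTopicZNode(zk: ZooKeeper,
                      controllerEpochPath: String,    // e.g. "/controller_epoch" (assumed path)
                      expectedEpochZkVersion: Int,
                      topicPath: String,
                      payload: Array[Byte]): Unit = {
    val ops = List(
      Op.check(controllerEpochPath, expectedEpochZkVersion), // fence: fail if a newer controller took over
      Op.setData(topicPath, payload, -1)                     // -1 accepts any data version of the topic znode
    )
    zk.multi(ops.asJava)                                     // throws KeeperException if the check fails
  }
}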

What are the alternative ways to protect the state of the server against this bug?

  1. As Colin suggested, we could potentially start storing topic Ids in a different place in Zk so that they don't get overwritten by older clients. I believe that it is a more intrusive change (though more holistic, covering 100% of the bug scenarios) than what I suggested above.
  2. If a topic Id mismatch is detected, consider the partition a "bad partition" and perform the recovery steps listed in https://issues.apache.org/jira/browse/KAFKA-14190 manually. Stop archival to remote storage as soon as a topic Id mismatch is detected. We should probably make this change in addition to the change in this PR.

Any other suggestions?

@ijuma (Contributor) commented Jan 18, 2023

Yes. <2.8 client with >=2.8 server will have this bug.

This is not quite right. You need multiple things:

  1. Tools older than 3.0
  2. Tools need to use --zookeeper instead of --bootstrap-server (--zookeeper has been deprecated for several years, many releases before 2.8)
  3. Direct access to zk is available to these tools (we have recommended not providing direct access to zk for a while)
  4. Brokers newer than 2.8

@divijvaidya (Contributor, Author)

Yes. <2.8 client with >=2.8 server will have this bug.

This is not quite right. You need multiple things:

  1. Tools older than 3.0
  2. Tools need to use --zookeeper instead of --bootstrap-server
  3. Direct access to zk is available to these tools (we have recommended not providing direct access to zk for a while)
  4. Brokers newer than 2.8

Yes, you are right. I already mentioned it in the description of this PR. I will update my above comment to be more specific.

@divijvaidya (Contributor, Author)

I am not super sure (still checking) but I believe https://github.com/yahoo/CMAK is a popular 3P tool that directly accesses zookeeper with older clients.

@ijuma (Contributor) commented Jan 18, 2023

I guess these are the two relevant issues:

  1. Update CMAK to use Kafka 2.8.0+ libs due to critical bug discovered in Kafka yahoo/CMAK#900
  2. Kafka 3.3.1 with KRaft Support yahoo/CMAK#898

Not sure how easy it would be, but contributing a fix to the project above would be really helpful.

@jolshan (Contributor) commented Jan 18, 2023

@divijvaidya My other concern here is that even though this fixes the issue in the case where the controller stays the same, it doesn't cover controller re-election. This means we would still have to share and support the recovery methods.

If this is a big issue for tiered storage, then we could still be in trouble.

@dajac (Contributor) commented Jan 18, 2023

If this is a big issue for tiered storage, then we could still be in trouble.

Will this code still be around by the time tiered storage is completed?

@jolshan (Contributor) commented Jan 18, 2023

When same segment for same topic is uploaded with different topic Ids, it leads to an unrecoverable situation.

Also curious if we can upload a segment with the wrong ID if the leader and ISR request is blocked (and thus can't become a leader or follower)

@divijvaidya (Contributor, Author) commented Jan 24, 2023

@dajac

Will this code still be around by the time tiered storage is completed?

I don't know, but my point is that this code change is simple and safe enough to add to the current code as of today. It will prevent the non-TS topic-mismatch bugs, and when TS comes upstream, its impact will be mitigated.

@jolshan

My other concern here is that even though this fixes the issue in the case where the controller stays the same, it doesn't cover controller re-election. This means we would still have to share and support the recovery methods. If this is a big issue for tiered storage, then we could still be in trouble.

To be very precise, this fix won't work if the controller context does not have the old topic Id. That will only happen when a controller failover takes place exactly in the window between the admin tool overwriting Zk and the controller updating it. Note that a controller failover at any other time will work fine (since the controller will recreate the controller context from Zk, which would have been updated with the old TopicId earlier).

And yes, I agree this is not a 100% fix, but it's a start. Since it's a safe fix and doesn't have side effects, we should push it out.

Also curious if we can upload a segment with the wrong ID if the leader and ISR request is blocked (and thus can't become a leader or follower)

Great question! The topic Id mismatch check during handling of the LISR request is based on matching the local topic Id on the broker with the one sent in the LISR. However, it's very much possible to not have any topicId locally. As an example, say a partition reassignment leads to partition placement on a broker where the log hasn't been created so far. In such cases, LISR won't throw a topic mismatch error and won't be blocked; instead, the broker will start operating with the new topic Id. Now we will have some followers working with the old topic Id (where LISR was blocked) and some with the new topic Id. If a failover happens to the one with the new topic Id, it will start uploading segments to tiered storage with the new topic Id, and thus, for the same topic partition, we will have segments with the old topic Id as well as the new topic Id.

@github-actions (bot)

This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch).

If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.

@github-actions bot added the 'stale' label on Jun 10, 2023