Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16525; Dynamic KRaft network manager and channel #15986

Merged
merged 21 commits into from
Jun 3, 2024

Conversation

jsancio
Copy link
Member

@jsancio jsancio commented May 17, 2024

Allow KRaft replicas to send requests to any node (Node) not just the nodes configured in the controller.quorum.voters property. This flexibility is needed so KRaft can implement the controller.quorum.voters configuration, send request to the dynamically changing set of voters and send request to the leader endpoint (Node) discovered through the KRaft RPCs (specially BeginQuorumEpoch request and Fetch response).

This was achieved by changing the RequestManager API to accept Node instead of just the replica ID. Internally, the request manager tracks connection state using the Node.idString method to match the connection management used by NetworkClient.

The API for RequestManager is also changed so that the ConnectState class is not exposed in the API. This allows the request manager to reclaim heap memory for any connection that is ready.

The NetworkChannel was updated to receive the endpoint information (Node) through the outbound raft request (RaftRequent.Outbound). This makes the network channel more flexible as it doesn't need to be configured with the list of all possible endpoints. RaftRequest.Outbound and RaftResponse.Inbound were updated to include the remote node instead of just the remote id.

The follower state tracked by KRaft replicas was updated to include both the leader id and the leader's endpoint (Node). In this comment the node value is computed from the set of voters. In future commit this will be updated so that it is sent through KRaft RPCs. For example BeginQuorumEpoch request and Fetch response.

Support for configuring controller.quorum.bootstrap.servers was added. This includes changes to KafkaConfig, QuorumConfig, etc. All of the tests using QuorumTestHarness were changed to use the controller.quorum.bootstrap.servers instead of the controller.quorum.voters for the broker configuration. Finally, the node id for the bootstrap server will be decreasing negative numbers starting with -2.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@jsancio jsancio marked this pull request as ready for review May 20, 2024 17:07
@@ -20,6 +20,8 @@ log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n

log4j.logger.kafka=WARN
log4j.logger.org.apache.kafka=WARN
# TODO; remove this line
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reminder about TODO

@@ -118,6 +120,10 @@ class RaftManagerTest {
new Metrics(Time.SYSTEM),
Option.empty,
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumVoters)),
config.quorumBootstrapServers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we seem to have this same little snippet in a few places. Is there somewhere we could add a helper?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

@@ -1366,6 +1368,26 @@ class KafkaConfigTest {
assertEquals(expectedVoters, addresses)
}

@Test
def testParseQuorumBootstrapServers(): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we should have some invalid test cases as well?

Copy link
Member Author

@jsancio jsancio May 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. It does look like there are bugs in the implementation of Utils.getHost and Utils.getPort. I don't really want to fix them in this PR. I created this issue: https://issues.apache.org/jira/browse/KAFKA-16824

private Optional<RawSnapshotWriter> fetchingSnapshot;
private Optional<RawSnapshotWriter> fetchingSnapshot = Optional.empty();

// TODO: remove this when done debuggin CI failures
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO reminder


// Attempt to parse the connect strings
for (String entry : entries) {
QuorumConfig.parseBootstrapServer(entry);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: drop unnecessary QuorumConfig prefix?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

);
}

return InetSocketAddress.createUnresolved(host, port);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this going to give us a nice exception message if the host is invalid?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure. I know that it throws an IllegalArgumentException if host is null or port is outside the valid range. I think it is better for Utils.getHost and Utils.getPort to better catch invalid host and port values so we can throw the corresponding ConfigException. I filed https://issues.apache.org/jira/browse/KAFKA-16824 to improve the implementation of those methods.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's a mistake to resolve hostnames as part of configuration validation. Just treat them as strings.

Otherwise you could end up not being able to start Kafka because 1 out of 3 hostnames in your bootstrap list could not be resolved (because of a DNS problem or whatever)

Resolve a hostname when you're actually using it, not before.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. I have been fixing any code that resolves host names outside of the NetworkClient. This PR should not be resolving host names outside of the NetworkClient. Let me know if you find a case that violates this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, I thought Utils.getHost resolved the hostname. I can see now that it doesn't. Fair enough.

@@ -1234,7 +1286,22 @@ private boolean handleFetchResponse(
} else {
Records records = FetchResponse.recordsOrFail(partitionResponse);
if (records.sizeInBytes() > 0) {
appendAsFollower(records);
// TODO: remove all of this code when done debugging
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO reminder

)
.collect(Collectors.toList());

logger.info("Starting request manager with bootstrap servers: {}", bootstrapNodes);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we change this message to something like "Staring request manager with static voter set: {}"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

return state.isResponseExpected(correlationId);
}

public void onResponseReceived(Node node, long correlationId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we could consolidate onResponseReceived and onResponseError. Perhaps something like notifyResponseResult or something like that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. Went with onResponseResult(Node, long, boolean, long).

* @param voter the id of the voter
* @param listener the name of the listener
* @return the socket address if it exists, otherwise {@code Optional.empty()}
* @param voterIds the id of the voters
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: ids of the voters

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

@jsancio
Copy link
Member Author

jsancio commented May 27, 2024

@hachikuji, thanks for the review. The PR is ready for another round.

The core of the issue that I fixed was that KRaft was sending two Fetch requests for the same LEO. One of the Fetch request was going to the leader using the Node with an id greater than or equal to 0. The second Fetch request was going to the leader using the Node in the bootstrap server list which had a negative id.

I fixed the issue by updating the RequestManager and KafkaRaftClient to only allow one outstanding (Fetch) request.

@@ -94,6 +95,7 @@ class SharedServer(
val time: Time,
private val _metrics: Metrics,
val controllerQuorumVotersFuture: CompletableFuture[JMap[Integer, InetSocketAddress]],
val bootstrapServers: JCollection[InetSocketAddress],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see why this needs to be a new argument. We already have KafkaConfig passed in as an argument, and this just comes from QuorumConfig.parseBootstrapServers(config.quorumBootstrapServers).

Adding a new argument here creates a lot of churn, which is a lot of work for merges and such.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the same reason why the controllerQuorumVotersFuture is passed as an argument and it is not read from the KafkaConfig. Tests tend to configure the voters using the 0 port so that the OS can dynamically assign an open port. We need the same functionality for bootstrapServers in most tests the bootstrap server are the same se of endpoint as the voters but without the node id.

Having said that I have plans to remove all of these complexity after 3.8. This is mainly here before of kafka raft cluster test kit. I created this issue to track this work: https://issues.apache.org/jira/browse/KAFKA-16653

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok.

@@ -21,12 +21,12 @@ import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.admin.{NewPartitions, NewTopic}
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.{KafkaException, MetricName, TopicPartition}
Copy link
Contributor

@cmccabe cmccabe May 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to do this in a separate cleanup PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. Next time. I can't help but clean up code as I see it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

@@ -21,6 +21,5 @@ log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.logger.kafka=WARN
log4j.logger.org.apache.kafka=WARN


Copy link
Contributor

@cmccabe cmccabe May 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to do this in a separate cleanup PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Next time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

", highWatermark=" + highWatermark +
", fetchingSnapshot=" + fetchingSnapshot +
')';
return String.format(
Copy link
Contributor

@cmccabe cmccabe May 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I'm not sure I see a lot of benefit to this change.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

String.format are easier to update and read.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally I find String.format actually harder to read in the "simple" cases where you're just printing x=myX, y=myY. I think where it really shines is where you want an exotic format (like x=%02x, y=%08d, etc.) or where the textual part is complex. However, I don't feel that strongly about this so you can leave it.

@@ -1593,6 +1644,13 @@ private Optional<Boolean> maybeHandleCommonResponse(
int epoch,
long currentTimeMs
) {
Optional<Node> leader = Optional.empty();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be more clearly expressed as something like

Optional<Node> leader = leaderId.isPresent() ? 
  partitionState.lastVoterSet().voterNode(leaderId.getAsInt(), channel.listenerName()) : 
  Optional.empty();

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find that suggestion much harder to read because the important characters ? and : get lost with all the other characters. I updated the code anyways.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks

int epoch,
long currentTimeMs
) {
OptionalInt leaderId = OptionalInt.empty();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again would be more clearly expressed with ?: rather than mutating the OptionalInt reference

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

public class RequestManager {
private final Map<Integer, ConnectionState> connections = new HashMap<>();
private final List<Integer> voters = new ArrayList<>();
private final Map<String, ConnectionState> connections = new HashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why you are changing this to treat node ID as a string. An Int seems better.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mainly to match NetworkClient. NetworkClient uses Node::idString as the key instead of Node::id.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems like a mistake in NetworkClient, and I don't see why we'd want to emulate it here. But it's not a very big mistake in the grand scheme of things

public Optional<InetSocketAddress> voterAddress(int voter, String listener) {
return Optional.ofNullable(voters.get(voter))
.flatMap(voterNode -> voterNode.address(listener));
public Set<Node> voterNodes(Stream<Integer> voterIds, ListenerName listenerName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like the wrong behavior to me for the case where one of the voters doesn't have a specific listener, but the other ones do. Wouldn't we want to continue to use the other, working voters in that case?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe. I had that as the original implementation but changed it to throw an IllegalArgumentException.

If the user configured the cluster so that the default/preferred listener of one node/voter is not supported by all of the voters, it is clearly misconfigured. Should KRaft to continue silently knowing that the voter can never become leader? Or should Kafka catch the misconfiguration early and fail?

I opted, and in KRaft we generally opt, towards fail fast.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess. We have to be a bit careful though, because the quorum topolgy can change over time. If we encounter an old state (that isn't the latest) and that causes us to kill the kcontroller before we can see the latest state, that could be quite bad.

@cmccabe
Copy link
Contributor

cmccabe commented May 29, 2024

Thanks for the PR, @jsancio . A few meta-comments:

  • I'm not sure I see the benefit to changing the toString functions to use String.format. It seems more brittle than the simple string concatenation approach. String.format can be helpful when you want to print something in a specific way, like String.format("%02d", myInt).... but that isn't the case here.

  • Changing the SharedServer constructor is a huge pain and generates a lot of churn. Since the thing you're passing comes from the static configuration anyway, let's not do that. Just read the static configuration in SharedServer (which we do for many other things)

  • I think we should try to avoid doing too much validation in KafkaConfig. For example, things like hostnames should be resolved when we actually need them. It would be silly for one unresolvable hostname to make configuration validation fail, and hence fail the whole kcontroller startup process.

  • I don't think we want to change all of the tests to use controller.quorum.bootstrap.servers. We still need to support the old configuration. Let's make it so that tests using IBP_3_8_IV0 or newer use the new thing, and tests using an older MV use the old configuration. That way we will have good coverage.

return 0;
}

return state.remainingBackoffMs(timeMs);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: additional space after return.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Looks like I already removed this in another commit.

} else if (connection.isBackoffComplete(currentTimeMs)) {
// Mark the node as ready after completed backoff
iterator.remove();
} else if (connection.hasInflightRequest(currentTimeMs)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we'll also check hasRequestTimeout in hasInflightRequest. We can consider to do a small optimization here in the future.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. I think we can re-implement the RequestManager so that all operations consistently update the state as part of checking the state. I think it would be much easier to read and understand. I opted against it in this PR to minimize the diff and the possibility of introducing a breaking change.

@jsancio jsancio changed the title KAFKA-16252; Dynamic KRaft network manager and channel KAFKA-16525; Dynamic KRaft network manager and channel Jun 2, 2024
@jsancio
Copy link
Member Author

jsancio commented Jun 3, 2024

I don't think we want to change all of the tests to use controller.quorum.bootstrap.servers. We still need to support the old configuration. Let's make it so that tests using IBP_3_8_IV0 or newer use the new thing, and tests using an older MV use the old configuration. That way we will have good coverage.

controller.quorum.bootstrap.servers is not gated by metadata version, kraft version or IBP. This property only affects the behavior of the local node and it doesn't affect the network protocol (RPCs) or data version stored on disk. In other words using this configuration should be safe irrespective of the software version of the remote nodes.

I decided to change all of the quorum test harness test to use controller.quorum.bootstrap.servers because this feature is a super set of the controller.quorum.voters feature since the id are negatives. In other words, if tests pass with controller.quorum.bootstrap.servers they should pass with controller.quorum.bootstrap.servers.

Copy link
Contributor

@cmccabe cmccabe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@cmccabe
Copy link
Contributor

cmccabe commented Jun 3, 2024

controller.quorum.bootstrap.servers is not gated by metadata version, kraft version or IBP. This property only affects the behavior of the local node and it doesn't affect the network protocol (RPCs) or data version stored on disk. In other words using this configuration should be safe irrespective of the software version of the remote nodes.

I agree with this, but that was not my point. My point was that we should have some tests exercising both code paths.

I do think we need to fix this later, but I don't want to hold up the PR on it.

@cmccabe cmccabe merged commit 459da47 into apache:trunk Jun 3, 2024
1 check failed
jsancio added a commit that referenced this pull request Jun 4, 2024
Allow KRaft replicas to send requests to any node (Node) not just the nodes configured in the
controller.quorum.voters property. This flexibility is needed so KRaft can implement the
controller.quorum.voters configuration, send request to the dynamically changing set of voters and
send request to the leader endpoint (Node) discovered through the KRaft RPCs (specially
BeginQuorumEpoch request and Fetch response).

This was achieved by changing the RequestManager API to accept Node instead of just the replica ID.
Internally, the request manager tracks connection state using the Node.idString method to match the
connection management used by NetworkClient.

The API for RequestManager is also changed so that the ConnectState class is not exposed in the
API. This allows the request manager to reclaim heap memory for any connection that is ready.

The NetworkChannel was updated to receive the endpoint information (Node) through the outbound raft
request (RaftRequent.Outbound). This makes the network channel more flexible as it doesn't need to
be configured with the list of all possible endpoints. RaftRequest.Outbound and
RaftResponse.Inbound were updated to include the remote node instead of just the remote id.

The follower state tracked by KRaft replicas was updated to include both the leader id and the
leader's endpoint (Node). In this comment the node value is computed from the set of voters. In
future commit this will be updated so that it is sent through KRaft RPCs. For example
BeginQuorumEpoch request and Fetch response.

Support for configuring controller.quorum.bootstrap.servers was added. This includes changes to
KafkaConfig, QuorumConfig, etc. All of the tests using QuorumTestHarness were changed to use the
controller.quorum.bootstrap.servers instead of the controller.quorum.voters for the broker
configuration. Finally, the node id for the bootstrap server will be decreasing negative numbers
starting with -2.

Reviewers: Jason Gustafson <jason@confluent.io>, Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
TaiJuWu pushed a commit to TaiJuWu/kafka that referenced this pull request Jun 8, 2024
Allow KRaft replicas to send requests to any node (Node) not just the nodes configured in the
controller.quorum.voters property. This flexibility is needed so KRaft can implement the
controller.quorum.voters configuration, send request to the dynamically changing set of voters and
send request to the leader endpoint (Node) discovered through the KRaft RPCs (specially
BeginQuorumEpoch request and Fetch response).

This was achieved by changing the RequestManager API to accept Node instead of just the replica ID.
Internally, the request manager tracks connection state using the Node.idString method to match the
connection management used by NetworkClient.

The API for RequestManager is also changed so that the ConnectState class is not exposed in the
API. This allows the request manager to reclaim heap memory for any connection that is ready.

The NetworkChannel was updated to receive the endpoint information (Node) through the outbound raft
request (RaftRequent.Outbound). This makes the network channel more flexible as it doesn't need to
be configured with the list of all possible endpoints. RaftRequest.Outbound and
RaftResponse.Inbound were updated to include the remote node instead of just the remote id.

The follower state tracked by KRaft replicas was updated to include both the leader id and the
leader's endpoint (Node). In this comment the node value is computed from the set of voters. In
future commit this will be updated so that it is sent through KRaft RPCs. For example
BeginQuorumEpoch request and Fetch response.

Support for configuring controller.quorum.bootstrap.servers was added. This includes changes to
KafkaConfig, QuorumConfig, etc. All of the tests using QuorumTestHarness were changed to use the
controller.quorum.bootstrap.servers instead of the controller.quorum.voters for the broker
configuration. Finally, the node id for the bootstrap server will be decreasing negative numbers
starting with -2.

Reviewers: Jason Gustafson <jason@confluent.io>, Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants