Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AlterISR RPC and use it for ISR modifications #9100

Merged
merged 47 commits into from Sep 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
37f7c63
Checking in RPCs and stubbing out basic functionality
mumrah Jul 29, 2020
19dd93a
Update the ISR from the controller
mumrah Jul 30, 2020
5482529
Merge remote-tracking branch 'apache-github/trunk' into KAFKA-8836-al…
mumrah Jul 30, 2020
6fb2551
Start working on broker-side of AlterIsr
mumrah Jul 31, 2020
5955b1d
Add throttle time to AlterIsr, fix error counts
mumrah Aug 5, 2020
afeb18e
Broker to controller AlterIsr fixes
mumrah Aug 5, 2020
627b26a
Add "effective" ISR to Partition
mumrah Aug 5, 2020
fe99911
Merge remote-tracking branch 'apache-github/trunk' into KAFKA-8836-al…
mumrah Aug 7, 2020
07716ce
PartitionTest and ReplicaManagerTest passing
mumrah Aug 12, 2020
10b7d63
PR feedback and progress on unit tests
mumrah Aug 13, 2020
da33368
Merge remote-tracking branch 'apache-github/trunk' into KAFKA-8836-al…
mumrah Aug 14, 2020
1d98ada
PR feedback, minor nits mostly
mumrah Aug 17, 2020
b7577f1
Gate AlterIsr on IBP
mumrah Aug 17, 2020
9c7afa1
Add back ReplicaManager's ISR propagation thread
mumrah Aug 18, 2020
4cc58a3
WIP tests
mumrah Aug 18, 2020
f557a86
Merge remote-tracking branch 'apache-github/trunk' into KAFKA-8836-al…
mumrah Aug 18, 2020
8c9183b
Back something out
mumrah Aug 18, 2020
1e20e9c
Tests WIP
mumrah Aug 18, 2020
93ac630
Add some debug logging for test
mumrah Aug 19, 2020
ccda535
Make sure to always send LeaderAndIsr after AlterIsr
mumrah Aug 19, 2020
19d8e36
More logging for tests
mumrah Aug 20, 2020
2bcf8b4
Clear the pending ISR state for certain errors
mumrah Aug 20, 2020
5be92d0
Let Partition handle LeaderAndIsr even if epoch is unchanged
mumrah Aug 20, 2020
868a779
Better logging for ISR updates, allow leader to retry in some cases
mumrah Aug 21, 2020
69441c0
Clear pending ISR state after unknown controller error
mumrah Aug 21, 2020
d6ba8c2
Feedback from PR
mumrah Aug 21, 2020
c886e8c
Cover more error cases, some cleanup
mumrah Aug 21, 2020
1691951
Revert "Back something out"
mumrah Aug 24, 2020
6b264a4
Cleanup
mumrah Aug 24, 2020
f791645
Merge remote-tracking branch 'apache-github/trunk' into KAFKA-8836-al…
mumrah Sep 9, 2020
a3ab36c
Lots of updates from Jason's review
mumrah Sep 10, 2020
aa15fd0
Fix some Scala 2.12 problems
mumrah Sep 10, 2020
7ef9a0d
More PR feedback, mostly style stuff
mumrah Sep 10, 2020
4e5685c
Make sure we set throttle time on the response
mumrah Sep 11, 2020
02e7378
Use a queue instead of map inside AlterIsrManager
mumrah Sep 12, 2020
31f9126
Trying something different in AlterIsrManager
mumrah Sep 15, 2020
d00c066
Cleanup and adding more unit tests
mumrah Sep 15, 2020
ff4f5b2
Fix unit tests
mumrah Sep 15, 2020
392472e
Merge remote-tracking branch 'apache-github/trunk' into KAFKA-8836-al…
mumrah Sep 15, 2020
4105349
fixup after merge
mumrah Sep 15, 2020
834718e
Partially revert startup ordering
mumrah Sep 16, 2020
47878b0
More PR feedback
mumrah Sep 17, 2020
7a93d06
Merge remote-tracking branch 'apache-github/trunk' into KAFKA-8836-al…
mumrah Sep 17, 2020
1dde0e4
More PR feedback
mumrah Sep 17, 2020
8c694d1
Two small test things
mumrah Sep 17, 2020
1647c71
Merge remote-tracking branch 'apache-github/trunk' into KAFKA-8836-al…
mumrah Sep 23, 2020
ac3eec1
Next round of PR feedback
mumrah Sep 24, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

public class InvalidUpdateVersionException extends ApiException {

public InvalidUpdateVersionException(String message) {
super(message);
}

public InvalidUpdateVersionException(String message, Throwable throwable) {
super(message, throwable);
}

}
Expand Up @@ -24,6 +24,8 @@
import org.apache.kafka.common.message.AlterClientQuotasResponseData;
import org.apache.kafka.common.message.AlterConfigsRequestData;
import org.apache.kafka.common.message.AlterConfigsResponseData;
import org.apache.kafka.common.message.AlterIsrRequestData;
import org.apache.kafka.common.message.AlterIsrResponseData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData;
Expand Down Expand Up @@ -244,7 +246,8 @@ public Struct parseResponse(short version, ByteBuffer buffer) {
END_QUORUM_EPOCH(54, "EndQuorumEpoch", true, false,
EndQuorumEpochRequestData.SCHEMAS, EndQuorumEpochResponseData.SCHEMAS),
DESCRIBE_QUORUM(55, "DescribeQuorum", true, false,
DescribeQuorumRequestData.SCHEMAS, DescribeQuorumResponseData.SCHEMAS);
DescribeQuorumRequestData.SCHEMAS, DescribeQuorumResponseData.SCHEMAS),
ALTER_ISR(56, "AlterIsr", AlterIsrRequestData.SCHEMAS, AlterIsrResponseData.SCHEMAS);

private static final ApiKeys[] ID_TO_TYPE;
private static final int MIN_API_KEY = 0;
Expand Down
Expand Up @@ -36,6 +36,7 @@
import org.apache.kafka.common.errors.EligibleLeadersNotAvailableException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.FencedLeaderEpochException;
import org.apache.kafka.common.errors.InvalidUpdateVersionException;
import org.apache.kafka.common.errors.FetchSessionIdNotFoundException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
Expand Down Expand Up @@ -334,7 +335,8 @@ public enum Errors {
DUPLICATE_RESOURCE(92, "A request illegally referred to the same resource twice.", DuplicateResourceException::new),
UNACCEPTABLE_CREDENTIAL(93, "Requested credential would not meet criteria for acceptability.", UnacceptableCredentialException::new),
INCONSISTENT_VOTER_SET(94, "Indicates that the either the sender or recipient of a " +
"voter-only request is not one of the expected voters", InconsistentVoterSetException::new);
"voter-only request is not one of the expected voters", InconsistentVoterSetException::new),
INVALID_UPDATE_VERSION(95, "The given update version was invalid.", InvalidUpdateVersionException::new);

private static final Logger log = LoggerFactory.getLogger(Errors.class);

Expand Down
Expand Up @@ -18,6 +18,7 @@

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.AlterIsrRequestData;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
Expand Down Expand Up @@ -254,6 +255,8 @@ public static AbstractRequest parseRequest(ApiKeys apiKey, short apiVersion, Str
return new BeginQuorumEpochRequest(struct, apiVersion);
case END_QUORUM_EPOCH:
return new EndQuorumEpochRequest(struct, apiVersion);
case ALTER_ISR:
return new AlterIsrRequest(new AlterIsrRequestData(struct, apiVersion), apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));
Expand Down
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.AlterIsrResponseData;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
Expand Down Expand Up @@ -199,6 +200,8 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, Struct struct, shor
return new EndQuorumEpochResponse(struct, version);
case DESCRIBE_QUORUM:
return new DescribeQuorumResponse(struct, version);
case ALTER_ISR:
return new AlterIsrResponse(new AlterIsrResponseData(struct, version));
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));
Expand Down
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.AlterIsrRequestData;
import org.apache.kafka.common.message.AlterIsrResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;

public class AlterIsrRequest extends AbstractRequest {

private final AlterIsrRequestData data;

public AlterIsrRequest(AlterIsrRequestData data, short apiVersion) {
super(ApiKeys.ALTER_ISR, apiVersion);
this.data = data;
}

public AlterIsrRequestData data() {
return data;
}

@Override
protected Struct toStruct() {
return data.toStruct(version());
}

/**
* Get an error response for a request with specified throttle time in the response if applicable
*/
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
return new AlterIsrResponse(new AlterIsrResponseData()
.setThrottleTimeMs(throttleTimeMs)
.setErrorCode(Errors.forException(e).code()));
}

public static class Builder extends AbstractRequest.Builder<AlterIsrRequest> {

private final AlterIsrRequestData data;

public Builder(AlterIsrRequestData data) {
super(ApiKeys.ALTER_ISR);
this.data = data;
}

@Override
public AlterIsrRequest build(short version) {
return new AlterIsrRequest(data, version);
}

@Override
public String toString() {
return data.toString();
}
}
}
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.AlterIsrResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;

import java.util.HashMap;
import java.util.Map;

public class AlterIsrResponse extends AbstractResponse {

private final AlterIsrResponseData data;

public AlterIsrResponse(AlterIsrResponseData data) {
this.data = data;
}

public AlterIsrResponseData data() {
return data;
}

@Override
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> counts = new HashMap<>();
updateErrorCounts(counts, Errors.forCode(data.errorCode()));
data.topics().forEach(topicResponse -> topicResponse.partitions().forEach(partitionResponse -> {
updateErrorCounts(counts, Errors.forCode(partitionResponse.errorCode()));
}));
return counts;
}

@Override
protected Struct toStruct(short version) {
return data.toStruct(version);
}

@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}
}
42 changes: 42 additions & 0 deletions clients/src/main/resources/common/message/AlterIsrRequest.json
@@ -0,0 +1,42 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implie
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 56,
"type": "request",
"name": "AlterIsrRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
"about": "The ID of the requesting broker" },
{ "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
"about": "The epoch of the requesting broker" },
{ "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The name of the topic to alter ISRs for" },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index" },
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+",
"about": "The leader epoch of this partition" },
{ "name": "NewIsr", "type": "[]int32", "versions": "0+",
"about": "The ISR for this partition"},
{ "name": "CurrentIsrVersion", "type": "int32", "versions": "0+",
"about": "The expected version of ISR which is being updated"}
]}
]}
]
}
46 changes: 46 additions & 0 deletions clients/src/main/resources/common/message/AlterIsrResponse.json
@@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 56,
"type": "response",
"name": "AlterIsrResponse",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The top level response error code" },
{ "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The name of the topic" },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index" },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The partition level error code" },
{ "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
"about": "The broker ID of the leader." },
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+",
"about": "The leader epoch." },
{ "name": "Isr", "type": "[]int32", "versions": "0+",
"about": "The in-sync replica IDs." },
{ "name": "CurrentIsrVersion", "type": "int32", "versions": "0+",
"about": "The current ISR version." }
]}
]}
]
}
9 changes: 9 additions & 0 deletions core/src/main/scala/kafka/api/ApiVersion.scala
Expand Up @@ -103,6 +103,8 @@ object ApiVersion {
KAFKA_2_7_IV0,
// Bup Fetch protocol for Raft protocol (KIP-595)
KAFKA_2_7_IV1,
// Introduced AlterIsr (KIP-497)
KAFKA_2_7_IV2
)

// Map keys are the union of the short and full versions
Expand Down Expand Up @@ -370,6 +372,13 @@ case object KAFKA_2_7_IV1 extends DefaultApiVersion {
val id: Int = 29
}

case object KAFKA_2_7_IV2 extends DefaultApiVersion {
val shortVersion: String = "2.7"
val subVersion = "IV2"
val recordVersion = RecordVersion.V2
val id: Int = 30
}

object ApiVersionValidator extends Validator {

override def ensureValid(name: String, value: Any): Unit = {
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/scala/kafka/api/LeaderAndIsr.scala
Expand Up @@ -46,6 +46,16 @@ case class LeaderAndIsr(leader: Int,
if (leader == LeaderAndIsr.NoLeader) None else Some(leader)
}

def equalsIgnoreZk(other: LeaderAndIsr): Boolean = {
if (this == other) {
true
} else if (other == null) {
false
} else {
leader == other.leader && leaderEpoch == other.leaderEpoch && isr.equals(other.isr)
}
}

override def toString: String = {
s"LeaderAndIsr(leader=$leader, leaderEpoch=$leaderEpoch, isr=$isr, zkVersion=$zkVersion)"
}
Expand Down