Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka-4763 (Used for triggering test only) #3498

Closed
wants to merge 11 commits into from
21 changes: 18 additions & 3 deletions clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.kafka.common;

/**
 * Information about a topic-partition. This is used to describe MetadataPartitionInfo.
*/
public class PartitionInfo {

Expand All @@ -26,13 +26,20 @@ public class PartitionInfo {
private final Node leader;
private final Node[] replicas;
private final Node[] inSyncReplicas;
private final Node[] offlineReplicas;

// Used only by tests
/**
 * Creates partition metadata with no offline replicas; delegates to the
 * full constructor with an empty offline-replica array.
 */
public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas) {
    this(topic, partition, leader, replicas, inSyncReplicas, new Node[0]);
}

/**
 * Creates partition metadata.
 *
 * @param topic the topic this partition belongs to
 * @param partition the partition id
 * @param leader the node currently acting as leader, or null if there is none
 *               (rendered as "none" by {@code toString})
 * @param replicas the full set of replicas for this partition
 * @param inSyncReplicas the subset of replicas in sync with the leader
 * @param offlineReplicas the subset of replicas that are offline
 */
public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas, Node[] offlineReplicas) {
    this.topic = topic;
    this.partition = partition;
    this.leader = leader;
    // NOTE(review): arrays are stored without defensive copies, so callers
    // must not mutate them after construction — confirm this is intentional.
    this.replicas = replicas;
    this.inSyncReplicas = inSyncReplicas;
    this.offlineReplicas = offlineReplicas;
}

/**
Expand Down Expand Up @@ -71,14 +78,22 @@ public Node[] inSyncReplicas() {
return inSyncReplicas;
}

/**
 * The subset of the replicas that are offline.
 * Returns the internal array (no copy); callers should not mutate it.
 */
public Node[] offlineReplicas() {
    return offlineReplicas;
}

@Override
public String toString() {
return String.format("Partition(topic = %s, partition = %d, leader = %s, replicas = %s, isr = %s)",
return String.format("Partition(topic = %s, partition = %d, leader = %s, replicas = %s, isr = %s, offlineReplicas = %s)",
topic,
partition,
leader == null ? "none" : leader.idString(),
formatNodeIds(replicas),
formatNodeIds(inSyncReplicas));
formatNodeIds(inSyncReplicas),
formatNodeIds(offlineReplicas));
}

/* Extract the node ids from each item in the array and format for display */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

/**
* Miscellaneous disk-related IOException occurred when handling a request.
*/
public class KafkaStorageException extends RetriableException {

    private static final long serialVersionUID = 1L;

    public KafkaStorageException() {
        super();
    }

    public KafkaStorageException(String message) {
        super(message);
    }

    /**
     * @param cause the underlying disk-related exception; preserved so the
     *              root failure is not lost when the IOException is wrapped
     */
    public KafkaStorageException(Throwable cause) {
        super(cause);
    }

    /**
     * @param message context describing the failed disk access
     * @param cause the underlying disk-related exception
     */
    public KafkaStorageException(String message, Throwable cause) {
        super(message, cause);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.InvalidTxnTimeoutException;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.NotControllerException;
Expand Down Expand Up @@ -495,7 +496,14 @@ public ApiException build(String message) {
public ApiException build(String message) {
    return new OperationNotAttemptedException(message);
}
}),
// KAFKA-4763: new error code surfaced when a broker's log directory fails;
// maps to the (retriable) KafkaStorageException on the client.
KAFKA_STORAGE_ERROR(56, "Disk error when trying to access log file on the disk.",
    new ApiExceptionBuilder() {
        @Override
        public ApiException build(String message) {
            return new KafkaStorageException(message);
        }
    });

private interface ApiExceptionBuilder {
ApiException build(String message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ public class Protocol {
"topics that don't exist will be created by the broker. " +
"Otherwise, no topics will be created by the broker."));

/* The v5 metadata request is the same as v4. An additional field for offline_replicas has been added to the v5 metadata response (see METADATA_RESPONSE_V5). */
public static final Schema METADATA_REQUEST_V5 = METADATA_REQUEST_V4;

public static final Schema METADATA_BROKER_V0 = new Schema(new Field("node_id", INT32, "The broker id."),
new Field("host", STRING, "The hostname of the broker."),
new Field("port", INT32,
Expand Down Expand Up @@ -121,12 +124,38 @@ public class Protocol {

public static final Schema PARTITION_METADATA_V1 = PARTITION_METADATA_V0;

/* v2 partition metadata: same fields as v1 plus the offline_replicas array. */
public static final Schema PARTITION_METADATA_V2 = new Schema(new Field("partition_error_code",
INT16,
"The error code for the partition, if any."),
new Field("partition_id",
INT32,
"The id of the partition."),
new Field("leader",
INT32,
"The id of the broker acting as leader for this partition."),
new Field("replicas",
new ArrayOf(INT32),
"The set of all nodes that host this partition."),
new Field("isr",
new ArrayOf(INT32),
"The set of nodes that are in sync with the leader for this partition."),
new Field("offline_replicas",
new ArrayOf(INT32),
"The set of offline replicas of this partition."));

public static final Schema TOPIC_METADATA_V1 = new Schema(new Field("topic_error_code", INT16, "The error code for the given topic."),
new Field("topic", STRING, "The name of the topic"),
new Field("is_internal", BOOLEAN,
"Indicates if the topic is considered a Kafka internal topic"),
new Field("partition_metadata", new ArrayOf(PARTITION_METADATA_V1),
"Metadata for each partition of the topic."));
"Metadata for each partition of the topic."));

/* v2 topic metadata: same shape as v1 but partitions use PARTITION_METADATA_V2
 * (which carries offline_replicas). */
public static final Schema TOPIC_METADATA_V2 = new Schema(new Field("topic_error_code", INT16, "The error code for the given topic."),
new Field("topic", STRING, "The name of the topic"),
new Field("is_internal", BOOLEAN,
"Indicates if the topic is considered a Kafka internal topic"),
new Field("partition_metadata", new ArrayOf(PARTITION_METADATA_V2),
"Metadata for each partition of the topic."));

public static final Schema METADATA_RESPONSE_V1 = new Schema(new Field("brokers", new ArrayOf(METADATA_BROKER_V1),
"Host and port information for all brokers."),
Expand Down Expand Up @@ -154,8 +183,18 @@ public class Protocol {

public static final Schema METADATA_RESPONSE_V4 = METADATA_RESPONSE_V3;

/* v5 metadata response: v4 plus throttle_time and TOPIC_METADATA_V2
 * (offline_replicas). The scraped diff also retained the pre-change
 * METADATA_REQUEST/METADATA_RESPONSE arrays (without V5) alongside the
 * post-change ones — only the post-change arrays are kept, since duplicate
 * field definitions do not compile. */
public static final Schema METADATA_RESPONSE_V5 = new Schema(
newThrottleTimeField(),
new Field("brokers", new ArrayOf(METADATA_BROKER_V1),
"Host and port information for all brokers."),
new Field("cluster_id", NULLABLE_STRING,
"The cluster id that this broker belongs to."),
new Field("controller_id", INT32,
"The broker id of the controller broker."),
new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V2)));

// All supported schema versions, indexed by version number.
public static final Schema[] METADATA_REQUEST = {METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2, METADATA_REQUEST_V3, METADATA_REQUEST_V4, METADATA_REQUEST_V5};
public static final Schema[] METADATA_RESPONSE = {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2, METADATA_RESPONSE_V3, METADATA_RESPONSE_V4, METADATA_RESPONSE_V5};

/* Produce api */

Expand Down Expand Up @@ -1039,6 +1078,17 @@ public class Protocol {
new Field("zk_version", INT32, "The ZK version."),
new Field("replicas", new ArrayOf(INT32), "The replica ids."));

/* v1 partition state: v0 plus the is_new flag. */
public static final Schema LEADER_AND_ISR_REQUEST_PARTITION_STATE_V1 =
new Schema(new Field("topic", STRING, "Topic name."),
new Field("partition", INT32, "Topic partition id."),
new Field("controller_epoch", INT32, "The controller epoch."),
new Field("leader", INT32, "The broker id for the leader."),
new Field("leader_epoch", INT32, "The leader epoch."),
new Field("isr", new ArrayOf(INT32), "The in sync replica ids."),
new Field("zk_version", INT32, "The ZK version."),
new Field("replicas", new ArrayOf(INT32), "The replica ids."),
new Field("is_new", BOOLEAN, "Whether the replica should have existed on the broker or not"));

public static final Schema LEADER_AND_ISR_REQUEST_LIVE_LEADER_V0 =
new Schema(new Field("id", INT32, "The broker id."),
new Field("host", STRING, "The hostname of the broker."),
Expand All @@ -1050,6 +1100,12 @@ public class Protocol {
new ArrayOf(LEADER_AND_ISR_REQUEST_PARTITION_STATE_V0)),
new Field("live_leaders", new ArrayOf(LEADER_AND_ISR_REQUEST_LIVE_LEADER_V0)));

/* v1 request: same top-level fields as v0, but partition_states use the v1
 * schema (adds is_new). */
public static final Schema LEADER_AND_ISR_REQUEST_V1 = new Schema(new Field("controller_id", INT32, "The controller id."),
new Field("controller_epoch", INT32, "The controller epoch."),
new Field("partition_states",
new ArrayOf(LEADER_AND_ISR_REQUEST_PARTITION_STATE_V1)),
new Field("live_leaders", new ArrayOf(LEADER_AND_ISR_REQUEST_LIVE_LEADER_V0)));

public static final Schema LEADER_AND_ISR_RESPONSE_PARTITION_V0 = new Schema(new Field("topic", STRING, "Topic name."),
new Field("partition", INT32, "Topic partition id."),
new Field("error_code", INT16, "Error code."));
Expand All @@ -1058,8 +1114,10 @@ public class Protocol {
new Field("partitions",
new ArrayOf(LEADER_AND_ISR_RESPONSE_PARTITION_V0)));

/* v1 response is unchanged from v0. The scraped diff retained both the
 * pre-change arrays ({V0} only) and the post-change arrays — only the
 * post-change arrays are kept, since duplicate field definitions do not
 * compile. */
public static final Schema LEADER_AND_ISR_RESPONSE_V1 = LEADER_AND_ISR_RESPONSE_V0;

public static final Schema[] LEADER_AND_ISR_REQUEST = {LEADER_AND_ISR_REQUEST_V0, LEADER_AND_ISR_REQUEST_V1};
public static final Schema[] LEADER_AND_ISR_RESPONSE = {LEADER_AND_ISR_RESPONSE_V0, LEADER_AND_ISR_RESPONSE_V1};

/* Replica api */
public static final Schema STOP_REPLICA_REQUEST_PARTITION_V0 = new Schema(new Field("topic", STRING, "Topic name."),
Expand Down Expand Up @@ -1141,6 +1199,17 @@ public class Protocol {

public static final Schema UPDATE_METADATA_REQUEST_PARTITION_STATE_V3 = UPDATE_METADATA_REQUEST_PARTITION_STATE_V2;

/* v4 partition state: v3 plus the offline_replicas array. */
public static final Schema UPDATE_METADATA_REQUEST_PARTITION_STATE_V4 =
new Schema(new Field("topic", STRING, "Topic name."),
new Field("partition", INT32, "Topic partition id."),
new Field("controller_epoch", INT32, "The controller epoch."),
new Field("leader", INT32, "The broker id for the leader."),
new Field("leader_epoch", INT32, "The leader epoch."),
new Field("isr", new ArrayOf(INT32), "The in sync replica ids."),
new Field("zk_version", INT32, "The ZK version."),
new Field("replicas", new ArrayOf(INT32), "The replica ids."),
new Field("offline_replicas", new ArrayOf(INT32), "The offline replica ids"));

public static final Schema UPDATE_METADATA_REQUEST_END_POINT_V3 =
new Schema(new Field("port", INT32, "The port on which the broker accepts requests."),
new Field("host", STRING, "The hostname of the broker."),
Expand All @@ -1158,12 +1227,20 @@ public class Protocol {
new Field("partition_states", new ArrayOf(UPDATE_METADATA_REQUEST_PARTITION_STATE_V3)),
new Field("live_brokers", new ArrayOf(UPDATE_METADATA_REQUEST_BROKER_V3)));

/* v4 request: same top-level fields as v3, but partition_states use the v4
 * schema (adds offline_replicas). */
public static final Schema UPDATE_METADATA_REQUEST_V4 =
new Schema(new Field("controller_id", INT32, "The controller id."),
new Field("controller_epoch", INT32, "The controller epoch."),
new Field("partition_states", new ArrayOf(UPDATE_METADATA_REQUEST_PARTITION_STATE_V4)),
new Field("live_brokers", new ArrayOf(UPDATE_METADATA_REQUEST_BROKER_V3)));

public static final Schema UPDATE_METADATA_RESPONSE_V3 = UPDATE_METADATA_RESPONSE_V2;

public static final Schema UPDATE_METADATA_RESPONSE_V4 = UPDATE_METADATA_RESPONSE_V3;

public static final Schema[] UPDATE_METADATA_REQUEST = {UPDATE_METADATA_REQUEST_V0, UPDATE_METADATA_REQUEST_V1,
UPDATE_METADATA_REQUEST_V2, UPDATE_METADATA_REQUEST_V3};
UPDATE_METADATA_REQUEST_V2, UPDATE_METADATA_REQUEST_V3, UPDATE_METADATA_REQUEST_V4};
public static final Schema[] UPDATE_METADATA_RESPONSE = {UPDATE_METADATA_RESPONSE_V0, UPDATE_METADATA_RESPONSE_V1,
UPDATE_METADATA_RESPONSE_V2, UPDATE_METADATA_RESPONSE_V3};
UPDATE_METADATA_RESPONSE_V2, UPDATE_METADATA_RESPONSE_V3, UPDATE_METADATA_RESPONSE_V4};

/* SASL handshake api */
public static final Schema SASL_HANDSHAKE_REQUEST_V0 = new Schema(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,13 @@ public void close() throws IOException {
channel.close();
}

/**
 * Close file handlers used by the FileChannel but don't write to disk. This is used when the disk may have failed
 *
 * NOTE(review): the body is currently identical to close() above (both just
 * close the channel); closing a FileChannel does not force buffered data to
 * disk, which appears to be the intended "don't write" behavior — confirm.
 *
 * @throws IOException if closing the underlying channel fails
 */
public void closeHandlers() throws IOException {
    channel.close();
}

/**
* Delete this message set from the filesystem
* @return True iff this message set was deleted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class LeaderAndIsrRequest extends AbstractRequest {
private static final String ISR_KEY_NAME = "isr";
private static final String ZK_VERSION_KEY_NAME = "zk_version";
private static final String REPLICAS_KEY_NAME = "replicas";
private static final String IS_NEW_KEY_NAME = "is_new";

// live_leaders key names
private static final String END_POINT_ID_KEY_NAME = "id";
Expand All @@ -57,9 +58,9 @@ public static class Builder extends AbstractRequest.Builder<LeaderAndIsrRequest>
private final Map<TopicPartition, PartitionState> partitionStates;
private final Set<Node> liveLeaders;

public Builder(int controllerId, int controllerEpoch,
public Builder(short version, int controllerId, int controllerEpoch,
Map<TopicPartition, PartitionState> partitionStates, Set<Node> liveLeaders) {
super(ApiKeys.LEADER_AND_ISR);
super(ApiKeys.LEADER_AND_ISR, version);
this.controllerId = controllerId;
this.controllerEpoch = controllerEpoch;
this.partitionStates = partitionStates;
Expand Down Expand Up @@ -121,10 +122,10 @@ public LeaderAndIsrRequest(Struct struct, short version) {
List<Integer> replicas = new ArrayList<>(replicasArray.length);
for (Object r : replicasArray)
replicas.add((Integer) r);
boolean isNew = partitionStateData.hasField(IS_NEW_KEY_NAME) ? partitionStateData.getBoolean(IS_NEW_KEY_NAME) : false;

PartitionState partitionState = new PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas);
PartitionState partitionState = new PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas, isNew);
partitionStates.put(new TopicPartition(topic, partition), partitionState);

}

Set<Node> leaders = new HashSet<>();
Expand Down Expand Up @@ -162,6 +163,8 @@ protected Struct toStruct() {
partitionStateData.set(ISR_KEY_NAME, partitionState.isr.toArray());
partitionStateData.set(ZK_VERSION_KEY_NAME, partitionState.zkVersion);
partitionStateData.set(REPLICAS_KEY_NAME, partitionState.replicas.toArray());
if (partitionStateData.hasField(IS_NEW_KEY_NAME))
partitionStateData.set(IS_NEW_KEY_NAME, partitionState.isNew);
partitionStatesData.add(partitionStateData);
}
struct.set(PARTITION_STATES_KEY_NAME, partitionStatesData.toArray());
Expand All @@ -188,6 +191,7 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
short versionId = version();
switch (versionId) {
case 0:
case 1:
return new LeaderAndIsrResponse(Errors.NONE, responses);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
Expand Down