diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index cff6e30bc93a..08ab9fbbc508 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -31,6 +31,7 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; @@ -42,6 +43,7 @@ import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.metrics.Gauge; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; @@ -1706,8 +1708,20 @@ private void maybeUpdateAssignment(SubscriptionState subscription) { if (!newAssignedPartitions.contains(tp)) { metrics.removeSensor(partitionLagMetricName(tp)); metrics.removeSensor(partitionLeadMetricName(tp)); + metrics.removeMetric(partitionPreferredReadReplicaMetricName(tp)); } } + + for (TopicPartition tp : newAssignedPartitions) { + if (!this.assignedPartitions.contains(tp)) { + MetricName metricName = partitionPreferredReadReplicaMetricName(tp); + if (metrics.metric(metricName) == null) { + metrics.addMetric(metricName, (Gauge) (config, now) -> + subscription.preferredReadReplica(tp, 0L).orElse(-1)); + } + } + } + this.assignedPartitions = newAssignedPartitions; this.assignmentId = newAssignmentId; } @@ -1719,9 +1733,7 @@ private void recordPartitionLead(TopicPartition tp, long lead) { String name = partitionLeadMetricName(tp); Sensor recordsLead = this.metrics.getSensor(name); if (recordsLead == null) { - Map metricTags = new HashMap<>(2); - metricTags.put("topic", tp.topic().replace('.', '_')); - metricTags.put("partition", String.valueOf(tp.partition())); + Map metricTags = topicPartitionTags(tp); recordsLead = this.metrics.sensor(name); @@ -1738,10 +1750,7 @@ private void recordPartitionLag(TopicPartition tp, long lag) { String name = partitionLagMetricName(tp); Sensor recordsLag = this.metrics.getSensor(name); if (recordsLag == null) { - Map metricTags = new HashMap<>(2); - metricTags.put("topic", tp.topic().replace('.', '_')); - metricTags.put("partition", String.valueOf(tp.partition())); - + Map metricTags = topicPartitionTags(tp); recordsLag = this.metrics.sensor(name); recordsLag.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLag, metricTags), new Value()); @@ -1759,6 +1768,17 @@ private static String partitionLeadMetricName(TopicPartition tp) { return tp + ".records-lead"; } + private MetricName partitionPreferredReadReplicaMetricName(TopicPartition tp) { + Map metricTags = topicPartitionTags(tp); + return this.metrics.metricInstance(metricsRegistry.partitionPreferredReadReplica, metricTags); + } + + private Map topicPartitionTags(TopicPartition tp) { + Map metricTags = new HashMap<>(2); + metricTags.put("topic", tp.topic().replace('.', '_')); + metricTags.put("partition", String.valueOf(tp.partition())); + return metricTags; + } } @Override diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java index f86961545cc9..501ffe9a88da 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java @@ -54,6 +54,7 @@ public class FetcherMetricsRegistry { public MetricNameTemplate partitionRecordsLead; public MetricNameTemplate partitionRecordsLeadMin; public MetricNameTemplate partitionRecordsLeadAvg; + public MetricNameTemplate partitionPreferredReadReplica; public FetcherMetricsRegistry() { this(new HashSet(), ""); @@ -139,7 +140,9 @@ public FetcherMetricsRegistry(Set tags, String metricGrpPrefix) { "The min lead of the partition", partitionTags); this.partitionRecordsLeadAvg = new MetricNameTemplate("records-lead-avg", groupName, "The average lead of the partition", partitionTags); - + this.partitionPreferredReadReplica = new MetricNameTemplate( + "preferred-read-replica", "consumer-fetch-manager-metrics", + "The current read replica for the partition, or -1 if reading from leader", partitionTags); } public List getAllTemplates() { @@ -171,7 +174,8 @@ public List getAllTemplates() { partitionRecordsLagMax, partitionRecordsLead, partitionRecordsLeadMin, - partitionRecordsLeadAvg + partitionRecordsLeadAvg, + partitionPreferredReadReplica ); } diff --git a/clients/src/main/java/org/apache/kafka/common/replica/ClientMetadata.java b/clients/src/main/java/org/apache/kafka/common/replica/ClientMetadata.java new file mode 100644 index 000000000000..b328733dc7b3 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/replica/ClientMetadata.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.replica; + +import org.apache.kafka.common.security.auth.KafkaPrincipal; + +import java.net.InetAddress; +import java.util.Objects; + +/** + * Holder for all the client metadata required to determine a preferred replica. 
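+ * <p>
+ * The broker passes this metadata to {@link ReplicaSelector#select(org.apache.kafka.common.TopicPartition, ClientMetadata, PartitionView)}
+ * when determining the preferred read replica for a fetch request.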
+ */ +public interface ClientMetadata { + + /** + * Rack ID sent by the client + */ + String rackId(); + + /** + * Client ID sent by the client + */ + String clientId(); + + /** + * Incoming address of the client + */ + InetAddress clientAddress(); + + /** + * Security principal of the client + */ + KafkaPrincipal principal(); + + /** + * Listener name for the client + */ + String listenerName(); + + + class DefaultClientMetadata implements ClientMetadata { + private final String rackId; + private final String clientId; + private final InetAddress clientAddress; + private final KafkaPrincipal principal; + private final String listenerName; + + public DefaultClientMetadata(String rackId, String clientId, InetAddress clientAddress, + KafkaPrincipal principal, String listenerName) { + this.rackId = rackId; + this.clientId = clientId; + this.clientAddress = clientAddress; + this.principal = principal; + this.listenerName = listenerName; + } + + @Override + public String rackId() { + return rackId; + } + + @Override + public String clientId() { + return clientId; + } + + @Override + public InetAddress clientAddress() { + return clientAddress; + } + + @Override + public KafkaPrincipal principal() { + return principal; + } + + @Override + public String listenerName() { + return listenerName; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DefaultClientMetadata that = (DefaultClientMetadata) o; + return Objects.equals(rackId, that.rackId) && + Objects.equals(clientId, that.clientId) && + Objects.equals(clientAddress, that.clientAddress) && + Objects.equals(principal, that.principal) && + Objects.equals(listenerName, that.listenerName); + } + + @Override + public int hashCode() { + return Objects.hash(rackId, clientId, clientAddress, principal, listenerName); + } + + @Override + public String toString() { + return "DefaultClientMetadata{" + + "rackId='" + rackId + '\'' + + ", clientId='" + clientId + '\'' + + ", clientAddress=" + clientAddress + + ", principal=" + principal + + ", listenerName='" + listenerName + '\'' + + '}'; + } + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/replica/PartitionView.java b/clients/src/main/java/org/apache/kafka/common/replica/PartitionView.java new file mode 100644 index 000000000000..8174e631305a --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/replica/PartitionView.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.replica; + +import java.util.Collections; +import java.util.Objects; +import java.util.Set; + +/** + * View of a partition used by {@link ReplicaSelector} to determine a preferred replica. 
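+ * <p>
+ * As populated by the broker, the replica set also contains the leader. Selectors can use
+ * {@link ReplicaView#comparator()} to order replicas by how caught up they are.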
+ */ +public interface PartitionView { + Set replicas(); + + ReplicaView leader(); + + class DefaultPartitionView implements PartitionView { + private final Set replicas; + private final ReplicaView leader; + + public DefaultPartitionView(Set replicas, ReplicaView leader) { + this.replicas = Collections.unmodifiableSet(replicas); + this.leader = leader; + } + + @Override + public Set replicas() { + return replicas; + } + + @Override + public ReplicaView leader() { + return leader; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DefaultPartitionView that = (DefaultPartitionView) o; + return Objects.equals(replicas, that.replicas) && + Objects.equals(leader, that.leader); + } + + @Override + public int hashCode() { + return Objects.hash(replicas, leader); + } + + @Override + public String toString() { + return "DefaultPartitionView{" + + "replicas=" + replicas + + ", leader=" + leader + + '}'; + } + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/replica/RackAwareReplicaSelector.java b/clients/src/main/java/org/apache/kafka/common/replica/RackAwareReplicaSelector.java new file mode 100644 index 000000000000..8ae68723af1d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/replica/RackAwareReplicaSelector.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.replica; + +import org.apache.kafka.common.TopicPartition; + +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Returns a replica whose rack id is equal to the rack id specified in the client request metadata. If no such replica + * is found, returns the leader. 
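+ * <p>
+ * If the leader is among the replicas in the client's rack, it is preferred; otherwise the most caught-up
+ * same-rack replica (per {@link ReplicaView#comparator()}) is chosen. If the client does not supply a rack ID,
+ * the leader is returned. This selector is enabled by setting the broker configuration
+ * {@code replica.selector.class} to this class.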
+ */ +public class RackAwareReplicaSelector implements ReplicaSelector { + + @Override + public Optional select(TopicPartition topicPartition, + ClientMetadata clientMetadata, + PartitionView partitionView) { + if (clientMetadata.rackId() != null && !clientMetadata.rackId().isEmpty()) { + Set sameRackReplicas = partitionView.replicas().stream() + .filter(replicaInfo -> clientMetadata.rackId().equals(replicaInfo.endpoint().rack())) + .collect(Collectors.toSet()); + if (sameRackReplicas.isEmpty()) { + return Optional.of(partitionView.leader()); + } else { + if (sameRackReplicas.contains(partitionView.leader())) { + // Use the leader if it's in this rack + return Optional.of(partitionView.leader()); + } else { + // Otherwise, get the most caught-up replica + return sameRackReplicas.stream().max(ReplicaView.comparator()); + } + } + } else { + return Optional.of(partitionView.leader()); + } + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/replica/ReplicaSelector.java b/clients/src/main/java/org/apache/kafka/common/replica/ReplicaSelector.java new file mode 100644 index 000000000000..301fc9fdc4b3 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/replica/ReplicaSelector.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.replica; + +import org.apache.kafka.common.Configurable; +import org.apache.kafka.common.TopicPartition; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Map; +import java.util.Optional; + +/** + * Plug-able interface for selecting a preferred read replica given the current set of replicas for a partition + * and metadata from the client. + */ +public interface ReplicaSelector extends Configurable, Closeable { + + /** + * Select the preferred replica a client should use for fetching. If no replica is available, this will return an + * empty optional. + */ + Optional select(TopicPartition topicPartition, + ClientMetadata clientMetadata, + PartitionView partitionView); + @Override + default void close() throws IOException { + // No-op by default + } + + @Override + default void configure(Map configs) { + // No-op by default + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/replica/ReplicaView.java b/clients/src/main/java/org/apache/kafka/common/replica/ReplicaView.java new file mode 100644 index 000000000000..69c6cf7fef97 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/replica/ReplicaView.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.replica; + +import org.apache.kafka.common.Node; + +import java.util.Comparator; +import java.util.Objects; + +/** + * View of a replica used by {@link ReplicaSelector} to determine a preferred replica. + */ +public interface ReplicaView { + + /** + * The endpoint information for this replica (hostname, port, rack, etc) + */ + Node endpoint(); + + /** + * The log end offset for this replica + */ + long logEndOffset(); + + /** + * The number of milliseconds (if any) since the last time this replica was caught up to the high watermark. + * For a leader replica, this is always zero. + */ + long timeSinceLastCaughtUpMs(); + + /** + * Comparator for ReplicaView that returns in the order of "most caught up". This is used for deterministic + * selection of a replica when there is a tie from a selector. + */ + static Comparator comparator() { + return Comparator.comparingLong(ReplicaView::logEndOffset) + .thenComparing(Comparator.comparingLong(ReplicaView::timeSinceLastCaughtUpMs).reversed()) + .thenComparing(replicaInfo -> replicaInfo.endpoint().id()); + } + + class DefaultReplicaView implements ReplicaView { + private final Node endpoint; + private final long logEndOffset; + private final long timeSinceLastCaughtUpMs; + + public DefaultReplicaView(Node endpoint, long logEndOffset, long timeSinceLastCaughtUpMs) { + this.endpoint = endpoint; + this.logEndOffset = logEndOffset; + this.timeSinceLastCaughtUpMs = timeSinceLastCaughtUpMs; + } + + @Override + public Node endpoint() { + return endpoint; + } + + @Override + public long logEndOffset() { + return logEndOffset; + } + + @Override + public long timeSinceLastCaughtUpMs() { + return timeSinceLastCaughtUpMs; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DefaultReplicaView that = (DefaultReplicaView) o; + return logEndOffset == that.logEndOffset && + Objects.equals(endpoint, that.endpoint) && + Objects.equals(timeSinceLastCaughtUpMs, that.timeSinceLastCaughtUpMs); + } + + @Override + public int hashCode() { + return Objects.hash(endpoint, logEndOffset, timeSinceLastCaughtUpMs); + } + + @Override + public String toString() { + return "DefaultReplicaView{" + + "endpoint=" + endpoint + + ", logEndOffset=" + logEndOffset + + ", timeSinceLastCaughtUpMs=" + timeSinceLastCaughtUpMs + + '}'; + } + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java index da09df3eb900..2c0455ae4ae1 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java @@ -195,6 +195,7 @@ public class FetchRequest extends AbstractRequest { // V10 bumped up to indicate ZStandard capability. 
(see KIP-110) private static final Schema FETCH_REQUEST_V10 = FETCH_REQUEST_V9; + // V11 added rack ID to support read from followers (KIP-392) private static final Schema FETCH_REQUEST_V11 = new Schema( REPLICA_ID, MAX_WAIT_TIME, diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java index 942b0d6273ed..d387e8199966 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java @@ -144,6 +144,7 @@ public class FetchResponse extends AbstractResponse { LOG_START_OFFSET, new Field(ABORTED_TRANSACTIONS_KEY_NAME, ArrayOf.nullable(FETCH_RESPONSE_ABORTED_TRANSACTION_V4))); + // Introduced in V11 to support read from followers (KIP-392) private static final Schema FETCH_RESPONSE_PARTITION_HEADER_V6 = new Schema( PARTITION_ID, ERROR_CODE, @@ -207,6 +208,7 @@ public class FetchResponse extends AbstractResponse { // V10 bumped up to indicate ZStandard capability. (see KIP-110) private static final Schema FETCH_RESPONSE_V10 = FETCH_RESPONSE_V9; + // V11 added preferred read replica for each partition response to support read from followers (KIP-392) private static final Schema FETCH_RESPONSE_V11 = new Schema( THROTTLE_TIME_MS, ERROR_CODE, @@ -329,7 +331,7 @@ public int hashCode() { result = 31 * result + Long.hashCode(highWatermark); result = 31 * result + Long.hashCode(lastStableOffset); result = 31 * result + Long.hashCode(logStartOffset); - result = 31 * result + (preferredReadReplica != null ? preferredReadReplica.hashCode() : 0); + result = 31 * result + Objects.hashCode(preferredReadReplica); result = 31 * result + (abortedTransactions != null ? abortedTransactions.hashCode() : 0); result = 31 * result + (records != null ? records.hashCode() : 0); return result; diff --git a/clients/src/test/java/org/apache/kafka/common/replica/ReplicaSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/replica/ReplicaSelectorTest.java new file mode 100644 index 000000000000..da03e5787ddd --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/replica/ReplicaSelectorTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.replica; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.junit.Test; + +import java.net.InetAddress; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.kafka.test.TestUtils.assertOptional; +import static org.junit.Assert.assertEquals; + +public class ReplicaSelectorTest { + + @Test + public void testSameRackSelector() { + TopicPartition tp = new TopicPartition("test", 0); + + List replicaViewSet = replicaInfoSet(); + ReplicaView leader = replicaViewSet.get(0); + PartitionView partitionView = partitionInfo(new HashSet<>(replicaViewSet), leader); + + ReplicaSelector selector = new RackAwareReplicaSelector(); + Optional selected = selector.select(tp, metadata("rack-b"), partitionView); + assertOptional(selected, replicaInfo -> { + assertEquals("Expect replica to be in rack-b", replicaInfo.endpoint().rack(), "rack-b"); + assertEquals("Expected replica 3 since it is more caught-up", replicaInfo.endpoint().id(), 3); + }); + + selected = selector.select(tp, metadata("not-a-rack"), partitionView); + assertOptional(selected, replicaInfo -> { + assertEquals("Expect leader when we can't find any nodes in given rack", replicaInfo, leader); + }); + + selected = selector.select(tp, metadata("rack-a"), partitionView); + assertOptional(selected, replicaInfo -> { + assertEquals("Expect replica to be in rack-a", replicaInfo.endpoint().rack(), "rack-a"); + assertEquals("Expect the leader since it's in rack-a", replicaInfo, leader); + }); + + + } + + static List replicaInfoSet() { + return Stream.of( + replicaInfo(new Node(0, "host0", 1234, "rack-a"), 4, 0), + replicaInfo(new Node(1, "host1", 1234, "rack-a"), 2, 5), + replicaInfo(new Node(2, "host2", 1234, "rack-b"), 3, 3), + replicaInfo(new Node(3, "host3", 1234, "rack-b"), 4, 2) + + ).collect(Collectors.toList()); + } + + static ReplicaView replicaInfo(Node node, long logOffset, long timeSinceLastCaughtUpMs) { + return new ReplicaView.DefaultReplicaView(node, logOffset, timeSinceLastCaughtUpMs); + } + + static PartitionView partitionInfo(Set replicaViewSet, ReplicaView leader) { + return new PartitionView.DefaultPartitionView(replicaViewSet, leader); + } + + static ClientMetadata metadata(String rack) { + return new ClientMetadata.DefaultClientMetadata(rack, "test-client", + InetAddress.getLoopbackAddress(), KafkaPrincipal.ANONYMOUS, "TEST"); + } +} diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 9ffce13356fc..1965d7f823dd 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -519,16 +519,18 @@ class Partition(val topicPartition: TopicPartition, if (isNewLeader) { // construct the high watermark metadata for the new leader replica - leaderLog.maybeFetchHighWatermarkOffsetMetadata() + leaderLog.initializeHighWatermarkOffsetMetadata() // mark local replica as the leader after converting hw leaderReplicaIdOpt = Some(localBrokerId) // reset log end offset for remote replicas - remoteReplicas.foreach { _.updateFetchState( + remoteReplicas.foreach { replica => + replica.updateFetchState( followerFetchOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata, followerStartOffset = Log.UnknownOffset, followerFetchTimeMs = 0L, 
leaderEndOffset = Log.UnknownOffset ) + replica.updateLastSentHighWatermark(0L) } } // we may need to increment high watermark since ISR could be down to 1 diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala index 1c61fad44367..5504db586947 100644 --- a/core/src/main/scala/kafka/cluster/Replica.scala +++ b/core/src/main/scala/kafka/cluster/Replica.scala @@ -42,6 +42,10 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log // the LEO of leader at time t. This is used to determine the lag of this follower and ISR of this partition. @volatile private[this] var _lastCaughtUpTimeMs = 0L + // highWatermark is the leader's high watermark after the most recent FetchRequest from this follower. This is + // used to determine the maximum HW this follower knows about. See KIP-392 + @volatile private[this] var _lastSentHighWatermark = 0L + def logStartOffset: Long = _logStartOffset def logEndOffsetMetadata: LogOffsetMetadata = _logEndOffsetMetadata @@ -50,6 +54,8 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log def lastCaughtUpTimeMs: Long = _lastCaughtUpTimeMs + def lastSentHighWatermark: Long = _lastSentHighWatermark + /* * If the FetchRequest reads up to the log end offset of the leader when the current fetch request is received, * set `lastCaughtUpTimeMs` to the time when the current fetch request was received. @@ -78,6 +84,19 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log trace(s"Updated state of replica to $this") } + /** + * Update the high watermark of this remote replica. This is used to track what we think is the last known HW to + * a remote follower. Since this is recorded when we send a response, there is no way to guarantee that the follower + * actually receives this HW. So we consider this to be an upper bound on what the follower knows. + * + * When handling fetches, the last sent high watermark for a replica is checked to see if we should return immediately + * in order to propagate the HW more expeditiously. See KIP-392 + */ + def updateLastSentHighWatermark(highWatermark: Long): Unit = { + _lastSentHighWatermark = highWatermark + trace(s"Updated HW of replica to $highWatermark") + } + def resetLastCaughtUpTime(curLeaderLogEndOffset: Long, curTimeMs: Long, lastCaughtUpTimeMs: Long): Unit = { lastFetchLeaderLogEndOffset = curLeaderLogEndOffset lastFetchTimeMs = curTimeMs @@ -96,6 +115,7 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log replicaString.append(s", logEndOffsetMetadata=$logEndOffsetMetadata") replicaString.append(s", lastFetchLeaderLogEndOffset=$lastFetchLeaderLogEndOffset") replicaString.append(s", lastFetchTimeMs=$lastFetchTimeMs") + replicaString.append(s", lastSentHighWatermark=$lastSentHighWatermark") replicaString.append(")") replicaString.toString } diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index b76185e9e89f..31412bd9ebc1 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -328,12 +328,14 @@ class Log(@volatile var dir: File, * Convert hw to local offset metadata by reading the log at the hw offset. * If the hw offset is out of range, return the first offset of the first log segment as the offset metadata. 
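+ * The conversion is performed while holding the log lock to guard against concurrent updates to the
+ * high watermark metadata.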
*/ - def maybeFetchHighWatermarkOffsetMetadata(): Unit = { + def initializeHighWatermarkOffsetMetadata(): Unit = { if (highWatermarkMetadata.messageOffsetOnly) { - highWatermarkMetadata = convertToOffsetMetadata(highWatermark).getOrElse { - convertToOffsetMetadata(logStartOffset).getOrElse { - val firstSegmentOffset = logSegments.head.baseOffset - LogOffsetMetadata(firstSegmentOffset, firstSegmentOffset, 0) + lock.synchronized { + highWatermarkMetadata = convertToOffsetMetadata(highWatermark).getOrElse { + convertToOffsetMetadata(logStartOffset).getOrElse { + val firstSegmentOffset = logSegments.head.baseOffset + LogOffsetMetadata(firstSegmentOffset, firstSegmentOffset, 0) + } } } } @@ -357,12 +359,40 @@ class Log(@volatile var dir: File, def lastStableOffsetLag: Long = highWatermark - lastStableOffset + /** + * Fully materialize and return an offset snapshot including segment position info. This method will update + * the LogOffsetMetadata for the high watermark and last stable offset if they are message-only. Throws an + * offset out of range error if the segment info cannot be loaded. + */ def offsetSnapshot: LogOffsetSnapshot = { + var highWatermark = _highWatermarkMetadata + if (highWatermark.messageOffsetOnly) { + lock.synchronized { + val fullOffset = convertToOffsetMetadataOrThrow(_highWatermarkMetadata.messageOffset) + _highWatermarkMetadata = fullOffset + highWatermark = _highWatermarkMetadata + } + } + + var lastStable: LogOffsetMetadata = lastStableOffsetMetadata + if (lastStable.messageOffsetOnly) { + lock synchronized { + firstUnstableOffset match { + case None => highWatermark + case Some(offsetMetadata) => + val fullOffset = convertToOffsetMetadataOrThrow(offsetMetadata.messageOffset) + firstUnstableOffset = Some(fullOffset) + lastStable = fullOffset + } + } + } + LogOffsetSnapshot( - logStartOffset = logStartOffset, - logEndOffset = logEndOffsetMetadata, - highWatermark = highWatermarkMetadata, - lastStableOffset = lastStableOffsetMetadata) + logStartOffset, + logEndOffsetMetadata, + highWatermark, + lastStable + ) } private val tags = { @@ -1534,17 +1564,25 @@ class Log(@volatile var dir: File, */ def convertToOffsetMetadata(offset: Long): Option[LogOffsetMetadata] = { try { - val fetchDataInfo = read(offset, - maxLength = 1, - maxOffset = None, - minOneMessage = false, - includeAbortedTxns = false) - Some(fetchDataInfo.fetchOffsetMetadata) + Some(convertToOffsetMetadataOrThrow(offset)) } catch { case _: OffsetOutOfRangeException => None } } + /** + * Given a message offset, find its corresponding offset metadata in the log. + * If the message offset is out of range, throw an OffsetOutOfRangeException + */ + def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = { + val fetchDataInfo = read(offset, + maxLength = 1, + maxOffset = None, + minOneMessage = false, + includeAbortedTxns = false) + fetchDataInfo.fetchOffsetMetadata + } + /** * Delete any log segments matching the given predicate function, * starting with the oldest segment and moving forward until a segment doesn't match. 
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index 902009917592..c3dbde9a0d88 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit import kafka.metrics.KafkaMetricsGroup import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors._ +import org.apache.kafka.common.replica.ClientMetadata import org.apache.kafka.common.requests.FetchRequest.PartitionData import scala.collection._ @@ -59,6 +60,7 @@ class DelayedFetch(delayMs: Long, fetchMetadata: FetchMetadata, replicaManager: ReplicaManager, quota: ReplicaQuota, + clientMetadata: Option[ClientMetadata], responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit) extends DelayedOperation(delayMs) { @@ -71,7 +73,7 @@ class DelayedFetch(delayMs: Long, * Case D: The accumulated bytes from all the fetching partitions exceeds the minimum bytes * Case E: The partition is in an offline log directory on this broker * Case F: This broker is the leader, but the requested epoch is now fenced - * + * Case G: The high watermark on this broker has changed within a FetchSession, need to propagate to follower (KIP-392) * Upon completion, should return whatever data is available for each valid partition */ override def tryComplete(): Boolean = { @@ -114,6 +116,14 @@ class DelayedFetch(delayMs: Long, accumulatedSize += bytesAvailable } } + + if (fetchMetadata.isFromFollower) { + // Case G check if the follower has the latest HW from the leader + if (partition.getReplica(fetchMetadata.replicaId) + .exists(r => offsetSnapshot.highWatermark.messageOffset > r.lastSentHighWatermark)) { + return forceComplete() + } + } } } catch { case _: KafkaStorageException => // Case E @@ -157,11 +167,12 @@ class DelayedFetch(delayMs: Long, fetchMaxBytes = fetchMetadata.fetchMaxBytes, hardMaxBytesLimit = fetchMetadata.hardMaxBytesLimit, readPartitionInfo = fetchMetadata.fetchPartitionStatus.map { case (tp, status) => tp -> status.fetchInfo }, + clientMetadata = clientMetadata, quota = quota) val fetchPartitionData = logReadResults.map { case (tp, result) => tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records, - result.lastStableOffset, result.info.abortedTransactions) + result.lastStableOffset, result.info.abortedTransactions, result.preferredReadReplica) } responseCallback(fetchPartitionData) diff --git a/core/src/main/scala/kafka/server/FetchSession.scala b/core/src/main/scala/kafka/server/FetchSession.scala index fe973460615f..aa482eca7b66 100644 --- a/core/src/main/scala/kafka/server/FetchSession.scala +++ b/core/src/main/scala/kafka/server/FetchSession.scala @@ -144,6 +144,10 @@ class CachedPartition(val topic: String, if (updateResponseData) localLogStartOffset = respData.logStartOffset } + if (respData.preferredReadReplica.isPresent) { + // If the broker computed a preferred read replica, we need to include it in the response + mustRespond = true + } if (respData.error.code != 0) { // Partitions with errors are always included in the response. // We also set the cached highWatermark to an invalid offset, -1. 
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 7ce7d9943ac0..698e5a96a11d 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -69,6 +69,8 @@ import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.{ListenerName, Send} import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record._ +import org.apache.kafka.common.replica.ClientMetadata +import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, AclFilterResponse} import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo @@ -82,11 +84,13 @@ import org.apache.kafka.common.security.token.delegation.{DelegationToken, Token import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{Node, TopicPartition} +import scala.compat.java8.OptionConverters._ import scala.collection.JavaConverters._ import scala.collection.{Map, Seq, Set, immutable, mutable} import scala.collection.mutable.ArrayBuffer import scala.util.{Failure, Success, Try} + /** * Logic to handle the various Kafka requests */ @@ -577,6 +581,18 @@ class KafkaApis(val requestChannel: RequestChannel, fetchRequest.toForget, fetchRequest.isFromFollower) + val clientMetadata: Option[ClientMetadata] = if (versionId >= 11) { + // Fetch API version 11 added preferred replica logic + Some(new DefaultClientMetadata( + fetchRequest.rackId, + clientId, + request.context.clientAddress, + request.context.principal, + request.context.listenerName.value)) + } else { + None + } + def errorResponse[T >: MemoryRecords <: BaseRecords](error: Errors): FetchResponse.PartitionData[T] = { new FetchResponse.PartitionData[T](error, FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY) @@ -650,7 +666,8 @@ class KafkaApis(val requestChannel: RequestChannel, // down-conversion always guarantees that at least one batch of messages is down-converted and sent out to the // client. 
new FetchResponse.PartitionData[BaseRecords](partitionData.error, partitionData.highWatermark, - partitionData.lastStableOffset, partitionData.logStartOffset, partitionData.abortedTransactions, + partitionData.lastStableOffset, partitionData.logStartOffset, + partitionData.preferredReadReplica, partitionData.abortedTransactions, new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time)) } catch { case e: UnsupportedCompressionTypeException => @@ -659,7 +676,8 @@ class KafkaApis(val requestChannel: RequestChannel, } } case None => new FetchResponse.PartitionData[BaseRecords](partitionData.error, partitionData.highWatermark, - partitionData.lastStableOffset, partitionData.logStartOffset, partitionData.abortedTransactions, + partitionData.lastStableOffset, partitionData.logStartOffset, + partitionData.preferredReadReplica, partitionData.abortedTransactions, unconvertedRecords) } } @@ -672,7 +690,8 @@ class KafkaApis(val requestChannel: RequestChannel, val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull val lastStableOffset = data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET) partitions.put(tp, new FetchResponse.PartitionData(data.error, data.highWatermark, lastStableOffset, - data.logStartOffset, abortedTransactions, data.records)) + data.logStartOffset, data.preferredReadReplica.map(int2Integer).asJava, + abortedTransactions, data.records)) } erroneous.foreach { case (tp, data) => partitions.put(tp, data) } @@ -769,7 +788,8 @@ class KafkaApis(val requestChannel: RequestChannel, interesting, replicationQuota(fetchRequest), processResponseCallback, - fetchRequest.isolationLevel) + fetchRequest.isolationLevel, + clientMetadata) } } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 5123771ca2e9..4765031e1146 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -370,6 +370,7 @@ object KafkaConfig { val InterBrokerSecurityProtocolProp = "security.inter.broker.protocol" val InterBrokerProtocolVersionProp = "inter.broker.protocol.version" val InterBrokerListenerNameProp = "inter.broker.listener.name" + val ReplicaSelectorClassProp = "replica.selector.class" /** ********* Controlled shutdown configuration ***********/ val ControlledShutdownMaxRetriesProp = "controlled.shutdown.max.retries" val ControlledShutdownRetryBackoffMsProp = "controlled.shutdown.retry.backoff.ms" @@ -699,6 +700,7 @@ object KafkaConfig { " Example of some valid values are: 0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 0.8.2.1, 0.9.0.0, 0.9.0.1 Check ApiVersion for the full list." val InterBrokerListenerNameDoc = s"Name of listener used for communication between brokers. If this is unset, the listener name is defined by $InterBrokerSecurityProtocolProp. " + s"It is an error to set this and $InterBrokerSecurityProtocolProp properties at the same time." + val ReplicaSelectorClassDoc = "The fully qualified class name that implements ReplicaSelector. This is used by the broker to find the preferred read replica. By default, we use an implementation that returns the leader." /** ********* Controlled shutdown configuration ***********/ val ControlledShutdownMaxRetriesDoc = "Controlled shutdown can fail for multiple reasons. 
This determines the number of retries when such failure happens" val ControlledShutdownRetryBackoffMsDoc = "Before each retry, the system needs time to recover from the state that caused the previous failure (Controller fail over, replica lag etc). This config determines the amount of time to wait before retrying." @@ -966,6 +968,7 @@ object KafkaConfig { .define(InterBrokerSecurityProtocolProp, STRING, Defaults.InterBrokerSecurityProtocol, MEDIUM, InterBrokerSecurityProtocolDoc) .define(InterBrokerProtocolVersionProp, STRING, Defaults.InterBrokerProtocolVersion, ApiVersionValidator, MEDIUM, InterBrokerProtocolVersionDoc) .define(InterBrokerListenerNameProp, STRING, null, MEDIUM, InterBrokerListenerNameDoc) + .define(ReplicaSelectorClassProp, STRING, null, MEDIUM, ReplicaSelectorClassDoc) /** ********* Controlled shutdown configuration ***********/ .define(ControlledShutdownMaxRetriesProp, INT, Defaults.ControlledShutdownMaxRetries, MEDIUM, ControlledShutdownMaxRetriesDoc) @@ -1186,6 +1189,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO /***************** rack configuration **************/ val rack = Option(getString(KafkaConfig.RackProp)) + val replicaSelectorClassName = Option(getString(KafkaConfig.ReplicaSelectorClassProp)) /** ********* Log Configuration ***********/ val autoCreateTopicsEnable = getBoolean(KafkaConfig.AutoCreateTopicsEnableProp) diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index 2ec84f2854a0..8b8e159be988 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -33,6 +33,7 @@ import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest} + /** * A cache for the state (e.g., current leader) of each partition. This cache is updated through * UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously. 
@@ -195,6 +196,24 @@ class MetadataCache(brokerId: Int) extends Logging { } } + def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node] = { + val snapshot = metadataSnapshot + snapshot.partitionStates.get(tp.topic()).flatMap(_.get(tp.partition())).map { partitionInfo => + val replicaIds = partitionInfo.basePartitionState.replicas + replicaIds.asScala + .map(replicaId => replicaId.intValue() -> { + snapshot.aliveBrokers.get(replicaId.longValue()) match { + case Some(broker) => + broker.getNode(listenerName).getOrElse(Node.noNode()) + case None => + Node.noNode() + }}).toMap + .filter(pair => pair match { + case (_, node) => !node.isEmpty + }) + }.getOrElse(Map.empty[Int, Node]) + } + def getControllerId: Option[Int] = metadataSnapshot.controllerId def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster = { diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala index 18f89f742396..46e90fc8eba9 100644 --- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala @@ -22,7 +22,7 @@ import java.util.Optional import kafka.api.Request import kafka.cluster.BrokerEndPoint -import kafka.log.{LogAppendInfo, LogOffsetSnapshot} +import kafka.log.LogAppendInfo import kafka.server.AbstractFetcherThread.ResultWithPartitions import kafka.server.QuotaFactory.UnboundedQuota import org.apache.kafka.common.TopicPartition @@ -31,7 +31,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.Records import org.apache.kafka.common.requests.EpochEndOffset._ import org.apache.kafka.common.requests.FetchResponse.PartitionData -import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse} +import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse, FetchMetadata => JFetchMetadata} import scala.collection.JavaConverters._ import scala.collection.{Map, Seq, Set, mutable} @@ -89,7 +89,8 @@ class ReplicaAlterLogDirsThread(name: String, request.fetchData.asScala.toSeq, UnboundedQuota, processResponseCallback, - request.isolationLevel) + request.isolationLevel, + None) if (partitionData == null) throw new IllegalStateException(s"Failed to fetch data for partitions ${request.fetchData.keySet().toArray.mkString(",")}") @@ -121,18 +122,13 @@ class ReplicaAlterLogDirsThread(name: String, } override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition, leaderEpoch: Int): Long = { - val offsetSnapshot = offsetSnapshotFromCurrentReplica(topicPartition, leaderEpoch) - offsetSnapshot.logStartOffset + val partition = replicaMgr.getPartitionOrException(topicPartition, expectLeader = false) + partition.localLogOrException.logStartOffset } override protected def fetchLatestOffsetFromLeader(topicPartition: TopicPartition, leaderEpoch: Int): Long = { - val offsetSnapshot = offsetSnapshotFromCurrentReplica(topicPartition, leaderEpoch) - offsetSnapshot.logEndOffset.messageOffset - } - - private def offsetSnapshotFromCurrentReplica(topicPartition: TopicPartition, leaderEpoch: Int): LogOffsetSnapshot = { val partition = replicaMgr.getPartitionOrException(topicPartition, expectLeader = false) - partition.fetchOffsetSnapshot(Optional.of[Integer](leaderEpoch), fetchOnlyFromLeader = false) + partition.localLogOrException.logEndOffset } /** diff --git 
a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 07b092822c47..bfb50b88a7da 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -34,12 +34,15 @@ import kafka.utils._ import kafka.zk.KafkaZkClient import org.apache.kafka.common.ElectionType import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.Node import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.FileRecords.TimestampAndOffset import org.apache.kafka.common.record._ +import org.apache.kafka.common.replica.PartitionView.DefaultPartitionView import org.apache.kafka.common.requests.DescribeLogDirsResponse.{LogDirInfo, ReplicaInfo} import org.apache.kafka.common.requests.EpochEndOffset._ import org.apache.kafka.common.requests.FetchRequest.PartitionData @@ -47,7 +50,10 @@ import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests.{ApiError, DeleteRecordsResponse, DescribeLogDirsResponse, EpochEndOffset, IsolationLevel, LeaderAndIsrRequest, LeaderAndIsrResponse, OffsetsForLeaderEpochRequest, StopReplicaRequest, UpdateMetadataRequest} import org.apache.kafka.common.utils.Time +import org.apache.kafka.common.replica.ReplicaView.DefaultReplicaView +import org.apache.kafka.common.replica.{ClientMetadata, _} +import scala.compat.java8.OptionConverters._ import scala.collection.JavaConverters._ import scala.collection.{Map, Seq, Set, mutable} @@ -75,7 +81,8 @@ case class LogDeleteRecordsResult(requestedOffset: Long, lowWatermark: Long, exc * @param readSize amount of data that was read from the log i.e. 
size of the fetch * @param isReadFromLogEnd true if the request read up to the log end offset snapshot * when the read was initiated, false otherwise - * @param error Exception if error encountered while reading from the log + * @param preferredReadReplica the preferred read replica to be used for future fetches + * @param exception Exception if error encountered while reading from the log */ case class LogReadResult(info: FetchDataInfo, highWatermark: Long, @@ -85,6 +92,8 @@ case class LogReadResult(info: FetchDataInfo, fetchTimeMs: Long, readSize: Int, lastStableOffset: Option[Long], + preferredReadReplica: Option[Int] = None, + followerNeedsHwUpdate: Boolean = false, exception: Option[Throwable] = None) { def error: Errors = exception match { @@ -106,7 +115,8 @@ case class FetchPartitionData(error: Errors = Errors.NONE, logStartOffset: Long, records: Records, lastStableOffset: Option[Long], - abortedTransactions: Option[List[AbortedTransaction]]) + abortedTransactions: Option[List[AbortedTransaction]], + preferredReadReplica: Option[Int]) /** @@ -216,6 +226,8 @@ class ReplicaManager(val config: KafkaConfig, } } + val replicaSelectorOpt: Option[ReplicaSelector] = createReplicaSelector() + val leaderCount = newGauge( "LeaderCount", new Gauge[Int] { @@ -807,8 +819,9 @@ class ReplicaManager(val config: KafkaConfig, } /** - * Fetch messages from the leader replica, and wait until enough data can be fetched and return; - * the callback function will be triggered either when timeout or required fetch info is satisfied + * Fetch messages from a replica, and wait until enough data can be fetched and return; + * the callback function will be triggered either when timeout or required fetch info is satisfied. + * Consumers may fetch from any replica, but followers can only fetch from the leader. 
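+ * If the fetch comes from a consumer and the configured ReplicaSelector returns a preferred read replica other
+ * than the leader, the read is skipped and the replica id is returned in the response so the consumer can
+ * redirect its fetches to that replica.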
*/ def fetchMessages(timeout: Long, replicaId: Int, @@ -818,9 +831,9 @@ class ReplicaManager(val config: KafkaConfig, fetchInfos: Seq[(TopicPartition, PartitionData)], quota: ReplicaQuota, responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit, - isolationLevel: IsolationLevel) { + isolationLevel: IsolationLevel, + clientMetadata: Option[ClientMetadata]) { val isFromFollower = Request.isValidBrokerId(replicaId) - val fetchOnlyFromLeader = replicaId != Request.DebuggingConsumerId && replicaId != Request.FutureLocalReplicaId val fetchIsolation = if (isFromFollower || replicaId == Request.FutureLocalReplicaId) FetchLogEnd @@ -829,16 +842,16 @@ class ReplicaManager(val config: KafkaConfig, else FetchHighWatermark - def readFromLog(): Seq[(TopicPartition, LogReadResult)] = { val result = readFromLocalLog( replicaId = replicaId, - fetchOnlyFromLeader = fetchOnlyFromLeader, + fetchOnlyFromLeader = isFromFollower, fetchIsolation = fetchIsolation, fetchMaxBytes = fetchMaxBytes, hardMaxBytesLimit = hardMaxBytesLimit, readPartitionInfo = fetchInfos, - quota = quota) + quota = quota, + clientMetadata = clientMetadata) if (isFromFollower) updateFollowerFetchState(replicaId, result) else result } @@ -849,23 +862,37 @@ class ReplicaManager(val config: KafkaConfig, var bytesReadable: Long = 0 var errorReadingData = false val logReadResultMap = new mutable.HashMap[TopicPartition, LogReadResult] + var anyPartitionsNeedHwUpdate = false logReadResults.foreach { case (topicPartition, logReadResult) => if (logReadResult.error != Errors.NONE) errorReadingData = true bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes logReadResultMap.put(topicPartition, logReadResult) + if (isFromFollower && logReadResult.followerNeedsHwUpdate) { + anyPartitionsNeedHwUpdate = true + } } + // Wrap the given callback function with another function that will update the HW for the remote follower + val updateHwAndThenCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit = + (fetchPartitionData: Seq[(TopicPartition, FetchPartitionData)]) => { + fetchPartitionData.foreach { + case (tp, partitionData) => updateFollowerHighWatermark(tp, replicaId, partitionData.highWatermark) + } + responseCallback(fetchPartitionData) + } + // respond immediately if 1) fetch request does not want to wait // 2) fetch request does not require any data // 3) has enough data to respond // 4) some error happens while reading data - if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) { + // 5) all the requested partitions need HW update + if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData || anyPartitionsNeedHwUpdate) { val fetchPartitionData = logReadResults.map { case (tp, result) => tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records, - result.lastStableOffset, result.info.abortedTransactions) + result.lastStableOffset, result.info.abortedTransactions, result.preferredReadReplica) } - responseCallback(fetchPartitionData) + updateHwAndThenCallback(fetchPartitionData) } else { // construct the fetch results from the read results val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicPartition, FetchPartitionStatus)] @@ -875,9 +902,10 @@ class ReplicaManager(val config: KafkaConfig, fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData)) }) } - val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, 
fetchOnlyFromLeader, + val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, isFromFollower, fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus) - val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, responseCallback) + val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, clientMetadata, + updateHwAndThenCallback) // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) } @@ -898,7 +926,8 @@ class ReplicaManager(val config: KafkaConfig, fetchMaxBytes: Int, hardMaxBytesLimit: Boolean, readPartitionInfo: Seq[(TopicPartition, PartitionData)], - quota: ReplicaQuota): Seq[(TopicPartition, LogReadResult)] = { + quota: ReplicaQuota, + clientMetadata: Option[ClientMetadata]): Seq[(TopicPartition, LogReadResult)] = { def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = { val offset = fetchInfo.fetchOffset @@ -917,35 +946,64 @@ class ReplicaManager(val config: KafkaConfig, val partition = getPartitionOrException(tp, expectLeader = fetchOnlyFromLeader) val fetchTimeMs = time.milliseconds - // Try the read first, this tells us whether we need all of adjustedFetchSize for this partition - val readInfo = partition.readRecords( - fetchOffset = fetchInfo.fetchOffset, - currentLeaderEpoch = fetchInfo.currentLeaderEpoch, - maxBytes = adjustedMaxBytes, - fetchIsolation = fetchIsolation, - fetchOnlyFromLeader = fetchOnlyFromLeader, - minOneMessage = minOneMessage) - - val fetchDataInfo = if (shouldLeaderThrottle(quota, tp, replicaId)) { - // If the partition is being throttled, simply return an empty set. 
- FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY) - } else if (!hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) { - // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make - // progress in such cases and don't need to report a `RecordTooLargeException` - FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY) + // If we are the leader, determine the preferred read-replica + val preferredReadReplica = clientMetadata.flatMap( + metadata => findPreferredReadReplica(tp, metadata, replicaId, fetchInfo.fetchOffset, fetchTimeMs)) + + if (preferredReadReplica.isDefined) { + replicaSelectorOpt.foreach{ selector => + debug(s"Replica selector ${selector.getClass.getSimpleName} returned preferred replica " + + s"${preferredReadReplica.get} for $clientMetadata") + } + // If a preferred read-replica is set, skip the read + val offsetSnapshot: LogOffsetSnapshot = partition.fetchOffsetSnapshot(fetchInfo.currentLeaderEpoch, false) + LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), + highWatermark = offsetSnapshot.highWatermark.messageOffset, + leaderLogStartOffset = offsetSnapshot.logStartOffset, + leaderLogEndOffset = offsetSnapshot.logEndOffset.messageOffset, + followerLogStartOffset = followerLogStartOffset, + fetchTimeMs = -1L, + readSize = 0, + lastStableOffset = Some(offsetSnapshot.lastStableOffset.messageOffset), + preferredReadReplica = preferredReadReplica, + exception = None) } else { - readInfo.fetchedData - } + // Try the read first, this tells us whether we need all of adjustedFetchSize for this partition + val readInfo: LogReadInfo = partition.readRecords( + fetchOffset = fetchInfo.fetchOffset, + currentLeaderEpoch = fetchInfo.currentLeaderEpoch, + maxBytes = adjustedMaxBytes, + fetchIsolation = fetchIsolation, + fetchOnlyFromLeader = fetchOnlyFromLeader, + minOneMessage = minOneMessage) + + // Check if the HW known to the follower is behind the actual HW + val followerNeedsHwUpdate: Boolean = partition.getReplica(replicaId) + .exists(replica => replica.lastSentHighWatermark < readInfo.highWatermark) + + val fetchDataInfo = if (shouldLeaderThrottle(quota, tp, replicaId)) { + // If the partition is being throttled, simply return an empty set. 
+ FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY) + } else if (!hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) { + // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make + // progress in such cases and don't need to report a `RecordTooLargeException` + FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY) + } else { + readInfo.fetchedData + } - LogReadResult(info = fetchDataInfo, - highWatermark = readInfo.highWatermark, - leaderLogStartOffset = readInfo.logStartOffset, - leaderLogEndOffset = readInfo.logEndOffset, - followerLogStartOffset = followerLogStartOffset, - fetchTimeMs = fetchTimeMs, - readSize = adjustedMaxBytes, - lastStableOffset = Some(readInfo.lastStableOffset), - exception = None) + LogReadResult(info = fetchDataInfo, + highWatermark = readInfo.highWatermark, + leaderLogStartOffset = readInfo.logStartOffset, + leaderLogEndOffset = readInfo.logEndOffset, + followerLogStartOffset = followerLogStartOffset, + fetchTimeMs = fetchTimeMs, + readSize = adjustedMaxBytes, + lastStableOffset = Some(readInfo.lastStableOffset), + preferredReadReplica = preferredReadReplica, + followerNeedsHwUpdate = followerNeedsHwUpdate, + exception = None) + } } catch { // NOTE: Failed fetch requests metric is not incremented for known exceptions since it // is supposed to indicate un-expected failure of a broker in handling a fetch request @@ -1000,6 +1058,59 @@ class ReplicaManager(val config: KafkaConfig, result } + /** + * Using the configured [[ReplicaSelector]], determine the preferred read replica for a partition given the + * client metadata, the requested offset, and the current set of replicas. If the preferred read replica is the + * leader, return None + */ + def findPreferredReadReplica(tp: TopicPartition, + clientMetadata: ClientMetadata, + replicaId: Int, + fetchOffset: Long, + currentTimeMs: Long): Option[Int] = { + val partition = getPartitionOrException(tp, expectLeader = false) + + if (partition.isLeader) { + if (Request.isValidBrokerId(replicaId)) { + // Don't look up preferred for follower fetches via normal replication + Option.empty + } else { + replicaSelectorOpt.flatMap { replicaSelector => + val replicaEndpoints = metadataCache.getPartitionReplicaEndpoints(tp, new ListenerName(clientMetadata.listenerName)) + var replicaInfoSet: Set[ReplicaView] = partition.remoteReplicas + // Exclude replicas that don't have the requested offset (whether or not if they're in the ISR) + .filter(replica => replica.logEndOffset >= fetchOffset) + .filter(replica => replica.logStartOffset <= fetchOffset) + .map(replica => new DefaultReplicaView( + replicaEndpoints.getOrElse(replica.brokerId, Node.noNode()), + replica.logEndOffset, + currentTimeMs - replica.lastCaughtUpTimeMs + )) + + if (partition.leaderReplicaIdOpt.isDefined) { + val leaderReplica: ReplicaView = partition.leaderReplicaIdOpt + .map(replicaId => replicaEndpoints.getOrElse(replicaId, Node.noNode())) + .map(leaderNode => new DefaultReplicaView(leaderNode, partition.localLogOrException.logEndOffset, 0L)) + .get + replicaInfoSet ++= Set(leaderReplica) + + val partitionInfo = new DefaultPartitionView(replicaInfoSet.asJava, leaderReplica) + replicaSelector.select(tp, clientMetadata, partitionInfo).asScala + .filter(!_.endpoint.isEmpty) + // Even though the replica selector can return the leader, we don't want to send it out with the + // FetchResponse, so we exclude it here + .filter(!_.equals(leaderReplica)) + 
.map(_.endpoint.id) + } else { + None + } + } + } + } else { + None + } + } + /** * To avoid ISR thrashing, we only throttle a replica on the leader if it's in the throttled replica list, * the quota is exceeded and the replica is not in sync. @@ -1424,6 +1535,15 @@ class ReplicaManager(val config: KafkaConfig, } } + private def updateFollowerHighWatermark(topicPartition: TopicPartition, followerId: Int, highWatermark: Long): Unit = { + nonOfflinePartition(topicPartition).flatMap(_.getReplica(followerId)) match { + case Some(replica) => replica.updateLastSentHighWatermark(highWatermark) + case None => + warn(s"While updating the HW for follower $followerId for partition $topicPartition, " + + s"the replica could not be found.") + } + } + private def leaderPartitionsIterator: Iterator[Partition] = nonOfflinePartitionsIterator.filter(_.leaderLogIfLocal.isDefined) @@ -1516,6 +1636,7 @@ class ReplicaManager(val config: KafkaConfig, delayedElectLeaderPurgatory.shutdown() if (checkpointHW) checkpointHighWatermarks() + replicaSelectorOpt.foreach(_.close) info("Shut down completely") } @@ -1527,6 +1648,14 @@ class ReplicaManager(val config: KafkaConfig, new ReplicaAlterLogDirsManager(config, this, quotaManager, brokerTopicStats) } + protected def createReplicaSelector(): Option[ReplicaSelector] = { + config.replicaSelectorClassName.map { className => + val tmpReplicaSelector: ReplicaSelector = CoreUtils.createObject[ReplicaSelector](className) + tmpReplicaSelector.configure(config.originals()) + tmpReplicaSelector + } + } + def lastOffsetForLeaderEpoch(requestedEpochInfo: Map[TopicPartition, OffsetsForLeaderEpochRequest.PartitionData]): Map[TopicPartition, EpochEndOffset] = { requestedEpochInfo.map { case (tp, partitionData) => val epochEndOffset = getPartition(tp) match { diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala index 2d8fac1a70ad..d41bc5b2e564 100644 --- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala +++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala @@ -20,7 +20,8 @@ import java.util.Optional import scala.collection.Seq -import kafka.cluster.Partition +import kafka.cluster.{Partition, Replica} +import kafka.log.LogOffsetSnapshot import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.FencedLeaderEpochException import org.apache.kafka.common.protocol.Errors @@ -58,6 +59,7 @@ class DelayedFetchTest extends EasyMockSupport { fetchMetadata = fetchMetadata, replicaManager = replicaManager, quota = replicaQuota, + clientMetadata = None, responseCallback = callback) val partition: Partition = mock(classOf[Partition]) @@ -79,6 +81,89 @@ class DelayedFetchTest extends EasyMockSupport { assertEquals(Errors.FENCED_LEADER_EPOCH, fetchResult.error) } + def checkCompleteWhenFollowerLaggingHW(followerHW: Option[Long], checkResult: DelayedFetch => Unit): Unit = { + val topicPartition = new TopicPartition("topic", 0) + val fetchOffset = 500L + val logStartOffset = 0L + val currentLeaderEpoch = Optional.of[Integer](10) + val replicaId = 1 + + val fetchStatus = FetchPartitionStatus( + startOffsetMetadata = LogOffsetMetadata(fetchOffset), + fetchInfo = new FetchRequest.PartitionData(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch)) + val fetchMetadata = buildFetchMetadata(replicaId, topicPartition, fetchStatus) + + var fetchResultOpt: Option[FetchPartitionData] = None + def callback(responses: Seq[(TopicPartition, 
FetchPartitionData)]): Unit = { + fetchResultOpt = Some(responses.head._2) + } + + val delayedFetch = new DelayedFetch( + delayMs = 500, + fetchMetadata = fetchMetadata, + replicaManager = replicaManager, + quota = replicaQuota, + clientMetadata = None, + responseCallback = callback + ) + + val partition: Partition = mock(classOf[Partition]) + + EasyMock.expect(replicaManager.getPartitionOrException(topicPartition, expectLeader = true)) + .andReturn(partition) + EasyMock.expect(partition.fetchOffsetSnapshot(currentLeaderEpoch, fetchOnlyFromLeader = true)) + .andReturn( + LogOffsetSnapshot( + logStartOffset = 0, + logEndOffset = new LogOffsetMetadata(500L), + highWatermark = new LogOffsetMetadata(480L), + lastStableOffset = new LogOffsetMetadata(400L))) + + expectReadFromReplica(replicaId, topicPartition, fetchStatus.fetchInfo) + + val follower = new Replica(replicaId, topicPartition) + followerHW.foreach(hw => { + follower.updateFetchState(LogOffsetMetadata.UnknownOffsetMetadata, 0L, 0L, 0L) + follower.updateLastSentHighWatermark(hw) + }) + EasyMock.expect(partition.getReplica(replicaId)) + .andReturn(Some(follower)) + + replayAll() + checkResult.apply(delayedFetch) + } + + @Test + def testCompleteWhenFollowerLaggingHW(): Unit = { + // No HW from the follower, should complete + resetAll + checkCompleteWhenFollowerLaggingHW(None, delayedFetch => { + assertTrue(delayedFetch.tryComplete()) + assertTrue(delayedFetch.isCompleted) + }) + + // A higher HW from the follower (shouldn't actually be possible) + resetAll + checkCompleteWhenFollowerLaggingHW(Some(500), delayedFetch => { + assertFalse(delayedFetch.tryComplete()) + assertFalse(delayedFetch.isCompleted) + }) + + // An equal HW from follower + resetAll + checkCompleteWhenFollowerLaggingHW(Some(480), delayedFetch => { + assertFalse(delayedFetch.tryComplete()) + assertFalse(delayedFetch.isCompleted) + }) + + // A lower HW from follower, should complete the fetch + resetAll + checkCompleteWhenFollowerLaggingHW(Some(470), delayedFetch => { + assertTrue(delayedFetch.tryComplete()) + assertTrue(delayedFetch.isCompleted) + }) + } + private def buildFetchMetadata(replicaId: Int, topicPartition: TopicPartition, fetchStatus: FetchPartitionStatus): FetchMetadata = { @@ -103,10 +188,38 @@ class DelayedFetchTest extends EasyMockSupport { fetchMaxBytes = maxBytes, hardMaxBytesLimit = false, readPartitionInfo = Seq((topicPartition, fetchPartitionData)), + clientMetadata = None, quota = replicaQuota)) .andReturn(Seq((topicPartition, buildReadResultWithError(error)))) } + private def expectReadFromReplica(replicaId: Int, + topicPartition: TopicPartition, + fetchPartitionData: FetchRequest.PartitionData): Unit = { + val result = LogReadResult( + exception = None, + info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), + highWatermark = -1L, + leaderLogStartOffset = -1L, + leaderLogEndOffset = -1L, + followerLogStartOffset = -1L, + fetchTimeMs = -1L, + readSize = -1, + lastStableOffset = None) + + + EasyMock.expect(replicaManager.readFromLocalLog( + replicaId = replicaId, + fetchOnlyFromLeader = true, + fetchIsolation = FetchLogEnd, + fetchMaxBytes = maxBytes, + hardMaxBytesLimit = false, + readPartitionInfo = Seq((topicPartition, fetchPartitionData)), + clientMetadata = None, + quota = replicaQuota)) + .andReturn(Seq((topicPartition, result))).anyTimes() + } + private def buildReadResultWithError(error: Errors): LogReadResult = { LogReadResult( exception = Some(error.exception), diff --git 
a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index 4f2d31991bea..33c0d951f3d0 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -242,9 +242,11 @@ class PartitionTest { val partition = setupPartitionWithMocks(leaderEpoch, isLeader = true) def assertSnapshotError(expectedError: Errors, currentLeaderEpoch: Optional[Integer]): Unit = { - partition.fetchOffsetSnapshotOrError(currentLeaderEpoch, fetchOnlyFromLeader = true) match { - case Left(_) => assertEquals(Errors.NONE, expectedError) - case Right(error) => assertEquals(expectedError, error) + try { + partition.fetchOffsetSnapshot(currentLeaderEpoch, fetchOnlyFromLeader = true) + assertEquals(Errors.NONE, expectedError) + } catch { + case error: ApiException => assertEquals(expectedError, Errors.forException(error)) } } @@ -262,9 +264,11 @@ class PartitionTest { def assertSnapshotError(expectedError: Errors, currentLeaderEpoch: Optional[Integer], fetchOnlyLeader: Boolean): Unit = { - partition.fetchOffsetSnapshotOrError(currentLeaderEpoch, fetchOnlyFromLeader = fetchOnlyLeader) match { - case Left(_) => assertEquals(expectedError, Errors.NONE) - case Right(error) => assertEquals(expectedError, error) + try { + partition.fetchOffsetSnapshot(currentLeaderEpoch, fetchOnlyFromLeader = fetchOnlyLeader) + assertEquals(Errors.NONE, expectedError) + } catch { + case error: ApiException => assertEquals(expectedError, Errors.forException(error)) } } @@ -1039,6 +1043,7 @@ class PartitionTest { assertEquals(time.milliseconds(), remoteReplica.lastCaughtUpTimeMs) assertEquals(6L, remoteReplica.logEndOffset) assertEquals(0L, remoteReplica.logStartOffset) + } @Test diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index a796d6f2efa3..c584fc090a96 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -3693,6 +3693,37 @@ class LogTest { assertEquals(None, reopenedLog.firstUnstableOffset.map(_.messageOffset)) } + @Test + def testOffsetSnapshot() { + val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5) + val log = createLog(logDir, logConfig) + + // append a few records + appendAsFollower(log, MemoryRecords.withRecords(CompressionType.NONE, + new SimpleRecord("a".getBytes), + new SimpleRecord("b".getBytes), + new SimpleRecord("c".getBytes)), 5) + + + log.highWatermark = 2L + var offsets: LogOffsetSnapshot = log.offsetSnapshot + assertEquals(offsets.highWatermark.messageOffset, 2L) + assertFalse(offsets.highWatermark.messageOffsetOnly) + + offsets = log.offsetSnapshot + assertEquals(offsets.highWatermark.messageOffset, 2L) + assertFalse(offsets.highWatermark.messageOffsetOnly) + + try { + log.highWatermark = 100L + offsets = log.offsetSnapshot + fail("Should have thrown") + } catch { + case e: OffsetOutOfRangeException => // pass + case _ => fail("Should have seen OffsetOutOfRangeException") + } + } + @Test def testLastStableOffsetWithMixedProducerData() { val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5) diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala index 7d3a58bccfb9..0363963de7f5 100644 --- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala @@ -205,7 +205,7 @@ 
class FetchRequestTest extends BaseRequestTest { Seq(topicPartition))).build() val fetchResponse = sendFetchRequest(nonReplicaId, fetchRequest) val partitionData = fetchResponse.responseData.get(topicPartition) - assertEquals(Errors.NOT_LEADER_FOR_PARTITION, partitionData.error) + assertEquals(Errors.REPLICA_NOT_AVAILABLE, partitionData.error) } @Test @@ -238,8 +238,8 @@ class FetchRequestTest extends BaseRequestTest { // Check follower error codes val followerId = TestUtils.findFollowerId(topicPartition, servers) - assertResponseErrorForEpoch(Errors.NOT_LEADER_FOR_PARTITION, followerId, Optional.empty()) - assertResponseErrorForEpoch(Errors.NOT_LEADER_FOR_PARTITION, followerId, Optional.of(secondLeaderEpoch)) + assertResponseErrorForEpoch(Errors.NONE, followerId, Optional.empty()) + assertResponseErrorForEpoch(Errors.NONE, followerId, Optional.of(secondLeaderEpoch)) assertResponseErrorForEpoch(Errors.UNKNOWN_LEADER_EPOCH, followerId, Optional.of(secondLeaderEpoch + 1)) assertResponseErrorForEpoch(Errors.FENCED_LEADER_EPOCH, followerId, Optional.of(secondLeaderEpoch - 1)) } diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 41e7c38af71d..c26c3fd21d35 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -50,6 +50,7 @@ import org.easymock.{Capture, EasyMock, IAnswer} import EasyMock._ import org.apache.kafka.common.message.{HeartbeatRequestData, JoinGroupRequestData, OffsetCommitRequestData, OffsetCommitResponseData, SyncGroupRequestData} import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol +import org.apache.kafka.common.replica.ClientMetadata import org.junit.Assert.{assertEquals, assertNull, assertTrue} import org.junit.{After, Test} @@ -464,14 +465,15 @@ class KafkaApisTest { replicaManager.fetchMessages(anyLong, anyInt, anyInt, anyInt, anyBoolean, anyObject[Seq[(TopicPartition, FetchRequest.PartitionData)]], anyObject[ReplicaQuota], - anyObject[Seq[(TopicPartition, FetchPartitionData)] => Unit](), anyObject[IsolationLevel]) + anyObject[Seq[(TopicPartition, FetchPartitionData)] => Unit](), anyObject[IsolationLevel], + anyObject[Option[ClientMetadata]]) expectLastCall[Unit].andAnswer(new IAnswer[Unit] { def answer: Unit = { val callback = getCurrentArguments.apply(7).asInstanceOf[(Seq[(TopicPartition, FetchPartitionData)] => Unit)] val records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(timestamp, "foo".getBytes(StandardCharsets.UTF_8))) callback(Seq(tp -> new FetchPartitionData(Errors.NONE, hw, 0, records, - None, None))) + None, None, Option.empty))) } }) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 9d28c1b756ad..850f553a6fc8 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -655,6 +655,7 @@ class KafkaConfigTest { case KafkaConfig.ReplicaFetchWaitMaxMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.ReplicaFetchMinBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.ReplicaFetchResponseMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.ReplicaSelectorClassProp => // Ignore string case KafkaConfig.NumReplicaFetchersProp => 
assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala index 9005ec3e1968..79457c0ee825 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala @@ -395,6 +395,7 @@ class ReplicaAlterLogDirsThreadTest { EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.capture(responseCallback), + EasyMock.anyObject(), EasyMock.anyObject())) .andAnswer(new IAnswer[Unit] { override def answer(): Unit = { @@ -629,6 +630,7 @@ class ReplicaAlterLogDirsThreadTest { EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.capture(responseCallback), + EasyMock.anyObject(), EasyMock.anyObject())) .andAnswer(new IAnswer[Unit] { override def answer(): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala index 48c23e494446..fddceff18246 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala @@ -64,7 +64,8 @@ class ReplicaManagerQuotasTest { fetchMaxBytes = Int.MaxValue, hardMaxBytesLimit = false, readPartitionInfo = fetchInfo, - quota = quota) + quota = quota, + clientMetadata = None) assertEquals("Given two partitions, with only one throttled, we should get the first", 1, fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size) @@ -89,7 +90,8 @@ class ReplicaManagerQuotasTest { fetchMaxBytes = Int.MaxValue, hardMaxBytesLimit = false, readPartitionInfo = fetchInfo, - quota = quota) + quota = quota, + clientMetadata = None) assertEquals("Given two partitions, with both throttled, we should get no messages", 0, fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size) assertEquals("Given two partitions, with both throttled, we should get no messages", 0, @@ -113,7 +115,8 @@ class ReplicaManagerQuotasTest { fetchMaxBytes = Int.MaxValue, hardMaxBytesLimit = false, readPartitionInfo = fetchInfo, - quota = quota) + quota = quota, + clientMetadata = None) assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1, fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size) assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1, @@ -137,7 +140,8 @@ class ReplicaManagerQuotasTest { fetchMaxBytes = Int.MaxValue, hardMaxBytesLimit = false, readPartitionInfo = fetchInfo, - quota = quota) + quota = quota, + clientMetadata = None) assertEquals("Given two partitions, with only one throttled, we should get the first", 1, fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size) @@ -167,6 +171,7 @@ class ReplicaManagerQuotasTest { EasyMock.expect(replicaManager.shouldLeaderThrottle(EasyMock.anyObject[ReplicaQuota], EasyMock.anyObject[TopicPartition], EasyMock.anyObject[Int])) .andReturn(!isReplicaInSync).anyTimes() + EasyMock.expect(partition.getReplica(1)).andReturn(None) EasyMock.replay(replicaManager, partition) val tp = new TopicPartition("t1", 0) 
@@ -179,9 +184,10 @@ class ReplicaManagerQuotasTest { fetchIsolation = FetchLogEnd, isFromFollower = true, replicaId = 1, - fetchPartitionStatus = List((tp, fetchPartitionStatus))) + fetchPartitionStatus = List((tp, fetchPartitionStatus)) + ) new DelayedFetch(delayMs = 600, fetchMetadata = fetchMetadata, replicaManager = replicaManager, - quota = null, responseCallback = null) { + quota = null, clientMetadata = None, responseCallback = null) { override def forceComplete(): Boolean = true } } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 87f2894c29e3..609150340bff 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -18,10 +18,13 @@ package kafka.server import java.io.File -import java.util.{Optional, Properties} -import java.util.concurrent.{CountDownLatch, TimeUnit} +import java.net.InetAddress import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.{CountDownLatch, TimeUnit} +import java.util.{Optional, Properties} +import kafka.api.Request +import kafka.cluster.BrokerEndPoint import kafka.log.{Log, LogConfig, LogManager, ProducerStateManager} import kafka.utils.{MockScheduler, MockTime, TestUtils} import TestUtils.createBroker @@ -29,16 +32,22 @@ import kafka.cluster.BrokerEndPoint import kafka.server.QuotaFactory.UnboundedQuota import kafka.server.checkpoints.LazyOffsetCheckpoints import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend +import kafka.utils.TestUtils.createBroker import kafka.utils.timer.MockTimer +import kafka.utils.{MockScheduler, MockTime, TestUtils} import kafka.zk.KafkaZkClient import org.I0Itec.zkclient.ZkClient import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record._ -import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, IsolationLevel, LeaderAndIsrRequest} -import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse +import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata +import org.apache.kafka.common.replica.ClientMetadata +import org.apache.kafka.common.requests.FetchRequest import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction +import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse +import org.apache.kafka.common.requests.{EpochEndOffset, IsolationLevel, LeaderAndIsrRequest} +import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.Time import org.apache.kafka.common.{Node, TopicPartition} import org.apache.zookeeper.data.Stat @@ -179,12 +188,6 @@ class ReplicaManagerTest { assertEquals(Errors.NOT_LEADER_FOR_PARTITION, response.error) } - // Fetch some messages - val fetchResult = fetchAsConsumer(rm, new TopicPartition(topic, 0), - new PartitionData(0, 0, 100000, Optional.empty()), - minBytes = 100000) - assertFalse(fetchResult.isFired) - // Make this replica the follower val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, collection.immutable.Map(new TopicPartition(topic, 0) -> @@ -193,7 +196,6 @@ class ReplicaManagerTest { rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ()) assertTrue(appendResult.isFired) - assertTrue(fetchResult.isFired) } finally { rm.shutdown(checkpointHW = 
false) } @@ -515,7 +517,8 @@ class ReplicaManagerTest { fetchInfos = Seq(tp -> validFetchPartitionData), quota = UnboundedQuota, isolationLevel = IsolationLevel.READ_UNCOMMITTED, - responseCallback = callback + responseCallback = callback, + clientMetadata = None ) assertTrue(successfulFetch.isDefined) @@ -537,7 +540,8 @@ class ReplicaManagerTest { fetchInfos = Seq(tp -> invalidFetchPartitionData), quota = UnboundedQuota, isolationLevel = IsolationLevel.READ_UNCOMMITTED, - responseCallback = callback + responseCallback = callback, + clientMetadata = None ) assertTrue(successfulFetch.isDefined) @@ -617,7 +621,8 @@ class ReplicaManagerTest { tp1 -> new PartitionData(1, 0, 100000, Optional.empty())), quota = UnboundedQuota, responseCallback = fetchCallback, - isolationLevel = IsolationLevel.READ_UNCOMMITTED + isolationLevel = IsolationLevel.READ_UNCOMMITTED, + clientMetadata = None ) val tp0Log = replicaManager.localLog(tp0) assertTrue(tp0Log.isDefined) @@ -678,6 +683,157 @@ class ReplicaManagerTest { EasyMock.verify(mockLogMgr) } + @Test + def testReplicaSelector(): Unit = { + val topicPartition = 0 + val followerBrokerId = 0 + val leaderBrokerId = 1 + val controllerId = 0 + val leaderEpoch = 1 + val leaderEpochIncrement = 2 + val aliveBrokerIds = Seq[Integer] (followerBrokerId, leaderBrokerId) + val countDownLatch = new CountDownLatch(1) + + // Prepare the mocked components for the test + val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager( + topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, + leaderBrokerId, countDownLatch, expectTruncation = true) + + val partition = replicaManager.createPartition(new TopicPartition(topic, topicPartition)) + + val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) + partition.createLogIfNotExists(leaderBrokerId, isNew = false, isFutureReplica = false, offsetCheckpoints) + partition.makeLeader( + controllerId, + leaderAndIsrPartitionState(leaderEpoch, leaderBrokerId, aliveBrokerIds), + correlationId, + offsetCheckpoints + ) + + val tp0 = new TopicPartition(topic, 0) + + val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id", + InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default") + + // We expect to select the leader, which means we return None + val preferredReadReplica: Option[Int] = replicaManager.findPreferredReadReplica( + tp0, metadata, Request.OrdinaryConsumerId, 1L, System.currentTimeMillis) + assertFalse(preferredReadReplica.isDefined) + } + + @Test + def testPreferredReplicaAsFollower(): Unit = { + val topicPartition = 0 + val followerBrokerId = 0 + val leaderBrokerId = 1 + val leaderEpoch = 1 + val leaderEpochIncrement = 2 + val countDownLatch = new CountDownLatch(1) + + // Prepare the mocked components for the test + val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager( + topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, + leaderBrokerId, countDownLatch, expectTruncation = true) + + val brokerList = Seq[Integer](0, 1).asJava + + val tp0 = new TopicPartition(topic, 0) + + replicaManager.createPartition(new TopicPartition(topic, 0)) + + // Make this replica the follower + val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, + collection.immutable.Map(new TopicPartition(topic, 0) -> + new LeaderAndIsrRequest.PartitionState(0, 1, 1, brokerList, 0, brokerList, false)).asJava, + Set(new Node(0, "host1", 0), new Node(1, "host2", 
1)).asJava).build() + replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ()) + + val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id", + InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default") + + val consumerResult = fetchAsConsumer(replicaManager, tp0, + new PartitionData(0, 0, 100000, Optional.empty()), + clientMetadata = Some(metadata)) + + // Fetch from follower succeeds + assertTrue(consumerResult.isFired) + + // But only leader will compute preferred replica + assertTrue(consumerResult.assertFired.preferredReadReplica.isEmpty) + } + + @Test + def testPreferredReplicaAsLeader(): Unit = { + val topicPartition = 0 + val followerBrokerId = 0 + val leaderBrokerId = 1 + val leaderEpoch = 1 + val leaderEpochIncrement = 2 + val countDownLatch = new CountDownLatch(1) + + // Prepare the mocked components for the test + val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager( + topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, + leaderBrokerId, countDownLatch, expectTruncation = true) + + val brokerList = Seq[Integer](0, 1).asJava + + val tp0 = new TopicPartition(topic, 0) + + replicaManager.createPartition(new TopicPartition(topic, 0)) + + // Make this replica the leader + val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, + collection.immutable.Map(new TopicPartition(topic, 0) -> + new LeaderAndIsrRequest.PartitionState(0, 0, 1, brokerList, 0, brokerList, false)).asJava, + Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() + replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ()) + + val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id", + InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default") + + val consumerResult = fetchAsConsumer(replicaManager, tp0, + new PartitionData(0, 0, 100000, Optional.empty()), + clientMetadata = Some(metadata)) + + // Fetch from leader succeeds + assertTrue(consumerResult.isFired) + + // Returns a preferred replica (should just be the leader, which is None) + assertFalse(consumerResult.assertFired.preferredReadReplica.isDefined) + } + + @Test(expected = classOf[ClassNotFoundException]) + def testUnknownReplicaSelector(): Unit = { + val topicPartition = 0 + val followerBrokerId = 0 + val leaderBrokerId = 1 + val leaderEpoch = 1 + val leaderEpochIncrement = 2 + val countDownLatch = new CountDownLatch(1) + + val props = new Properties() + props.put(KafkaConfig.ReplicaSelectorClassProp, "non-a-class") + val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager( + topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, + leaderBrokerId, countDownLatch, expectTruncation = true, extraProps = props) + } + + @Test + def testDefaultReplicaSelector(): Unit = { + val topicPartition = 0 + val followerBrokerId = 0 + val leaderBrokerId = 1 + val leaderEpoch = 1 + val leaderEpochIncrement = 2 + val countDownLatch = new CountDownLatch(1) + + val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager( + topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, + leaderBrokerId, countDownLatch, expectTruncation = true) + assertFalse(replicaManager.replicaSelectorOpt.isDefined) + } + /** * This method assumes that the test using created ReplicaManager calls * ReplicaManager.becomeLeaderOrFollower() once with LeaderAndIsrRequest containing @@ -688,9 +844,11 @@ class ReplicaManagerTest {
followerBrokerId: Int, leaderBrokerId: Int, countDownLatch: CountDownLatch, - expectTruncation: Boolean) : (ReplicaManager, LogManager) = { + expectTruncation: Boolean, + extraProps: Properties = new Properties()) : (ReplicaManager, LogManager) = { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath) + props.asScala ++= extraProps.asScala val config = KafkaConfig.fromProps(props) // Setup mock local log to have leader epoch of 3 and offset of 10 @@ -749,9 +907,16 @@ class ReplicaManagerTest { .andReturn(Option(createBroker(brokerId, s"host$brokerId", brokerId))) .anyTimes } + EasyMock + .expect(metadataCache.getPartitionReplicaEndpoints( + EasyMock.anyObject(), EasyMock.anyObject())) + .andReturn(Map( + leaderBrokerId -> new Node(leaderBrokerId, "host1", 9092, "rack-a"), + followerBrokerId -> new Node(followerBrokerId, "host2", 9092, "rack-b")).toMap + ) + .anyTimes() EasyMock.replay(metadataCache) - val timer = new MockTimer val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce]( purgatoryName = "Produce", timer, reaperEnabled = false) @@ -860,16 +1025,18 @@ class ReplicaManagerTest { partition: TopicPartition, partitionData: PartitionData, minBytes: Int = 0, - isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): CallbackResult[FetchPartitionData] = { - fetchMessages(replicaManager, replicaId = -1, partition, partitionData, minBytes, isolationLevel) + isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED, + clientMetadata: Option[ClientMetadata] = None): CallbackResult[FetchPartitionData] = { + fetchMessages(replicaManager, replicaId = -1, partition, partitionData, minBytes, isolationLevel, clientMetadata) } private def fetchAsFollower(replicaManager: ReplicaManager, partition: TopicPartition, partitionData: PartitionData, minBytes: Int = 0, - isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): CallbackResult[FetchPartitionData] = { - fetchMessages(replicaManager, replicaId = 1, partition, partitionData, minBytes, isolationLevel) + isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED, + clientMetadata: Option[ClientMetadata] = None): CallbackResult[FetchPartitionData] = { + fetchMessages(replicaManager, replicaId = 1, partition, partitionData, minBytes, isolationLevel, clientMetadata) } private def fetchMessages(replicaManager: ReplicaManager, @@ -877,7 +1044,8 @@ class ReplicaManagerTest { partition: TopicPartition, partitionData: PartitionData, minBytes: Int, - isolationLevel: IsolationLevel): CallbackResult[FetchPartitionData] = { + isolationLevel: IsolationLevel, + clientMetadata: Option[ClientMetadata]): CallbackResult[FetchPartitionData] = { val result = new CallbackResult[FetchPartitionData]() def fetchCallback(responseStatus: Seq[(TopicPartition, FetchPartitionData)]) = { assertEquals(1, responseStatus.size) @@ -895,7 +1063,9 @@ class ReplicaManagerTest { fetchInfos = Seq(partition -> partitionData), quota = UnboundedQuota, responseCallback = fetchCallback, - isolationLevel = isolationLevel) + isolationLevel = isolationLevel, + clientMetadata = clientMetadata + ) result } diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index bae1171fa1c7..bf5c20eabb68 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -178,7 +178,8 @@ class SimpleFetchTest { 
fetchMaxBytes = Int.MaxValue, hardMaxBytesLimit = false, readPartitionInfo = fetchInfo, - quota = UnboundedQuota).find(_._1 == topicPartition) + quota = UnboundedQuota, + clientMetadata = None).find(_._1 == topicPartition) val firstReadRecord = readCommittedRecords.get._2.info.records.records.iterator.next() assertEquals("Reading committed data should return messages only up to high watermark", recordToHW, new SimpleRecord(firstReadRecord)) @@ -190,7 +191,8 @@ fetchMaxBytes = Int.MaxValue, hardMaxBytesLimit = false, readPartitionInfo = fetchInfo, - quota = UnboundedQuota).find(_._1 == topicPartition) + quota = UnboundedQuota, + clientMetadata = None).find(_._1 == topicPartition) val firstRecord = readAllRecords.get._2.info.records.records.iterator.next() assertEquals("Reading any data can return messages up to the end of the log", recordToLEO, diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index d85c9dcf4474..3aeed906b1d8 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -86,6 +86,7 @@ def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-gro jaas_override_variables A dict of variables to be used in the jaas.conf template file kafka_opts_override Override parameters of the KAFKA_OPTS environment variable client_prop_file_override Override client.properties file used by the consumer + consumer_properties A dict of values to pass in as --consumer-property key=value """ JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []), root=ConsoleConsumer.PERSISTENT_ROOT) @@ -208,7 +209,7 @@ def start_cmd(self, node): if self.consumer_properties is not None: for k, v in self.consumer_properties.items(): - cmd += "--consumer_properties %s=%s" % (k, v) + cmd += " --consumer-property %s=%s" % (k, v) cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args return cmd diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index bc99d96da09e..eb0ddfdde6d0 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -94,7 +94,7 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, authorizer_class_name=None, topics=None, version=DEV_BRANCH, jmx_object_names=None, jmx_attributes=None, zk_connect_timeout=5000, zk_session_timeout=6000, server_prop_overides=None, zk_chroot=None, - listener_security_config=ListenerSecurityConfig()): + listener_security_config=ListenerSecurityConfig(), per_node_server_prop_overrides=None): """ :param context: test context :param ZookeeperService zk: @@ -129,6 +129,10 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI self.server_prop_overides = [] else: self.server_prop_overides = server_prop_overides + if per_node_server_prop_overrides is None: + self.per_node_server_prop_overrides = {} + else: + self.per_node_server_prop_overrides = per_node_server_prop_overrides self.log_level = "DEBUG" self.zk_chroot = zk_chroot self.listener_security_config = listener_security_config @@ -295,6 +299,9 @@ def prop_file(self, node): for prop in self.server_prop_overides: override_configs[prop[0]] = prop[1] + for prop in self.per_node_server_prop_overrides.get(self.idx(node), []): + override_configs[prop[0]] =
prop[1] + #update template configs with test override configs configs.update(override_configs) diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py index cf8cbc3b6c83..c5b747da0824 100644 --- a/tests/kafkatest/services/monitor/jmx.py +++ b/tests/kafkatest/services/monitor/jmx.py @@ -27,9 +27,10 @@ class JmxMixin(object): - we assume the service using JmxMixin also uses KafkaPathResolverMixin - this uses the --wait option for JmxTool, so the list of object names must be explicit; no patterns are permitted """ - def __init__(self, num_nodes, jmx_object_names=None, jmx_attributes=None, root="/mnt"): + def __init__(self, num_nodes, jmx_object_names=None, jmx_attributes=None, jmx_poll_ms=1000, root="/mnt"): self.jmx_object_names = jmx_object_names self.jmx_attributes = jmx_attributes or [] + self.jmx_poll_ms = jmx_poll_ms self.jmx_port = 9192 self.started = [False] * num_nodes @@ -71,7 +72,7 @@ def check_jmx_port_listening(): if use_jmxtool_version <= V_0_11_0_0: use_jmxtool_version = DEV_BRANCH cmd = "%s %s " % (self.path.script("kafka-run-class.sh", use_jmxtool_version), self.jmx_class_name()) - cmd += "--reporting-interval 1000 --jmx-url service:jmx:rmi:///jndi/rmi://127.0.0.1:%d/jmxrmi" % self.jmx_port + cmd += "--reporting-interval %d --jmx-url service:jmx:rmi:///jndi/rmi://127.0.0.1:%d/jmxrmi" % (self.jmx_poll_ms, self.jmx_port) cmd += " --wait" for jmx_object_name in self.jmx_object_names: cmd += " --object-name %s" % jmx_object_name @@ -83,7 +84,7 @@ def check_jmx_port_listening(): self.logger.debug("%s: Start JmxTool %d command: %s" % (node.account, idx, cmd)) node.account.ssh(cmd, allow_fail=False) - wait_until(lambda: self._jmx_has_output(node), timeout_sec=20, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account) + wait_until(lambda: self._jmx_has_output(node), timeout_sec=30, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account) self.started[idx-1] = True def _jmx_has_output(self, node): diff --git a/tests/kafkatest/tests/client/truncation_test.py b/tests/kafkatest/tests/client/truncation_test.py index 8269de7f22e5..523bcbca985d 100644 --- a/tests/kafkatest/tests/client/truncation_test.py +++ b/tests/kafkatest/tests/client/truncation_test.py @@ -21,7 +21,6 @@ from kafkatest.services.verifiable_consumer import VerifiableConsumer - class TruncationTest(VerifiableConsumerTest): TOPIC = "test_topic" NUM_PARTITIONS = 1
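
For illustration, a minimal plug-in selector might look like the sketch below. It is not part of the patch: the class name RackMatchingReplicaSelector is invented, and the sketch assumes the ReplicaSelector, PartitionView, ReplicaView and ClientMetadata types that the patch wires up (createReplicaSelector() instantiates the class named by KafkaConfig.ReplicaSelectorClassProp, calls configure() with the broker's original config map, and findPreferredReadReplica() hands it the candidate replicas plus the leader). It prefers the most caught-up replica in the client's rack and otherwise returns the leader, which the broker then omits from the FetchResponse's preferred-read-replica field.

import java.io.IOException;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.replica.ClientMetadata;
import org.apache.kafka.common.replica.PartitionView;
import org.apache.kafka.common.replica.ReplicaSelector;
import org.apache.kafka.common.replica.ReplicaView;

// Hypothetical example selector, not part of this patch.
public class RackMatchingReplicaSelector implements ReplicaSelector {

    @Override
    public void configure(Map<String, ?> configs) {
        // Invoked by createReplicaSelector() with the broker's original configs;
        // this sketch needs no configuration of its own.
    }

    @Override
    public Optional<ReplicaView> select(TopicPartition topicPartition,
                                        ClientMetadata clientMetadata,
                                        PartitionView partitionView) {
        String clientRack = clientMetadata.rackId();
        if (clientRack != null && !clientRack.isEmpty()) {
            // Candidates were already filtered by findPreferredReadReplica to replicas
            // that contain the requested offset, so any rack match is safe to read from.
            return partitionView.replicas().stream()
                .filter(replica -> clientRack.equals(replica.endpoint().rack()))
                .max(Comparator.comparingLong(ReplicaView::logEndOffset));
        }
        // No rack supplied by the client: stay on the leader.
        return Optional.of(partitionView.leader());
    }

    @Override
    public void close() throws IOException {
        // Closed during ReplicaManager shutdown via replicaSelectorOpt.foreach(_.close).
    }
}

The rack id the selector compares against is whatever the client supplied in its fetch metadata; the tests above construct it directly as new DefaultClientMetadata("rack-a", ...).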