From b224efa4e126094a8c75645834ca1bccc9f78c4a Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Wed, 21 Sep 2016 10:40:37 -0700 Subject: [PATCH 01/16] KAFKA-4148 follow up patch for KIP-79 --- .../kafka/clients/consumer/KafkaConsumer.java | 18 ++++++---- .../clients/consumer/internals/Fetcher.java | 11 ++++--- .../kafka/common/protocol/Protocol.java | 2 +- .../common/record/OffsetAndTimestamp.java | 4 +-- .../common/requests/ListOffsetRequest.java | 2 +- .../org/apache/kafka/clients/MockClient.java | 21 ++++++++++++ .../consumer/internals/FetcherTest.java | 30 +++++++++++++---- core/src/main/scala/kafka/log/Log.scala | 5 +-- .../src/main/scala/kafka/log/LogSegment.scala | 2 +- .../main/scala/kafka/server/KafkaApis.scala | 33 +++++++++---------- .../kafka/server/ReplicaFetcherThread.scala | 2 +- docs/upgrade.html | 4 +++ 12 files changed, 89 insertions(+), 45 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 889aad86c469..0d8462f350a2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1427,30 +1427,36 @@ public Map offsetsForTimes(Map * Notice that this method may block indefinitely if the partition does not exist. + * This method does not change the current consumer position of the partitions. + * + * @see #seekToBeginning(Collection) * * @param partitions the partitions to get the earliest offsets. * @return The earliest available offsets for the given partitions */ @Override public Map beginningOffsets(Collection partitions) { - return fetcher.earliestOffsets(partitions); + return fetcher.beginningOffsets(partitions); } /** - * Get the end offsets for the given partitions. The end offset of a partition is the offset of the upcoming + * Get the last offset for the given partitions. The last offset of a partition is the offset of the upcoming * message, i.e. the offset of the last available message + 1. - * + *

* Notice that this method may block indefinitely if the partition does not exist. + * This method does not change the current consumer position of the partitions. + * + * @see #seekToEnd(Collection) * * @param partitions the partitions to get the end offsets. * @return The end offsets for the given partitions. */ @Override public Map endOffsets(Collection partitions) { - return fetcher.latestOffsets(partitions); + return fetcher.endOffsets(partitions); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 14f7c5dc26e7..503caa806565 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -378,15 +378,15 @@ public Map getOffsetsByTimes(Map earliestOffsets(Collection partitions) { - return earliestOrLatestOffset(partitions, ListOffsetRequest.EARLIEST_TIMESTAMP); + public Map beginningOffsets(Collection partitions) { + return beginningOrEndOffset(partitions, ListOffsetRequest.EARLIEST_TIMESTAMP); } - public Map latestOffsets(Collection partitions) { - return earliestOrLatestOffset(partitions, ListOffsetRequest.LATEST_TIMESTAMP); + public Map endOffsets(Collection partitions) { + return beginningOrEndOffset(partitions, ListOffsetRequest.LATEST_TIMESTAMP); } - private Map earliestOrLatestOffset(Collection partitions, long timestamp) { + private Map beginningOrEndOffset(Collection partitions, long timestamp) { Map timestampsToSearch = new HashMap<>(); for (TopicPartition tp : partitions) timestampsToSearch.put(tp, timestamp); @@ -557,6 +557,7 @@ private void handleListOffsetResponse(Map timestampsToSear ClientResponse clientResponse, RequestFuture> future) { ListOffsetResponse lor = new ListOffsetResponse(clientResponse.responseBody()); + log.trace("Received ListOffsetResponse {}", lor); Map timestampOffsetMap = new HashMap<>(); for (Map.Entry entry : timestampsToSearch.entrySet()) { TopicPartition topicPartition = entry.getKey(); diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 5abf125d7a8a..9e21f3be516d 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -425,7 +425,7 @@ public class Protocol { "The timestamp associated with the returned offset"), new Field("offset", INT64, - "offsets found")); + "offset found")); public static final Schema LIST_OFFSET_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING), new Field("partition_responses", diff --git a/clients/src/main/java/org/apache/kafka/common/record/OffsetAndTimestamp.java b/clients/src/main/java/org/apache/kafka/common/record/OffsetAndTimestamp.java index 562585bd2eff..7ddd3ecebca7 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/OffsetAndTimestamp.java +++ b/clients/src/main/java/org/apache/kafka/common/record/OffsetAndTimestamp.java @@ -16,8 +16,6 @@ */ package org.apache.kafka.common.record; -import java.util.Objects; - /** * A container class for offset and timestamp. 
*/ @@ -45,7 +43,7 @@ public String toString() { @Override public int hashCode() { - return Objects.hash(timestamp, offset); + return 31 * Long.valueOf(timestamp).hashCode() + Long.valueOf(offset).hashCode(); } @Override diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java index 7e586a43d9c9..651077cd6e87 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java @@ -92,7 +92,7 @@ public ListOffsetRequest(int replicaId, Map offse /** * Constructor for ListOffsetRequest v1. */ - public ListOffsetRequest(Map targetTimes, int replicaId) { + public ListOffsetRequest(Map targetTimes, int replicaId) { this(replicaId, targetTimes, 1); } diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index 8881f829b3ce..65ea825f3a68 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -26,6 +26,7 @@ import java.util.Queue; import java.util.Set; +import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.types.Struct; @@ -59,6 +60,7 @@ public FutureResponse(Struct responseBody, boolean disconnected, RequestMatcher } private final Time time; + private final Metadata metadata; private int correlation = 0; private Node node = null; private final Set ready = new HashSet<>(); @@ -66,9 +68,16 @@ public FutureResponse(Struct responseBody, boolean disconnected, RequestMatcher private final Queue requests = new ArrayDeque<>(); private final Queue responses = new ArrayDeque<>(); private final Queue futureResponses = new ArrayDeque<>(); + private final Queue metadataUpdates = new ArrayDeque<>(); public MockClient(Time time) { this.time = time; + this.metadata = null; + } + + public MockClient(Time time, Metadata metadata) { + this.time = time; + this.metadata = metadata; } @Override @@ -149,6 +158,14 @@ public void send(ClientRequest request, long now) { public List poll(long timeoutMs, long now) { List copy = new ArrayList<>(this.responses); + if (metadata != null && metadata.updateRequested()) { + Cluster cluster = metadataUpdates.poll(); + if (cluster == null) + metadata.update(metadata.fetch(), time.milliseconds()); + else + metadata.update(cluster, time.milliseconds()); + } + while (!this.responses.isEmpty()) { ClientResponse response = this.responses.poll(); if (response.request().hasCallback()) @@ -233,6 +250,10 @@ public void prepareResponseFrom(RequestMatcher matcher, Struct body, Node node, futureResponses.add(new FutureResponse(body, disconnected, matcher, node)); } + public void prepareMetadataUpdate(Cluster cluster) { + metadataUpdates.add(cluster); + } + public void setNode(Node node) { this.node = node; } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index d14488ca66b8..31bb299b455b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -84,8 +84,8 @@ public class FetcherTest { private int fetchSize = 1000; private long 
retryBackoffMs = 100; private MockTime time = new MockTime(1); - private MockClient client = new MockClient(time); private Metadata metadata = new Metadata(0, Long.MAX_VALUE); + private MockClient client = new MockClient(time, metadata); private Cluster cluster = TestUtils.singletonCluster(topicName, 1); private Node node = cluster.nodes().get(0); private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST); @@ -638,13 +638,27 @@ public void testQuotaMetrics() throws Exception { @Test public void testGetOffsetsByTimes() { - client.prepareResponseFrom(listOffsetResponse(Errors.NONE, 100L, 100L), cluster.leaderFor(tp)); + TopicPartition tp0 = tp; + TopicPartition tp1 = new TopicPartition(topicName, 1); + // Ensure metadata has both partition. + Cluster cluster = TestUtils.clusterWith(2, topicName, 2); + metadata.update(cluster, time.milliseconds()); - Map offsetAndTimestampMap = - fetcher.getOffsetsByTimes(Collections.singletonMap(tp, 0L)); - assertEquals(offsetAndTimestampMap.get(tp).timestamp(), 100L); - assertEquals(offsetAndTimestampMap.get(tp).offset(), 100L); + // First try should fail due to metadata error. + client.prepareResponseFrom(listOffsetResponse(Errors.NONE, 10L, 10L), cluster.leaderFor(tp0)); + client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NOT_LEADER_FOR_PARTITION, -1L, -1L), cluster.leaderFor(tp1)); + // Second try should succeed. + client.prepareResponseFrom(listOffsetResponse(Errors.NONE, 100L, 100L), cluster.leaderFor(tp0)); + client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, 200L, 200L), cluster.leaderFor(tp1)); + Map timestampToSearch = new HashMap<>(); + timestampToSearch.put(tp0, 0L); + timestampToSearch.put(tp1, 0L); + Map offsetAndTimestampMap = fetcher.getOffsetsByTimes(timestampToSearch); + assertEquals(offsetAndTimestampMap.get(tp0).timestamp(), 100L); + assertEquals(offsetAndTimestampMap.get(tp0).offset(), 100L); + assertEquals(offsetAndTimestampMap.get(tp1).timestamp(), 200L); + assertEquals(offsetAndTimestampMap.get(tp1).offset(), 200L); } private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp) { @@ -659,6 +673,10 @@ public boolean matches(ClientRequest request) { } private Struct listOffsetResponse(Errors error, long timestamp, long offset) { + return listOffsetResponse(tp, error, timestamp, offset); + } + + private Struct listOffsetResponse(TopicPartition tp, Errors error, long timestamp, long offset) { ListOffsetResponse.PartitionData partitionData = new ListOffsetResponse.PartitionData(error.code(), timestamp, offset); Map allPartitionData = new HashMap<>(); allPartitionData.put(tp, partitionData); diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 6043b01ec209..7ab5e6ab2588 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -615,10 +615,7 @@ class Log(val dir: File, None } - targetSeg match { - case Some(segment) => segment.findOffsetByTimestamp(targetTimestamp) - case None => None - } + targetSeg.map(_.findOffsetByTimestamp(targetTimestamp).get) } /** diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 1c39acf944ff..c63a7d6a2846 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -387,7 +387,7 @@ class LogSegment(val log: FileMessageSet, * the truncated log and maybe retry or even do the search on another log segment. 
* * @param timestamp The timestamp to search for. - * @return the timestamp and offset of the first message whose timestamp is larger than or equals to the + * @return the timestamp and offset of the first message whose timestamp is larger than or equal to the * target timestamp. None will be returned if there is no such message. */ def findOffsetByTimestamp(timestamp: Long): Option[TimestampOffset] = { diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 15e5b620c35b..9e907e18aa25 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -39,7 +39,6 @@ import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils} import org.apache.kafka.common.errors.{InvalidRequestException, ClusterAuthorizationException, NotLeaderForPartitionException, UnknownTopicOrPartitionException, TopicExistsException} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol, SecurityProtocol} -import org.apache.kafka.common.requests.ListOffsetResponse.PartitionData import org.apache.kafka.common.requests.{ApiVersionsResponse, DescribeGroupsRequest, DescribeGroupsResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsResponse, ListOffsetRequest, ListOffsetResponse, MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceRequest, ProduceResponse, ResponseHeader, ResponseSend, StopReplicaRequest, StopReplicaResponse, SyncGroupRequest, SyncGroupResponse, UpdateMetadataRequest, UpdateMetadataResponse, CreateTopicsRequest, CreateTopicsResponse, DeleteTopicsRequest, DeleteTopicsResponse} import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.Utils @@ -555,10 +554,10 @@ class KafkaApis(val requestChannel: RequestChannel, } val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => - new PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, List[JLong]().asJava) + new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, List[JLong]().asJava) ) - val responseMap = authorizedRequestInfo.map({case (topicPartition, partitionData) => + val responseMap = authorizedRequestInfo.map {case (topicPartition, partitionData) => try { // ensure leader exists val localReplica = if (offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) @@ -592,7 +591,7 @@ class KafkaApis(val requestChannel: RequestChannel, error("Error while responding to offset request", e) (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e).code, List[JLong]().asJava)) } - }) + } responseMap ++ unauthorizedResponseStatus } @@ -606,18 +605,18 @@ class KafkaApis(val requestChannel: RequestChannel, } val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => { - new PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET) + new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, + ListOffsetResponse.UNKNOWN_TIMESTAMP, + ListOffsetResponse.UNKNOWN_OFFSET) }) val responseMap = authorizedRequestInfo.map({case (topicPartition, timestamp) => if (offsetRequest.duplicatePartitions().contains(topicPartition)) { 
debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + s"failed because the partition is duplicated in the request.") - (topicPartition, new PartitionData(Errors.INVALID_REQUEST.code(), - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET)) + (topicPartition, new ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST.code(), + ListOffsetResponse.UNKNOWN_TIMESTAMP, + ListOffsetResponse.UNKNOWN_OFFSET)) } else { try { // ensure leader exists @@ -657,14 +656,14 @@ class KafkaApis(val requestChannel: RequestChannel, _ : InvalidRequestException) => debug(s"Offset request with correlation id $correlationId from client $clientId on " + s"partition $topicPartition failed due to ${e.getMessage}") - (topicPartition, new PartitionData(Errors.forException(e).code, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET)) + (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e).code, + ListOffsetResponse.UNKNOWN_TIMESTAMP, + ListOffsetResponse.UNKNOWN_OFFSET)) case e: Throwable => error("Error while responding to offset request", e) - (topicPartition, new PartitionData(Errors.forException(e).code, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET)) + (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e).code, + ListOffsetResponse.UNKNOWN_TIMESTAMP, + ListOffsetResponse.UNKNOWN_OFFSET)) } } }) @@ -687,7 +686,7 @@ class KafkaApis(val requestChannel: RequestChannel, logManager.getLog(TopicAndPartition(topicPartition.topic, topicPartition.partition)) match { case Some(log) => log.fetchOffsetsByTimestamp(timestamp) - case _ => + case None => throw new UnknownTopicOrPartitionException(s"$topicPartition does not exist on the broker.") } } diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 1b0a127513f4..7930716dd73a 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -264,7 +264,7 @@ class ReplicaFetcherThread(name: String, private def earliestOrLatestOffset(topicPartition: TopicPartition, earliestOrLatest: Long, consumerId: Int): Long = { val (request, apiVersion) = if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV2) { - val partitions = Map(topicPartition -> earliestOrLatest) + val partitions = Map(topicPartition -> java.lang.Long.valueOf(earliestOrLatest)) (new ListOffsetRequest(partitions.asJava, consumerId), 1) } else { val partitions = Map(topicPartition -> new ListOffsetRequest.PartitionData(earliestOrLatest, 1)) diff --git a/docs/upgrade.html b/docs/upgrade.html index 217401832163..0b446e8edde0 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -52,7 +52,11 @@

Potential breaking changes in 0.10.1.0
  • The number of open file handles will increase by ~33% relative to 0.10.0 because of the addition of a time index file for each segment.
  • The time index and the offset index share the same index size configuration. Since each time index entry is 1.5x the size of an offset index entry, users may need to increase log.index.size.max.bytes to avoid potentially frequent log rolling.
  • Due to the increased number of index files, on some brokers with a large number of log segments (e.g. >15K), the log loading process during broker startup could take longer. Based on our experiments, setting num.recovery.threads.per.data.dir to one may reduce the log loading time.
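For illustration, the tuning described in the two bullets above could look like the following server.properties overrides; the values here are assumptions for a cluster with many segments, not recommendations from this patch:

    # server.properties (illustrative values only)
    # Allow larger index files so the new time-index entries (~1.5x the size of
    # offset-index entries) do not trigger early segment rolls.
    log.index.size.max.bytes=15728640
    # Threads used per data directory to load/recover logs at startup.
    num.recovery.threads.per.data.dir=1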
+ Protocol changes in 0.10.1.0
+   • ListOffsetRequest v1 is introduced and used by default to support accurate offset search based on timestamp.
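As a sketch of the client-side API that KIP-79 layers on top of ListOffsetRequest v1 (the topic, timestamp, and consumer variable are illustrative, and OffsetAndTimestamp is imported from the org.apache.kafka.clients.consumer package it is moved to later in this series):

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;

    public class OffsetsForTimesSketch {
        // Seek to the first message whose timestamp is >= targetMs, if any.
        static void seekToTimestamp(Consumer<byte[], byte[]> consumer, TopicPartition tp, long targetMs) {
            Map<TopicPartition, OffsetAndTimestamp> found =
                consumer.offsetsForTimes(Collections.singletonMap(tp, targetMs));
            OffsetAndTimestamp ot = found.get(tp);
            if (ot != null)   // null: no such message, or the broker cannot search by time
                consumer.seek(tp, ot.offset());
        }
    }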
    Notable changes in 0.10.1.0
    From ba4d2ac95fa7291d6e71040af848207f210e2252 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Thu, 22 Sep 2016 10:30:08 -0700 Subject: [PATCH 02/16] Addressed Ismael's comments --- .../org/apache/kafka/clients/consumer/internals/Fetcher.java | 3 ++- core/src/main/scala/kafka/log/Log.scala | 2 +- docs/upgrade.html | 5 +++-- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 503caa806565..72b638d14281 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -502,6 +502,7 @@ private RequestFuture> sendListOffsetReq final RequestFuture> listOffsetRequestsFuture = new RequestFuture<>(); final Map fetchedTimestampOffsets = new HashMap<>(); + final AtomicInteger remainingResponses = new AtomicInteger(timestampsToSearchByNode.size()); for (Map.Entry> entry : timestampsToSearchByNode.entrySet()) { sendListOffsetRequest(entry.getKey(), entry.getValue()) .addListener(new RequestFutureListener>() { @@ -509,7 +510,7 @@ private RequestFuture> sendListOffsetReq public void onSuccess(Map value) { synchronized (listOffsetRequestsFuture) { fetchedTimestampOffsets.putAll(value); - if (fetchedTimestampOffsets.size() == timestampsToSearch.size() && !listOffsetRequestsFuture.isDone()) + if (remainingResponses.decrementAndGet() == 0 && !listOffsetRequestsFuture.isDone()) listOffsetRequestsFuture.complete(fetchedTimestampOffsets); } } diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 7ab5e6ab2588..188c9b51844f 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -615,7 +615,7 @@ class Log(val dir: File, None } - targetSeg.map(_.findOffsetByTimestamp(targetTimestamp).get) + targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp)) } /** diff --git a/docs/upgrade.html b/docs/upgrade.html index 0b446e8edde0..309f68a1442b 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -54,9 +54,10 @@
Potential breaking changes in 0.10.1.0
  • Due to the increased number of index files, on some brokers with a large number of log segments (e.g. >15K), the log loading process during broker startup could take longer. Based on our experiments, setting num.recovery.threads.per.data.dir to one may reduce the log loading time.

- Protocol changes in 0.10.1.0
+ New protocol versions in 0.10.1.0
-   • ListOffsetRequest v1 is introduced and used by default to support accurate offset search based on timestamp.
+   • ListOffsetRequest v1 is introduced and used by default to support accurate offset search based on timestamp. See KIP-79 for details.
+   • FetchRequest v3 is introduced to cap the maximum bytes returned by each fetch response. See KIP-74 for details.
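For context, a consumer-side sketch of the KIP-74 cap; the fetch.max.bytes key and the chosen limit are assumptions taken from that KIP, not part of this patch:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    public class Kip74ConfigSketch {
        static Properties consumerProps() {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // illustrative
            // Ask brokers (via FetchRequest v3) to keep each fetch response under ~10 MB.
            props.put("fetch.max.bytes", "10485760");                             // assumed KIP-74 setting
            return props;
        }
    }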
    Notable changes in 0.10.1.0
    From 773156528f002b23c05153597ee00083f905ead6 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Thu, 22 Sep 2016 20:37:44 -0700 Subject: [PATCH 03/16] Addressed Jason's comments. --- .../clients/consumer/internals/Fetcher.java | 3 +- .../org/apache/kafka/clients/MockClient.java | 9 ++++ .../consumer/internals/FetcherTest.java | 50 +++++++++++++++---- .../main/scala/kafka/log/FileMessageSet.scala | 2 - 4 files changed, 52 insertions(+), 12 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 72b638d14281..08735ceb4136 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -535,7 +535,7 @@ public void onFailure(RuntimeException e) { * @param timestampsToSearch The mapping from partitions to the target timestamps. * @return A response which can be polled to obtain the corresponding timestamps and offsets. */ - private RequestFuture> sendListOffsetRequest(Node node, + private RequestFuture> sendListOffsetRequest(final Node node, final Map timestampsToSearch) { ListOffsetRequest request = new ListOffsetRequest(timestampsToSearch, ListOffsetRequest.CONSUMER_REPLICA_ID); log.trace("Sending ListOffsetRequest {} to broker {}", request, node); @@ -543,6 +543,7 @@ private RequestFuture> sendListOffsetReq .compose(new RequestFutureAdapter>() { @Override public void onSuccess(ClientResponse response, RequestFuture> future) { + log.trace("ListOffsetResponse received from node {}", node); handleListOffsetResponse(timestampsToSearch, response, future); } }); diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index 65ea825f3a68..0af4d34b9ff0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -250,6 +250,15 @@ public void prepareResponseFrom(RequestMatcher matcher, Struct body, Node node, futureResponses.add(new FutureResponse(body, disconnected, matcher, node)); } + public void reset() { + ready.clear(); + blackedOut.clear(); + requests.clear(); + responses.clear(); + futureResponses.clear(); + metadataUpdates.clear(); + } + public void prepareMetadataUpdate(Cluster cluster) { metadataUpdates.add(cluster); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 31bb299b455b..8821d82bff3e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -637,7 +637,29 @@ public void testQuotaMetrics() throws Exception { } @Test - public void testGetOffsetsByTimes() { + public void testGetOffsetsForTimes() { + // Error code none with unknown offset + testGetOffsetsForTimesWithError(Errors.NONE, Errors.NONE, -1L, 100L, null, 100L); + // Error code none with known offset + testGetOffsetsForTimesWithError(Errors.NONE, Errors.NONE, 10L, 100L, 10L, 100L); + // Test both of partition has error. + testGetOffsetsForTimesWithError(Errors.NOT_LEADER_FOR_PARTITION, Errors.INVALID_REQUEST, 10L, 100L, 10L, 100L); + // Test the second partition has error. 
+ testGetOffsetsForTimesWithError(Errors.NONE, Errors.NOT_LEADER_FOR_PARTITION, 10L, 100L, 10L, 100L); + // Test different errors. + testGetOffsetsForTimesWithError(Errors.NOT_LEADER_FOR_PARTITION, Errors.NONE, 10L, 100L, 10L, 100L); + testGetOffsetsForTimesWithError(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE, 10L, 100L, 10L, 100L); + testGetOffsetsForTimesWithError(Errors.INVALID_REQUEST, Errors.NONE, 10L, 100L, null, 100L); + testGetOffsetsForTimesWithError(Errors.BROKER_NOT_AVAILABLE, Errors.NONE, 10L, 100L, 10L, 100L); + } + + private void testGetOffsetsForTimesWithError(Errors errorForTp0, + Errors errorForTp1, + Long offsetForTp0, + Long offsetForTp1, + Long expectedOffsetForTp0, + Long expectedOffsetForTp1) { + client.reset(); TopicPartition tp0 = tp; TopicPartition tp1 = new TopicPartition(topicName, 1); // Ensure metadata has both partition. @@ -645,20 +667,30 @@ public void testGetOffsetsByTimes() { metadata.update(cluster, time.milliseconds()); // First try should fail due to metadata error. - client.prepareResponseFrom(listOffsetResponse(Errors.NONE, 10L, 10L), cluster.leaderFor(tp0)); - client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NOT_LEADER_FOR_PARTITION, -1L, -1L), cluster.leaderFor(tp1)); + client.prepareResponseFrom(listOffsetResponse(tp0, errorForTp0, offsetForTp0, offsetForTp0), cluster.leaderFor(tp0)); + client.prepareResponseFrom(listOffsetResponse(tp1, errorForTp1, offsetForTp1, offsetForTp1), cluster.leaderFor(tp1)); // Second try should succeed. - client.prepareResponseFrom(listOffsetResponse(Errors.NONE, 100L, 100L), cluster.leaderFor(tp0)); - client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, 200L, 200L), cluster.leaderFor(tp1)); + client.prepareResponseFrom(listOffsetResponse(tp0, Errors.NONE, offsetForTp0, offsetForTp0), cluster.leaderFor(tp0)); + client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, offsetForTp1, offsetForTp1), cluster.leaderFor(tp1)); Map timestampToSearch = new HashMap<>(); timestampToSearch.put(tp0, 0L); timestampToSearch.put(tp1, 0L); Map offsetAndTimestampMap = fetcher.getOffsetsByTimes(timestampToSearch); - assertEquals(offsetAndTimestampMap.get(tp0).timestamp(), 100L); - assertEquals(offsetAndTimestampMap.get(tp0).offset(), 100L); - assertEquals(offsetAndTimestampMap.get(tp1).timestamp(), 200L); - assertEquals(offsetAndTimestampMap.get(tp1).offset(), 200L); + + if (expectedOffsetForTp0 == null) + assertNull(offsetAndTimestampMap.get(tp0)); + else { + assertEquals(expectedOffsetForTp0.longValue(), offsetAndTimestampMap.get(tp0).timestamp()); + assertEquals(expectedOffsetForTp0.longValue(), offsetAndTimestampMap.get(tp0).offset()); + } + + if (expectedOffsetForTp1 == null) + assertNull(offsetAndTimestampMap.get(tp1)); + else { + assertEquals(expectedOffsetForTp1.longValue(), offsetAndTimestampMap.get(tp1).timestamp()); + assertEquals(expectedOffsetForTp1.longValue(), offsetAndTimestampMap.get(tp1).offset()); + } } private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp) { diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala index 55dbfee2159d..551fa6d6bc57 100755 --- a/core/src/main/scala/kafka/log/FileMessageSet.scala +++ b/core/src/main/scala/kafka/log/FileMessageSet.scala @@ -164,11 +164,9 @@ class FileMessageSet private[kafka](@volatile var file: File, * @return The timestamp and offset of the message found. None, if no message is found. 
*/ def searchForTimestamp(targetTimestamp: Long, startingPosition: Int): Option[TimestampOffset] = { - var lastOffsetChecked = -1L val messagesToSearch = read(startingPosition, sizeInBytes) for (messageAndOffset <- messagesToSearch) { val message = messageAndOffset.message - lastOffsetChecked = messageAndOffset.offset if (message.timestamp >= targetTimestamp) { // We found a message message.compressionCodec match { From ea2e23ba1f7008c838cb1415ad5b549838746a9f Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Sat, 24 Sep 2016 02:03:34 -0700 Subject: [PATCH 04/16] Addressed Jason's comments --- .../kafka/clients/consumer/KafkaConsumer.java | 6 ++--- .../clients/consumer/internals/Fetcher.java | 23 ++++++++++++------- .../consumer/internals/FetcherTest.java | 12 +++++++++- docs/upgrade.html | 5 ++-- 4 files changed, 31 insertions(+), 15 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 0d8462f350a2..1e20aaa0c31b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1423,7 +1423,7 @@ public Map offsetsForTimes(Map offsetsForTimes(Map beginningOffsets(Collection partitions) { - return fetcher.beginningOffsets(partitions); + return fetcher.beginningOffsets(partitions, requestTimeoutMs); } /** @@ -1456,7 +1456,7 @@ public Map beginningOffsets(Collection par */ @Override public Map endOffsets(Collection partitions) { - return fetcher.endOffsets(partitions); + return fetcher.endOffsets(partitions, requestTimeoutMs); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 08735ceb4136..a985aad346d5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -353,14 +353,16 @@ else if (strategy == OffsetResetStrategy.LATEST) throw new NoOffsetForPartitionException(partition); log.debug("Resetting offset for partition {} to {} offset.", partition, strategy.name().toLowerCase(Locale.ROOT)); - long offset = getOffsetsByTimes(Collections.singletonMap(partition, timestamp)).get(partition).offset(); + long offset = getOffsetsByTimes(Collections.singletonMap(partition, timestamp), Long.MAX_VALUE).get(partition).offset(); // we might lose the assignment while fetching the offset, so check it is still active if (subscriptions.isAssigned(partition)) this.subscriptions.seek(partition, offset); } - public Map getOffsetsByTimes(Map timestampsToSearch) { + public Map getOffsetsByTimes(Map timestampsToSearch, + Long timeout) { + long startMs = time.milliseconds(); while (true) { RequestFuture> future = sendListOffsetRequests(timestampsToSearch); client.poll(future); @@ -371,6 +373,9 @@ public Map getOffsetsByTimes(Map timeout) + throw new TimeoutException("Failed to get offsets by times in " + timeout + " ms"); + if (future.exception() instanceof InvalidMetadataException) client.awaitMetadataUpdate(); else @@ -378,20 +383,22 @@ public Map getOffsetsByTimes(Map beginningOffsets(Collection partitions) { - return beginningOrEndOffset(partitions, ListOffsetRequest.EARLIEST_TIMESTAMP); + public Map beginningOffsets(Collection partitions, Long timeout) { + return beginningOrEndOffset(partitions, 
ListOffsetRequest.EARLIEST_TIMESTAMP, timeout); } - public Map endOffsets(Collection partitions) { - return beginningOrEndOffset(partitions, ListOffsetRequest.LATEST_TIMESTAMP); + public Map endOffsets(Collection partitions, Long timeout) { + return beginningOrEndOffset(partitions, ListOffsetRequest.LATEST_TIMESTAMP, timeout); } - private Map beginningOrEndOffset(Collection partitions, long timestamp) { + private Map beginningOrEndOffset(Collection partitions, + long timestamp, + Long timeout) { Map timestampsToSearch = new HashMap<>(); for (TopicPartition tp : partitions) timestampsToSearch.put(tp, timestamp); Map result = new HashMap<>(); - for (Map.Entry entry : getOffsetsByTimes(timestampsToSearch).entrySet()) + for (Map.Entry entry : getOffsetsByTimes(timestampsToSearch, timeout).entrySet()) result.put(entry.getKey(), entry.getValue().offset()); return result; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 8821d82bff3e..5b49a43ee45c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -636,6 +636,16 @@ public void testQuotaMetrics() throws Exception { assertEquals(300, maxMetric.value(), EPSILON); } + @Test + public void testGetOffsetsForTimesTimeout() { + try { + fetcher.getOffsetsByTimes(Collections.singletonMap(new TopicPartition(topicName, 2), 1000L), 100L); + fail("Should throw timeout exception."); + } catch (TimeoutException e) { + // let it go. + } + } + @Test public void testGetOffsetsForTimes() { // Error code none with unknown offset @@ -676,7 +686,7 @@ private void testGetOffsetsForTimesWithError(Errors errorForTp0, Map timestampToSearch = new HashMap<>(); timestampToSearch.put(tp0, 0L); timestampToSearch.put(tp1, 0L); - Map offsetAndTimestampMap = fetcher.getOffsetsByTimes(timestampToSearch); + Map offsetAndTimestampMap = fetcher.getOffsetsByTimes(timestampToSearch, Long.MAX_VALUE); if (expectedOffsetForTp0 == null) assertNull(offsetAndTimestampMap.get(tp0)); diff --git a/docs/upgrade.html b/docs/upgrade.html index 309f68a1442b..a820447db845 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -54,10 +54,9 @@
Potential breaking changes in 0.10.1.0
  • Due to the increased number of index files, on some brokers with a large number of log segments (e.g. >15K), the log loading process during broker startup could take longer. Based on our experiments, setting num.recovery.threads.per.data.dir to one may reduce the log loading time.

- New protocol versions in 0.10.1.0
+ New protocols in 0.10.1.0
-   • ListOffsetRequest v1 is introduced and used by default to support accurate offset search based on timestamp. See KIP-79 for details.
-   • FetchRequest v3 is introduced to cap the maximum bytes returned by each fetch response. See KIP-74 for details.
+   • ListOffsetRequest v1 is introduced and used by default to support accurate offset search based on timestamp.
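As a usage sketch of the two offset queries touched in this patch (as of patch 04 they fail with a TimeoutException after request.timeout.ms instead of blocking indefinitely; the error handling shown is illustrative):

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.TimeoutException;

    public class LogBoundsSketch {
        // Report the current [beginning, end) offsets of one partition.
        static void printBounds(Consumer<byte[], byte[]> consumer, TopicPartition tp) {
            List<TopicPartition> one = Collections.singletonList(tp);
            try {
                Map<TopicPartition, Long> begin = consumer.beginningOffsets(one);
                Map<TopicPartition, Long> end = consumer.endOffsets(one);
                System.out.printf("%s: [%d, %d)%n", tp, begin.get(tp), end.get(tp));
            } catch (TimeoutException e) {
                // Offsets could not be fetched within request.timeout.ms.
            }
        }
    }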
    Notable changes in 0.10.1.0
    From 509aaa56cd2b13775821695377c4eba7515a277f Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Sun, 25 Sep 2016 19:07:41 -0700 Subject: [PATCH 05/16] Addressed Ismael's comments. --- .../kafka/clients/consumer/Consumer.java | 1 - .../kafka/clients/consumer/KafkaConsumer.java | 1 - .../kafka/clients/consumer/MockConsumer.java | 3 +- .../consumer}/OffsetAndTimestamp.java | 2 +- .../clients/consumer/internals/Fetcher.java | 12 ++++---- .../errors/MessageFormatTooOldException.java | 29 +++++++++++++++++++ .../apache/kafka/common/protocol/Errors.java | 5 +++- .../consumer/internals/FetcherTest.java | 4 +-- core/src/main/scala/kafka/log/Log.scala | 4 +-- .../main/scala/kafka/server/KafkaApis.scala | 4 +-- docs/upgrade.html | 5 ---- 11 files changed, 47 insertions(+), 23 deletions(-) rename clients/src/main/java/org/apache/kafka/{common/record => clients/consumer}/OffsetAndTimestamp.java (97%) create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/MessageFormatTooOldException.java diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java index 06e1bec25400..18cb5603abbe 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java @@ -16,7 +16,6 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.record.OffsetAndTimestamp; import java.io.Closeable; import java.util.Collection; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 1e20aaa0c31b..4d2ff4093ef0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -37,7 +37,6 @@ import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.network.Selector; -import org.apache.kafka.common.record.OffsetAndTimestamp; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.utils.AppInfoParser; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index 3af2344e2fcc..fdce0644c725 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -20,7 +20,6 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.record.OffsetAndTimestamp; import java.util.ArrayList; import java.util.Collection; @@ -301,7 +300,7 @@ public void resume(Collection partitions) { @Override public Map offsetsForTimes(Map timestampsToSearch) { - return null; + throw new UnsupportedOperationException("Not implemented yet."); } @Override diff --git a/clients/src/main/java/org/apache/kafka/common/record/OffsetAndTimestamp.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java similarity index 97% rename from clients/src/main/java/org/apache/kafka/common/record/OffsetAndTimestamp.java rename 
to clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java index 7ddd3ecebca7..01f940a8a88f 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/OffsetAndTimestamp.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.common.record; +package org.apache.kafka.clients.consumer; /** * A container class for offset and timestamp. diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index a985aad346d5..587977f4d4f9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -41,7 +41,7 @@ import org.apache.kafka.common.record.InvalidRecordException; import org.apache.kafka.common.record.LogEntry; import org.apache.kafka.common.record.MemoryRecords; -import org.apache.kafka.common.record.OffsetAndTimestamp; +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.requests.FetchRequest; @@ -361,7 +361,7 @@ else if (strategy == OffsetResetStrategy.LATEST) } public Map getOffsetsByTimes(Map timestampsToSearch, - Long timeout) { + long timeout) { long startMs = time.milliseconds(); while (true) { RequestFuture> future = sendListOffsetRequests(timestampsToSearch); @@ -383,17 +383,17 @@ public Map getOffsetsByTimes(Map beginningOffsets(Collection partitions, Long timeout) { + public Map beginningOffsets(Collection partitions, long timeout) { return beginningOrEndOffset(partitions, ListOffsetRequest.EARLIEST_TIMESTAMP, timeout); } - public Map endOffsets(Collection partitions, Long timeout) { + public Map endOffsets(Collection partitions, long timeout) { return beginningOrEndOffset(partitions, ListOffsetRequest.LATEST_TIMESTAMP, timeout); } private Map beginningOrEndOffset(Collection partitions, long timestamp, - Long timeout) { + long timeout) { Map timestampsToSearch = new HashMap<>(); for (TopicPartition tp : partitions) timestampsToSearch.put(tp, timestamp); @@ -578,7 +578,7 @@ private void handleListOffsetResponse(Map timestampsToSear offsetAndTimestamp = new OffsetAndTimestamp(partitionData.offset, partitionData.timestamp); log.debug("Fetched {} for partition {}", offsetAndTimestamp, topicPartition); timestampOffsetMap.put(topicPartition, offsetAndTimestamp); - } else if (error == Errors.INVALID_REQUEST) { + } else if (error == Errors.MESSAGE_FORMAT_TOO_OLD) { // The message format on the broker side is before 0.10.0, we simply put null in the response. log.debug("Cannot search by timestamp for partition {} because the message format version " + "is before 0.10.0", topicPartition); diff --git a/clients/src/main/java/org/apache/kafka/common/errors/MessageFormatTooOldException.java b/clients/src/main/java/org/apache/kafka/common/errors/MessageFormatTooOldException.java new file mode 100644 index 000000000000..d60977ea2e3a --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/MessageFormatTooOldException.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. 
See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.errors; + +/** + * The message format version is too old and the requested function is not supported. + */ +public class MessageFormatTooOldException extends ApiException { + private static final long serialVersionUID = 1L; + + public MessageFormatTooOldException(String message) { + super(message); + } + + public MessageFormatTooOldException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 16f0c8e17f25..0569cde44d44 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -43,6 +43,7 @@ import org.apache.kafka.common.errors.InvalidTimestampException; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.LeaderNotAvailableException; +import org.apache.kafka.common.errors.MessageFormatTooOldException; import org.apache.kafka.common.errors.NetworkException; import org.apache.kafka.common.errors.NotControllerException; import org.apache.kafka.common.errors.NotCoordinatorForGroupException; @@ -161,7 +162,9 @@ public enum Errors { new NotControllerException("This is not the correct controller for this cluster.")), INVALID_REQUEST(42, new InvalidRequestException("This most likely occurs because of a request being malformed by the client library or" + - " the message was sent to an incompatible broker. See the broker logs for more details.")); + " the message was sent to an incompatible broker. See the broker logs for more details.")), + MESSAGE_FORMAT_TOO_OLD(43, + new MessageFormatTooOldException("The message format version on the broker is too old to support the request.")); private static final Logger log = LoggerFactory.getLogger(Errors.class); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 5b49a43ee45c..a7b0ebac2378 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -41,7 +41,7 @@ import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.Compressor; import org.apache.kafka.common.record.MemoryRecords; -import org.apache.kafka.common.record.OffsetAndTimestamp; +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.FetchResponse; @@ -659,7 +659,7 @@ public void testGetOffsetsForTimes() { // Test different errors. 
testGetOffsetsForTimesWithError(Errors.NOT_LEADER_FOR_PARTITION, Errors.NONE, 10L, 100L, 10L, 100L); testGetOffsetsForTimesWithError(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE, 10L, 100L, 10L, 100L); - testGetOffsetsForTimesWithError(Errors.INVALID_REQUEST, Errors.NONE, 10L, 100L, null, 100L); + testGetOffsetsForTimesWithError(Errors.MESSAGE_FORMAT_TOO_OLD, Errors.NONE, 10L, 100L, null, 100L); testGetOffsetsForTimesWithError(Errors.BROKER_NOT_AVAILABLE, Errors.NONE, 10L, 100L, 10L, 100L); } diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 188c9b51844f..f6ecffd6696b 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -28,7 +28,7 @@ import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap} import java.util.concurrent.atomic._ import java.text.NumberFormat -import org.apache.kafka.common.errors.{InvalidRequestException, CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException} +import org.apache.kafka.common.errors.{MessageFormatTooOldException, CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException} import org.apache.kafka.common.record.TimestampType import org.apache.kafka.common.requests.ListOffsetRequest @@ -594,7 +594,7 @@ class Log(val dir: File, if (config.messageFormatVersion < KAFKA_0_10_0_IV0 && targetTimestamp != ListOffsetRequest.EARLIEST_TIMESTAMP && targetTimestamp != ListOffsetRequest.LATEST_TIMESTAMP) - throw new InvalidRequestException(s"Cannot search offsets based on timestamp because message format version " + + throw new MessageFormatTooOldException(s"Cannot search offsets based on timestamp because message format version " + s"for partition $topicAndPartition is ${config.messageFormatVersion} which is earlier than the minimum " + s"required version $KAFKA_0_10_0_IV0") diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 9e907e18aa25..3cde9f9fc2dc 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -36,7 +36,7 @@ import kafka.network.RequestChannel.{Response, Session} import kafka.security.auth import kafka.security.auth.{Authorizer, ClusterAction, Create, Describe, Group, Operation, Read, Resource, Write, Delete} import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils} -import org.apache.kafka.common.errors.{InvalidRequestException, ClusterAuthorizationException, NotLeaderForPartitionException, UnknownTopicOrPartitionException, TopicExistsException} +import org.apache.kafka.common.errors.{ClusterAuthorizationException, NotLeaderForPartitionException, UnknownTopicOrPartitionException, TopicExistsException, MessageFormatTooOldException} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol, SecurityProtocol} import org.apache.kafka.common.requests.{ApiVersionsResponse, DescribeGroupsRequest, DescribeGroupsResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsResponse, ListOffsetRequest, ListOffsetResponse, MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceRequest, ProduceResponse, ResponseHeader, ResponseSend, 
StopReplicaRequest, StopReplicaResponse, SyncGroupRequest, SyncGroupResponse, UpdateMetadataRequest, UpdateMetadataResponse, CreateTopicsRequest, CreateTopicsResponse, DeleteTopicsRequest, DeleteTopicsResponse} @@ -653,7 +653,7 @@ class KafkaApis(val requestChannel: RequestChannel, // would have received a clear exception and there is no value in logging the entire stack trace for the same case e @ (_ : UnknownTopicOrPartitionException | _ : NotLeaderForPartitionException | - _ : InvalidRequestException) => + _ : MessageFormatTooOldException) => debug(s"Offset request with correlation id $correlationId from client $clientId on " + s"partition $topicPartition failed due to ${e.getMessage}") (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e).code, diff --git a/docs/upgrade.html b/docs/upgrade.html index a820447db845..f5c417287137 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -54,11 +54,6 @@
Potential breaking changes in 0.10.1.0
  • Due to the increased number of index files, on some brokers with a large number of log segments (e.g. >15K), the log loading process during broker startup could take longer. Based on our experiments, setting num.recovery.threads.per.data.dir to one may reduce the log loading time.

- New protocols in 0.10.1.0
-   • ListOffsetRequest v1 is introduced and used by default to support accurate offset search based on timestamp.
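With this patch's error mapping, a broker on a pre-0.10.0 message format surfaces to offsetsForTimes callers as a null entry in the result map rather than an InvalidRequestException. A hedged fallback sketch (the earliest-offset fallback is an illustrative choice, not mandated by the patch):

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;

    public class FormatFallbackSketch {
        // Seek by timestamp when the broker supports it, else fall back to the beginning.
        static void seekWithFallback(Consumer<byte[], byte[]> consumer, TopicPartition tp, long targetMs) {
            OffsetAndTimestamp ot =
                consumer.offsetsForTimes(Collections.singletonMap(tp, targetMs)).get(tp);
            long offset = (ot != null)
                ? ot.offset()
                : consumer.beginningOffsets(Collections.singletonList(tp)).get(tp);
            consumer.seek(tp, offset);
        }
    }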
    Notable changes in 0.10.1.0
    • The new Java consumer is no longer in beta and we recommend it for all new development. The old Scala consumers are still supported, but they will be deprecated in the next release From c789541b22c4fca60611ec2654c3de8d9e186d9c Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Mon, 26 Sep 2016 11:22:11 -0700 Subject: [PATCH 06/16] Addressed Jason's comments --- .../internals/ConsumerNetworkClient.java | 10 +++++- .../clients/consumer/internals/Fetcher.java | 36 +++++++++++-------- 2 files changed, 31 insertions(+), 15 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index ef78c05681ef..645af27bbf03 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -132,10 +132,18 @@ public Node leastLoadedNode() { * Block until the metadata has been refreshed. */ public void awaitMetadataUpdate() { + awaitMetadataUpdate(Long.MAX_VALUE); + } + + /** + * Block waiting on the metadata refresh with a timeout. + */ + public boolean awaitMetadataUpdate(long timeout) { int version = this.metadata.requestUpdate(); do { - poll(Long.MAX_VALUE); + poll(timeout); } while (this.metadata.version() == version); + return this.metadata.version() > version; } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 587977f4d4f9..e7441db355ba 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -363,23 +363,32 @@ else if (strategy == OffsetResetStrategy.LATEST) public Map getOffsetsByTimes(Map timestampsToSearch, long timeout) { long startMs = time.milliseconds(); + long remaining = timeout; while (true) { RequestFuture> future = sendListOffsetRequests(timestampsToSearch); - client.poll(future); + client.poll(future, remaining); if (future.succeeded()) return future.value(); - if (!future.isRetriable()) + if (future.isDone() && !future.isRetriable()) throw future.exception(); - if (time.milliseconds() - startMs > timeout) + long elapsed = time.milliseconds() - startMs; + remaining = timeout - elapsed; + if (remaining <= 0) throw new TimeoutException("Failed to get offsets by times in " + timeout + " ms"); - if (future.exception() instanceof InvalidMetadataException) - client.awaitMetadataUpdate(); - else - time.sleep(retryBackoffMs); + if (future.exception() instanceof InvalidMetadataException) { + if (!client.awaitMetadataUpdate(remaining)) + throw new TimeoutException("Failed to get offsets by times in " + timeout + " ms"); + } else + time.sleep(Math.min(remaining, retryBackoffMs)); + + elapsed = time.milliseconds() - startMs; + remaining = timeout - elapsed; + if (remaining <= 0) + throw new TimeoutException("Failed to get offsets by times in " + timeout + " ms"); } } @@ -550,8 +559,9 @@ private RequestFuture> sendListOffsetReq .compose(new RequestFutureAdapter>() { @Override public void onSuccess(ClientResponse response, RequestFuture> future) { - log.trace("ListOffsetResponse received from node {}", node); - handleListOffsetResponse(timestampsToSearch, response, future); + ListOffsetResponse lor = new ListOffsetResponse(response.responseBody()); + 
log.trace("Received ListOffsetResponse {} from broker {}", lor, node); + handleListOffsetResponse(timestampsToSearch, lor, future); } }); } @@ -559,18 +569,16 @@ public void onSuccess(ClientResponse response, RequestFuture timestampsToSearch, - ClientResponse clientResponse, + ListOffsetResponse listOffsetResponse, RequestFuture> future) { - ListOffsetResponse lor = new ListOffsetResponse(clientResponse.responseBody()); - log.trace("Received ListOffsetResponse {}", lor); Map timestampOffsetMap = new HashMap<>(); for (Map.Entry entry : timestampsToSearch.entrySet()) { TopicPartition topicPartition = entry.getKey(); - ListOffsetResponse.PartitionData partitionData = lor.responseData().get(topicPartition); + ListOffsetResponse.PartitionData partitionData = listOffsetResponse.responseData().get(topicPartition); Errors error = Errors.forCode(partitionData.errorCode); if (error == Errors.NONE) { OffsetAndTimestamp offsetAndTimestamp = null; From f971deb5296e6e46dfad3a9a70c057783952d5de Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Mon, 26 Sep 2016 13:27:35 -0700 Subject: [PATCH 07/16] Fixed the timeout for awaitMetadataUpdates --- .../clients/consumer/internals/ConsumerNetworkClient.java | 3 ++- .../consumer/internals/ConsumerNetworkClientTest.java | 7 ++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index 645af27bbf03..3d41a92314a1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -139,10 +139,11 @@ public void awaitMetadataUpdate() { * Block waiting on the metadata refresh with a timeout. 
*/ public boolean awaitMetadataUpdate(long timeout) { + long startMs = time.milliseconds(); int version = this.metadata.requestUpdate(); do { poll(timeout); - } while (this.metadata.version() == version); + } while (this.metadata.version() == version && time.milliseconds() - startMs < timeout); return this.metadata.version() > version; } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java index 368998c290b5..f90cd636926d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java @@ -40,7 +40,7 @@ public class ConsumerNetworkClientTest { private String topicName = "test"; - private MockTime time = new MockTime(); + private MockTime time = new MockTime(1); private MockClient client = new MockClient(time); private Cluster cluster = TestUtils.singletonCluster(topicName, 1); private Node node = cluster.nodes().get(0); @@ -132,6 +132,11 @@ public void wakeup() { assertTrue(future.isDone()); } + @Test + public void testAwaitForMetadataUpdateWithTimeout() { + assertFalse(consumerClient.awaitMetadataUpdate(10L)); + } + @Test public void sendExpiry() throws InterruptedException { long unsentExpiryMs = 10; From accc2ffa482c1bc92c62648073f01f5921b80d67 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Mon, 26 Sep 2016 22:30:28 -0700 Subject: [PATCH 08/16] Addressed Jason's comments --- .../apache/kafka/clients/consumer/internals/Fetcher.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index e7441db355ba..d07a7c275cc6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -364,7 +364,7 @@ public Map getOffsetsByTimes(Map> future = sendListOffsetRequests(timestampsToSearch); client.poll(future, remaining); @@ -377,7 +377,7 @@ public Map getOffsetsByTimes(Map getOffsetsByTimes(Map 0); + throw new TimeoutException("Failed to get offsets by times in " + timeout + " ms"); } public Map beginningOffsets(Collection partitions, long timeout) { From 27126615796498ef698998e544848ac2beaaa98e Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Mon, 26 Sep 2016 23:28:53 -0700 Subject: [PATCH 09/16] changed MessageFormatTooOld to UnsupportedForMessageFormat --- .../apache/kafka/clients/consumer/internals/Fetcher.java | 2 +- ...ion.java => UnsupportedForMessageFormatException.java} | 8 ++++---- .../java/org/apache/kafka/common/protocol/Errors.java | 6 +++--- .../kafka/clients/consumer/internals/FetcherTest.java | 2 +- core/src/main/scala/kafka/log/Log.scala | 4 ++-- core/src/main/scala/kafka/server/KafkaApis.scala | 4 ++-- 6 files changed, 13 insertions(+), 13 deletions(-) rename clients/src/main/java/org/apache/kafka/common/errors/{MessageFormatTooOldException.java => UnsupportedForMessageFormatException.java} (76%) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index d07a7c275cc6..92c8a7ca74e1 100644 --- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -585,7 +585,7 @@ private void handleListOffsetResponse(Map timestampsToSear offsetAndTimestamp = new OffsetAndTimestamp(partitionData.offset, partitionData.timestamp); log.debug("Fetched {} for partition {}", offsetAndTimestamp, topicPartition); timestampOffsetMap.put(topicPartition, offsetAndTimestamp); - } else if (error == Errors.MESSAGE_FORMAT_TOO_OLD) { + } else if (error == Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) { // The message format on the broker side is before 0.10.0, we simply put null in the response. log.debug("Cannot search by timestamp for partition {} because the message format version " + "is before 0.10.0", topicPartition); diff --git a/clients/src/main/java/org/apache/kafka/common/errors/MessageFormatTooOldException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedForMessageFormatException.java similarity index 76% rename from clients/src/main/java/org/apache/kafka/common/errors/MessageFormatTooOldException.java rename to clients/src/main/java/org/apache/kafka/common/errors/UnsupportedForMessageFormatException.java index d60977ea2e3a..00356f3a7cb8 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/MessageFormatTooOldException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedForMessageFormatException.java @@ -13,16 +13,16 @@ package org.apache.kafka.common.errors; /** - * The message format version is too old and the requested function is not supported. + * The message format version does not support the requested function. */ -public class MessageFormatTooOldException extends ApiException { +public class UnsupportedForMessageFormatException extends ApiException { private static final long serialVersionUID = 1L; - public MessageFormatTooOldException(String message) { + public UnsupportedForMessageFormatException(String message) { super(message); } - public MessageFormatTooOldException(String message, Throwable cause) { + public UnsupportedForMessageFormatException(String message, Throwable cause) { super(message, cause); } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 0569cde44d44..f32399da65ea 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -43,7 +43,7 @@ import org.apache.kafka.common.errors.InvalidTimestampException; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.LeaderNotAvailableException; -import org.apache.kafka.common.errors.MessageFormatTooOldException; +import org.apache.kafka.common.errors.UnsupportedForMessageFormatException; import org.apache.kafka.common.errors.NetworkException; import org.apache.kafka.common.errors.NotControllerException; import org.apache.kafka.common.errors.NotCoordinatorForGroupException; @@ -163,8 +163,8 @@ public enum Errors { INVALID_REQUEST(42, new InvalidRequestException("This most likely occurs because of a request being malformed by the client library or" + " the message was sent to an incompatible broker. 
See the broker logs for more details.")), - MESSAGE_FORMAT_TOO_OLD(43, - new MessageFormatTooOldException("The message format version on the broker is too old to support the request.")); + UNSUPPORTED_FOR_MESSAGE_FORMAT(43, + new UnsupportedForMessageFormatException("The message format version on the broker does not support the request.")); private static final Logger log = LoggerFactory.getLogger(Errors.class); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index a7b0ebac2378..1dddffbeff36 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -659,7 +659,7 @@ public void testGetOffsetsForTimes() { // Test different errors. testGetOffsetsForTimesWithError(Errors.NOT_LEADER_FOR_PARTITION, Errors.NONE, 10L, 100L, 10L, 100L); testGetOffsetsForTimesWithError(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE, 10L, 100L, 10L, 100L); - testGetOffsetsForTimesWithError(Errors.MESSAGE_FORMAT_TOO_OLD, Errors.NONE, 10L, 100L, null, 100L); + testGetOffsetsForTimesWithError(Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, Errors.NONE, 10L, 100L, null, 100L); testGetOffsetsForTimesWithError(Errors.BROKER_NOT_AVAILABLE, Errors.NONE, 10L, 100L, 10L, 100L); } diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index f6ecffd6696b..f29cde76d9fa 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -28,7 +28,7 @@ import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap} import java.util.concurrent.atomic._ import java.text.NumberFormat -import org.apache.kafka.common.errors.{MessageFormatTooOldException, CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException} +import org.apache.kafka.common.errors.{UnsupportedForMessageFormatException, CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException} import org.apache.kafka.common.record.TimestampType import org.apache.kafka.common.requests.ListOffsetRequest @@ -594,7 +594,7 @@ class Log(val dir: File, if (config.messageFormatVersion < KAFKA_0_10_0_IV0 && targetTimestamp != ListOffsetRequest.EARLIEST_TIMESTAMP && targetTimestamp != ListOffsetRequest.LATEST_TIMESTAMP) - throw new MessageFormatTooOldException(s"Cannot search offsets based on timestamp because message format version " + + throw new UnsupportedForMessageFormatException(s"Cannot search offsets based on timestamp because message format version " + s"for partition $topicAndPartition is ${config.messageFormatVersion} which is earlier than the minimum " + s"required version $KAFKA_0_10_0_IV0") diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 3cde9f9fc2dc..5328ae32f583 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -36,7 +36,7 @@ import kafka.network.RequestChannel.{Response, Session} import kafka.security.auth import kafka.security.auth.{Authorizer, ClusterAction, Create, Describe, Group, Operation, Read, Resource, Write, Delete} import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils} -import org.apache.kafka.common.errors.{ClusterAuthorizationException, NotLeaderForPartitionException, 
UnknownTopicOrPartitionException, TopicExistsException, MessageFormatTooOldException} +import org.apache.kafka.common.errors.{ClusterAuthorizationException, NotLeaderForPartitionException, UnknownTopicOrPartitionException, TopicExistsException, UnsupportedForMessageFormatException} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol, SecurityProtocol} import org.apache.kafka.common.requests.{ApiVersionsResponse, DescribeGroupsRequest, DescribeGroupsResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsResponse, ListOffsetRequest, ListOffsetResponse, MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceRequest, ProduceResponse, ResponseHeader, ResponseSend, StopReplicaRequest, StopReplicaResponse, SyncGroupRequest, SyncGroupResponse, UpdateMetadataRequest, UpdateMetadataResponse, CreateTopicsRequest, CreateTopicsResponse, DeleteTopicsRequest, DeleteTopicsResponse} @@ -653,7 +653,7 @@ class KafkaApis(val requestChannel: RequestChannel, // would have received a clear exception and there is no value in logging the entire stack trace for the same case e @ (_ : UnknownTopicOrPartitionException | _ : NotLeaderForPartitionException | - _ : MessageFormatTooOldException) => + _ : UnsupportedForMessageFormatException) => debug(s"Offset request with correlation id $correlationId from client $clientId on " + s"partition $topicPartition failed due to ${e.getMessage}") (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e).code, From 9fa9d7cb4ea586d2f0ba5ec8afe1950dee46fb51 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Tue, 27 Sep 2016 09:49:44 -0700 Subject: [PATCH 10/16] addressed Jason's comments. --- .../org/apache/kafka/clients/consumer/internals/Fetcher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 92c8a7ca74e1..da0249c62230 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -381,7 +381,7 @@ public Map getOffsetsByTimes(Map Date: Tue, 27 Sep 2016 13:01:20 -0700 Subject: [PATCH 11/16] Addressed Jason's comments. --- .../java/org/apache/kafka/clients/consumer/KafkaConsumer.java | 2 ++ .../org/apache/kafka/common/requests/ListOffsetResponse.java | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 4d2ff4093ef0..275c01451532 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1418,6 +1418,8 @@ public Set paused() { @Override public Map offsetsForTimes(Map timestampsToSearch) { for (Map.Entry entry : timestampsToSearch.entrySet()) { + // we explicitly exclude the earliest and latest offset in offsets for times so the timestamp in + // OffsetAndTimestamp is always positive. 
if (entry.getValue() < 0) throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " + entry.getValue() + ". The target time cannot be negative."); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java index bc8c8d656904..dbeef051d2ad 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java @@ -49,7 +49,7 @@ public class ListOffsetResponse extends AbstractRequestResponse { * * UNKNOWN_TOPIC_OR_PARTITION (3) * NOT_LEADER_FOR_PARTITION (6) - * INVALID_REQUEST (42) + * UNSUPPORTED_FOR_MESSAGE_FORMAT (43) * UNKNOWN (-1) */ From 9548a3b723716839ffeb2f2828c4d98d1fe59ed7 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Tue, 27 Sep 2016 13:35:16 -0700 Subject: [PATCH 12/16] minor comments fix --- .../java/org/apache/kafka/clients/consumer/KafkaConsumer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 275c01451532..f57a80dd9747 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1418,7 +1418,7 @@ public Set paused() { @Override public Map offsetsForTimes(Map timestampsToSearch) { for (Map.Entry entry : timestampsToSearch.entrySet()) { - // we explicitly exclude the earliest and latest offset in offsets for times so the timestamp in + // we explicitly exclude the earliest and latest offset here so the timestamp in the returned // OffsetAndTimestamp is always positive. if (entry.getValue() < 0) throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " + From 98a01df455ce581e12f35689d8b10cb52c93317d Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Tue, 27 Sep 2016 21:45:42 -0700 Subject: [PATCH 13/16] Addressed Ismael and Jason's comments. --- .../clients/consumer/OffsetAndTimestamp.java | 6 ++- .../internals/ConsumerNetworkClient.java | 2 + .../org/apache/kafka/common/utils/Utils.java | 4 ++ .../main/scala/kafka/server/KafkaApis.scala | 39 ++++++++++--------- docs/upgrade.html | 2 +- 5 files changed, 32 insertions(+), 21 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java index 01f940a8a88f..c1524a9d98de 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java @@ -16,8 +16,12 @@ */ package org.apache.kafka.clients.consumer; +import org.apache.kafka.common.utils.Utils; + /** * A container class for offset and timestamp. + * + * Both the offsets the timestamp in this class is non-negative. 
*/ public final class OffsetAndTimestamp { private final long timestamp; @@ -43,7 +47,7 @@ public String toString() { @Override public int hashCode() { - return 31 * Long.valueOf(timestamp).hashCode() + Long.valueOf(offset).hashCode(); + return 31 * Utils.longHashcode(timestamp) + Utils.longHashcode(offset); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index 3d41a92314a1..21fe0b8adc1a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -137,6 +137,8 @@ public void awaitMetadataUpdate() { /** * Block waiting on the metadata refresh with a timeout. + * + * @return true if update succeeded, false otherwise. */ public boolean awaitMetadataUpdate(long timeout) { long startMs = time.milliseconds(); diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 1bbfea93f105..3cd80c4bade4 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -728,4 +728,8 @@ public static void closeQuietly(Closeable closeable, String name) { public static int toPositive(int number) { return number & 0x7fffffff; } + + public static int longHashcode(long value) { + return (int) (value ^ (value >>> 32)); + } } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 5328ae32f583..679fa090e65e 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -624,27 +624,28 @@ class KafkaApis(val requestChannel: RequestChannel, replicaManager.getLeaderReplicaIfLocal(topicPartition.topic, topicPartition.partition) else replicaManager.getReplicaOrException(topicPartition.topic, topicPartition.partition) + // There are following two special cases to be handled when a ListOffsetRequest is from a consumer. + // 1. If a consumer asks for the latest offset, return the high watermark instead of log end offset. + // 2. If a consumer searches for a timestamp and the offset found is greater than the high watermark, + // return the unknown offset as if the target timestamp is not found. + // In the other cases, we just return the search result without speical handling. val found = { - fetchOffsetForTimestamp(replicaManager.logManager, topicPartition, timestamp) match { - case Some(timestampOffset) => - // The request is not from a consumer client - if (offsetRequest.replicaId != ListOffsetRequest.CONSUMER_REPLICA_ID) - timestampOffset - // The request is from a consumer client - else { - // the found offset is smaller or equals to the high watermark - if (timestampOffset.offset <= localReplica.highWatermark.messageOffset) - timestampOffset - // the consumer wants the latest offset. - else if (timestamp == ListOffsetRequest.LATEST_TIMESTAMP) - TimestampOffset(Message.NoTimestamp, localReplica.highWatermark.messageOffset) - // The found offset is higher than the high watermark and the consumer is not asking for the end offset. 
- else + // case 1 + if (offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID + && timestamp == ListOffsetRequest.LATEST_TIMESTAMP) + TimestampOffset(Message.NoTimestamp, localReplica.highWatermark.messageOffset) + else { + fetchOffsetForTimestamp(replicaManager.logManager, topicPartition, timestamp) match { + case Some(timestampOffset) => + // case 2 + if (offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID + && timestampOffset.offset > localReplica.highWatermark.messageOffset) TimestampOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET) - } - - case None => - TimestampOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET) + else + timestampOffset + case None => + TimestampOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET) + } } } (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE.code, found.timestamp, found.offset)) diff --git a/docs/upgrade.html b/docs/upgrade.html index f5c417287137..440c2c926f50 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -60,7 +60,7 @@
Notable changes in
… and will be removed in a future major release.
    • Kafka clusters can now be uniquely identified by a cluster id. It will be automatically generated when a broker is upgraded to 0.10.1.0. The cluster id is available via the kafka.server:type=KafkaServer,name=ClusterId metric and it is part of the Metadata response. Serializers, client interceptors and metric reporters can receive the cluster id by implementing the ClusterResourceListener interface.
    • The BrokerState "RunningAsController" (value 4) has been removed. Due to a bug, a broker would only be in this state briefly before transitioning out of it and hence the impact of the removal should be minimal. The recommended way to detect if a given broker is the controller is via the kafka.controller:type=KafkaController,name=ActiveControllerCount metric.
    • The new Java Consumer now allows users to search offsets by timestamp on partitions.
    New Protocol Versions
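The timeout handling reworked across patches 06 through 10 above converges on one pattern for getOffsetsByTimes() and awaitMetadataUpdate(long): poll with the remaining budget instead of Long.MAX_VALUE, recompute the budget after every blocking call, and throw TimeoutException once it is exhausted. The standalone Java sketch below isolates that pattern only; DeadlineRetryLoop and Attempt are hypothetical names introduced for illustration, not Kafka classes.

    import java.util.concurrent.TimeoutException;

    /**
     * A minimal sketch of a deadline-bounded retry loop (illustration only).
     * Every attempt receives the time remaining in the overall budget, and the
     * budget is re-checked after each blocking call so the loop cannot
     * overshoot the caller's deadline.
     */
    public final class DeadlineRetryLoop {

        /** One attempt at the operation; returns null to request a retry. */
        public interface Attempt<T> {
            T tryOnce(long remainingMs) throws InterruptedException;
        }

        public static <T> T run(long timeoutMs, long retryBackoffMs, Attempt<T> attempt)
                throws TimeoutException, InterruptedException {
            long startMs = System.currentTimeMillis();
            long remaining = timeoutMs;
            while (true) {
                T result = attempt.tryOnce(remaining);  // blocks for at most `remaining` ms
                if (result != null)
                    return result;

                remaining = timeoutMs - (System.currentTimeMillis() - startMs);
                if (remaining <= 0)
                    throw new TimeoutException("Failed to complete in " + timeoutMs + " ms");

                // Back off before the next attempt, but never sleep past the deadline.
                Thread.sleep(Math.min(remaining, retryBackoffMs));

                remaining = timeoutMs - (System.currentTimeMillis() - startMs);
                if (remaining <= 0)
                    throw new TimeoutException("Failed to complete in " + timeoutMs + " ms");
            }
        }
    }

The boolean return of awaitMetadataUpdate(long) plays the same role as the null result here: it lets the caller distinguish "the metadata refreshed" from "the timeout expired" and convert the latter into a TimeoutException, as getOffsetsByTimes() does.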
    From 5b77ca9c07b0ab79777a77faceb6baef5abc196d Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Wed, 28 Sep 2016 13:30:35 +0100 Subject: [PATCH 14/16] Suppress deprecation warnings and other minor clean-ups --- .../org/apache/kafka/clients/consumer/OffsetAndTimestamp.java | 4 ++-- .../org/apache/kafka/common/requests/ListOffsetRequest.java | 1 + .../apache/kafka/clients/consumer/internals/FetcherTest.java | 4 ++-- .../org/apache/kafka/common/requests/RequestResponseTest.java | 4 ++++ 4 files changed, 9 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java index c1524a9d98de..32baf38c8b2c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java @@ -21,7 +21,7 @@ /** * A container class for offset and timestamp. * - * Both the offsets the timestamp in this class is non-negative. + * Both offset and timestamp are non-negative. */ public final class OffsetAndTimestamp { private final long timestamp; @@ -42,7 +42,7 @@ public long offset() { @Override public String toString() { - return "{Timestamp = " + timestamp + ", Offset = " + offset + "}"; + return "{timestamp=" + timestamp + ", offset=" + offset + "}"; } @Override diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java index 651077cd6e87..1aed5231b562 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java @@ -165,6 +165,7 @@ public ListOffsetRequest(Struct struct) { } @Override + @SuppressWarnings("deprecation") public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) { Map responseData = new HashMap(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 1dddffbeff36..4f567963eba7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -665,8 +665,8 @@ public void testGetOffsetsForTimes() { private void testGetOffsetsForTimesWithError(Errors errorForTp0, Errors errorForTp1, - Long offsetForTp0, - Long offsetForTp1, + long offsetForTp0, + long offsetForTp1, Long expectedOffsetForTp0, Long expectedOffsetForTp1) { client.reset(); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 574f52d72831..30faac1c221f 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -257,6 +257,7 @@ private AbstractRequestResponse createHeartBeatResponse() { return new HeartbeatResponse(Errors.NONE.code()); } + @SuppressWarnings("deprecation") private AbstractRequest createJoinGroupRequest(int version) { ByteBuffer metadata = ByteBuffer.wrap(new byte[] {}); List protocols = new ArrayList<>(); @@ -307,6 +308,7 @@ private AbstractRequestResponse createLeaveGroupResponse() { return new LeaveGroupResponse(Errors.NONE.code()); } + 
@SuppressWarnings("deprecation") private AbstractRequest createListOffsetRequest(int version) { if (version == 0) { Map offsetData = new HashMap<>(); @@ -321,6 +323,7 @@ private AbstractRequest createListOffsetRequest(int version) { } } + @SuppressWarnings("deprecation") private AbstractRequestResponse createListOffsetResponse(int version) { if (version == 0) { Map responseData = new HashMap<>(); @@ -353,6 +356,7 @@ private AbstractRequestResponse createMetadataResponse(int version) { return new MetadataResponse(Arrays.asList(node), null, MetadataResponse.NO_CONTROLLER_ID, allTopicMetadata, version); } + @SuppressWarnings("deprecation") private AbstractRequest createOffsetCommitRequest(int version) { Map commitData = new HashMap<>(); commitData.put(new TopicPartition("test", 0), new OffsetCommitRequest.PartitionData(100, "")); From 7feae8c7db72be86b6c3546988996422dfe4a06d Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Wed, 28 Sep 2016 13:31:29 +0100 Subject: [PATCH 15/16] Refactor `handleOffsetRequestV1` to be simpler and self-documenting --- .../main/scala/kafka/server/KafkaApis.scala | 31 +++++++------------ 1 file changed, 12 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 679fa090e65e..85c47e6f745d 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -610,7 +610,7 @@ class KafkaApis(val requestChannel: RequestChannel, ListOffsetResponse.UNKNOWN_OFFSET) }) - val responseMap = authorizedRequestInfo.map({case (topicPartition, timestamp) => + val responseMap = authorizedRequestInfo.map { case (topicPartition, timestamp) => if (offsetRequest.duplicatePartitions().contains(topicPartition)) { debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + s"failed because the partition is duplicated in the request.") @@ -619,35 +619,28 @@ class KafkaApis(val requestChannel: RequestChannel, ListOffsetResponse.UNKNOWN_OFFSET)) } else { try { + val fromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID + // ensure leader exists val localReplica = if (offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) replicaManager.getLeaderReplicaIfLocal(topicPartition.topic, topicPartition.partition) else replicaManager.getReplicaOrException(topicPartition.topic, topicPartition.partition) - // There are following two special cases to be handled when a ListOffsetRequest is from a consumer. - // 1. If a consumer asks for the latest offset, return the high watermark instead of log end offset. - // 2. If a consumer searches for a timestamp and the offset found is greater than the high watermark, - // return the unknown offset as if the target timestamp is not found. - // In the other cases, we just return the search result without speical handling. 
+ val found = { - // case 1 - if (offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID - && timestamp == ListOffsetRequest.LATEST_TIMESTAMP) + if (fromConsumer && timestamp == ListOffsetRequest.LATEST_TIMESTAMP) TimestampOffset(Message.NoTimestamp, localReplica.highWatermark.messageOffset) else { + def allowed(timestampOffset: TimestampOffset): Boolean = + !fromConsumer || timestampOffset.offset <= localReplica.highWatermark.messageOffset + fetchOffsetForTimestamp(replicaManager.logManager, topicPartition, timestamp) match { - case Some(timestampOffset) => - // case 2 - if (offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID - && timestampOffset.offset > localReplica.highWatermark.messageOffset) - TimestampOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET) - else - timestampOffset - case None => - TimestampOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET) + case Some(timestampOffset) if allowed(timestampOffset) => timestampOffset + case _ => TimestampOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET) } } } + (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE.code, found.timestamp, found.offset)) } catch { // NOTE: These exceptions are special cased since these error messages are typically transient or the client @@ -667,7 +660,7 @@ class KafkaApis(val requestChannel: RequestChannel, ListOffsetResponse.UNKNOWN_OFFSET)) } } - }) + } responseMap ++ unauthorizedResponseStatus } From 9e1a321ae8843eafb4e98e3f0b2d1175a97bddde Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Wed, 28 Sep 2016 13:32:17 +0100 Subject: [PATCH 16/16] Minor clean-up in `Fetcher` as per Jason's suggestion --- .../kafka/clients/consumer/internals/Fetcher.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index da0249c62230..fe19dbec6a67 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -368,10 +368,13 @@ public Map getOffsetsByTimes(Map> future = sendListOffsetRequests(timestampsToSearch); client.poll(future, remaining); + if (!future.isDone()) + break; + if (future.succeeded()) return future.value(); - if (future.isDone() && !future.isRetriable()) + if (!future.isRetriable()) throw future.exception(); long elapsed = time.milliseconds() - startMs; @@ -379,10 +382,9 @@ public Map getOffsetsByTimes(Map
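Taken together, the series settles the consumer-facing surface of KIP-79: offsetsForTimes(), beginningOffsets(), and endOffsets(). A usage sketch follows; the bootstrap address, group id, topic, and target timestamp are placeholders. As the patches above specify, offsetsForTimes() maps a partition to null when the broker's message format predates 0.10.0 (UNSUPPORTED_FOR_MESSAGE_FORMAT), and it rejects negative target timestamps with IllegalArgumentException.

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;

    public final class OffsetsForTimesExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
            props.put("group.id", "offset-lookup-demo");        // placeholder group
            props.put("key.deserializer",
                      "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                      "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition("test", 0);  // placeholder topic

                // Earliest offset whose timestamp is >= the target (one hour ago here).
                long targetMs = System.currentTimeMillis() - 60 * 60 * 1000L;
                Map<TopicPartition, OffsetAndTimestamp> found =
                        consumer.offsetsForTimes(Collections.singletonMap(tp, targetMs));
                OffsetAndTimestamp oat = found.get(tp);
                if (oat != null)
                    System.out.println("offset=" + oat.offset() + ", timestamp=" + oat.timestamp());
                else
                    System.out.println("no match (pre-0.10.0 message format or no such offset)");

                // Batch lookups of the earliest and latest offsets.
                System.out.println("beginning: " + consumer.beginningOffsets(Collections.singleton(tp)));
                System.out.println("end:       " + consumer.endOffsets(Collections.singleton(tp)));
            }
        }
    }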