CONFLUENT: Sync from apache/kafka (21 January 2020)
Conflicts and compilation errors arose because we temporarily
reverted the commit that removes Scala 2.11 support:

* AclCommand.scala: take upstream changes.
* AclCommandTest.scala: take upstream changes.
* TransactionCoordinatorTest.scala: don't use SAMs (single abstract
  method conversions, which Scala 2.11 does not support by default), but
  adjust the mock call to putTransactionStateIfNotExists given the new signature.
* TransactionStateManagerTest: use Runnable instead of SAMs.
* PartitionLockTest: use Runnable instead of SAMs.
* docs/upgrade.html: take upstream changes, excluding the line that
  states that Scala 2.11 support has been removed.

* apache-github/trunk: (28 commits)
  KAFKA-9457; Fix flaky test org.apache.kafka.common.network.SelectorTest.testGracefulClose (apache#7989)
  MINOR: Update AclCommand help message to match implementation (apache#7990)
  MINOR: Update introduction page in Kafka documentation
  MINOR: Use Math.min for StreamsPartitionAssignor#updateMinReceivedVersion method (apache#7954)
  KAFKA-9338; Fetch session should cache request leader epoch (apache#7970)
  KAFKA-9329; KafkaController::replicasAreValid should return error message (apache#7865)
  KAFKA-9449; Adds support for closing the producer's BufferPool. (apache#7967)
  MINOR: Handle expandIsr in PartitionLockTest and ensure read threads not blocked on write (apache#7973)
  MINOR: Fix typo in connect integration test class name (apache#7976)
  KAFKA-9218: MirrorMaker 2 can fail to create topics (apache#7745)
  KAFKA-8847; Deprecate and remove usage of supporting classes in kafka.security.auth (apache#7966)
  MINOR: Suppress DescribeConfigs Denied log during CreateTopics (apache#7971)
  [MINOR]: Fix typo in Fetcher comment (apache#7934)
  MINOR: Remove unnecessary call to `super` in `MetricConfig` constructor (apache#7975)
  MINOR: fix flaky StreamsUpgradeTestIntegrationTest (apache#7974)
  KAFKA-9431: Expose API in KafkaStreams to fetch all local offset lags (apache#7961)
  KAFKA-9235; Ensure transaction coordinator is stopped after replica deletion (apache#7963)
  KAFKA-9410; Make groupId Optional in KafkaConsumer (apache#7943)
  MINOR: Removed accidental double negation in error message. (apache#7834)
  KAFKA-6144: IQ option to query standbys (apache#7962)
  ...
ijuma committed Jan 21, 2020
2 parents b83a695 + 1e823a6 commit c28a069
Showing 165 changed files with 5,156 additions and 1,795 deletions.
1 change: 1 addition & 0 deletions build.gradle
@@ -2080,6 +2080,7 @@ project(':connect:mirror') {
     compile libs.slf4jApi
 
     testCompile libs.junit
+    testCompile libs.mockitoCore
     testCompile project(':clients').sourceSets.test.output
     testCompile project(':connect:runtime').sourceSets.test.output
     testCompile project(':core')
@@ -32,6 +32,7 @@
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.CommonFields;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
@@ -49,6 +50,7 @@
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -706,9 +708,13 @@ public Node leastLoadedNode(long now) {
     }
 
     public static AbstractResponse parseResponse(ByteBuffer responseBuffer, RequestHeader requestHeader) {
-        Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(responseBuffer, requestHeader, null, 0);
-        return AbstractResponse.parseResponse(requestHeader.apiKey(), responseStruct,
-                requestHeader.apiVersion());
+        try {
+            Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(responseBuffer, requestHeader, null, 0);
+            return AbstractResponse.parseResponse(requestHeader.apiKey(), responseStruct,
+                    requestHeader.apiVersion());
+        } catch (BufferUnderflowException e) {
+            throw new SchemaException("Buffer underflow while parsing response for request with header " + requestHeader, e);
+        }
     }
 
     private static Struct parseStructMaybeUpdateThrottleTimeMetrics(ByteBuffer responseBuffer, RequestHeader requestHeader,
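With this change a truncated response now surfaces as a SchemaException that names the offending request header, instead of a bare BufferUnderflowException. A minimal sketch of the new behavior, assuming an illustrative header and a deliberately short buffer (the client id and correlation id are made up):

```java
import java.nio.ByteBuffer;

import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.requests.RequestHeader;

public class TruncatedResponseDemo {
    public static void main(String[] args) {
        // Hypothetical header for a request we pretend was sent.
        RequestHeader header = new RequestHeader(ApiKeys.API_VERSIONS, (short) 0, "demo-client", 42);
        ByteBuffer truncated = ByteBuffer.allocate(2); // far too short for any response
        try {
            NetworkClient.parseResponse(truncated, header);
        } catch (SchemaException e) {
            // Previously this escaped as a raw BufferUnderflowException with no request context.
            System.out.println("Parse failed: " + e.getMessage());
        }
    }
}
```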
@@ -3028,7 +3028,9 @@ private Call getListConsumerGroupOffsetsCall(ConsumerGroupOperationContext<Map<T
             new ConstantNodeIdProvider(context.node().get().id())) {
         @Override
         OffsetFetchRequest.Builder createRequest(int timeoutMs) {
-            return new OffsetFetchRequest.Builder(context.groupId(), context.options().topicPartitions());
+            // Set the flag to false, as for admin client requests we don't
+            // need to wait for any pending offset state to clear.
+            return new OffsetFetchRequest.Builder(context.groupId(), false, context.options().topicPartitions());
         }
 
         @Override
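On the admin path the new boolean is hard-coded to false, so listing group offsets returns whatever is currently committed rather than waiting for pending transactional offsets to settle. A hedged usage sketch; the broker address and group name are placeholder assumptions:

```java
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ListGroupOffsetsDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        try (AdminClient admin = AdminClient.create(props)) {
            Map<TopicPartition, OffsetAndMetadata> offsets =
                    admin.listConsumerGroupOffsets("demo-group") // does not wait for unstable offsets
                         .partitionsToOffsetAndMetadata()
                         .get();
            offsets.forEach((tp, om) -> System.out.println(tp + " -> " + om));
        }
    }
}
```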
18 changes: 18 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
@@ -26,6 +26,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Map.Entry;
 
 /**
@@ -157,4 +158,21 @@ public String toString() {
         append(")");
         return bld.toString();
     }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final NewTopic that = (NewTopic) o;
+        return Objects.equals(name, that.name) &&
+               Objects.equals(numPartitions, that.numPartitions) &&
+               Objects.equals(replicationFactor, that.replicationFactor) &&
+               Objects.equals(replicasAssignments, that.replicasAssignments) &&
+               Objects.equals(configs, that.configs);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, numPartitions, replicationFactor, replicasAssignments, configs);
+    }
 }
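With value-based equals() and hashCode(), two NewTopic instances that describe the same topic now compare equal, which lets them deduplicate in hash-based collections and makes test assertions straightforward. A small sketch; the topic name and sizing are arbitrary:

```java
import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.clients.admin.NewTopic;

public class NewTopicEqualityDemo {
    public static void main(String[] args) {
        Set<NewTopic> toCreate = new HashSet<>();
        toCreate.add(new NewTopic("orders", 3, (short) 2));
        toCreate.add(new NewTopic("orders", 3, (short) 2)); // equal spec, deduplicated
        System.out.println(toCreate.size()); // prints 1
    }
}
```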
@@ -570,7 +570,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
     private Logger log;
     private final String clientId;
-    private String groupId;
+    private final Optional<String> groupId;
     private final ConsumerCoordinator coordinator;
     private final Deserializer<K> keyDeserializer;
     private final Deserializer<V> valueDeserializer;
@@ -673,28 +675,30 @@ private KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, De
         GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config,
                 GroupRebalanceConfig.ProtocolType.CONSUMER);
 
-        this.groupId = groupRebalanceConfig.groupId;
+        this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId);
         this.clientId = buildClientId(config.getString(CommonClientConfigs.CLIENT_ID_CONFIG), groupRebalanceConfig);
 
         LogContext logContext;
 
         // If group.instance.id is set, we will append it to the log context.
         if (groupRebalanceConfig.groupInstanceId.isPresent()) {
             logContext = new LogContext("[Consumer instanceId=" + groupRebalanceConfig.groupInstanceId.get() +
-                    ", clientId=" + clientId + ", groupId=" + groupId + "] ");
+                    ", clientId=" + clientId + ", groupId=" + groupId.orElse("null") + "] ");
         } else {
-            logContext = new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId + "] ");
+            logContext = new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId.orElse("null") + "] ");
         }
 
         this.log = logContext.logger(getClass());
         boolean enableAutoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
-        if (groupId == null) { // overwrite in case of default group id where the config is not explicitly provided
-            if (!config.originals().containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))
+        if (!groupId.isPresent()) { // overwrite in case of default group id where the config is not explicitly provided
+            if (!config.originals().containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
                 enableAutoCommit = false;
-            else if (enableAutoCommit)
+            } else if (enableAutoCommit) {
                 throw new InvalidConfigurationException(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " cannot be set to true when default group id (null) is used.");
-        } else if (groupId.isEmpty())
+            }
+        } else if (groupId.get().isEmpty()) {
             log.warn("Support for using the empty group id by consumers is deprecated and will be removed in the next major release.");
+        }
 
         log.debug("Initializing the Kafka consumer");
         this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
@@ -773,7 +775,7 @@ else if (enableAutoCommit)
         this.assignors = getAssignorInstances(config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG), config.originals());
 
         // no coordinator will be constructed for the default (null) group id
-        this.coordinator = groupId == null ? null :
+        this.coordinator = !groupId.isPresent() ? null :
             new ConsumerCoordinator(groupRebalanceConfig,
                     logContext,
                     this.client,
@@ -858,7 +860,7 @@ else if (enableAutoCommit)
         this.requestTimeoutMs = requestTimeoutMs;
         this.defaultApiTimeoutMs = defaultApiTimeoutMs;
         this.assignors = assignors;
-        this.groupId = groupId;
+        this.groupId = Optional.ofNullable(groupId);
         this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer");
     }
 
@@ -2436,7 +2438,7 @@ private void throwIfNoAssignorsConfigured() {
     }
 
     private void maybeThrowInvalidGroupIdException() {
-        if (groupId == null)
+        if (!groupId.isPresent())
             throw new InvalidGroupIdException("To use the group management or offset commit APIs, you must " +
                     "provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration.");
     }
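Taken together (KAFKA-9410), a consumer built without group.id still supports manual partition assignment, while group management and offset commits fail fast. A hedged usage sketch; the broker address and topic are placeholder assumptions:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.serialization.StringDeserializer;

public class GroupIdlessConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // No GROUP_ID_CONFIG: enable.auto.commit now silently defaults to false;
        // setting it to true explicitly would throw InvalidConfigurationException.

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(new TopicPartition("demo-topic", 0)));
            consumer.poll(Duration.ofMillis(100)); // manual assignment works without a group id

            try {
                consumer.commitSync(); // offset commits require a group id
            } catch (InvalidGroupIdException e) {
                System.out.println("Expected: " + e.getMessage());
            }
        }
    }
}
```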
@@ -40,7 +40,7 @@ public interface OffsetCommitCallback {
  * or if there is an active group with the same groupId which is using group management.
  * @throws org.apache.kafka.common.errors.RebalanceInProgressException if the commit failed because
  * it is in the middle of a rebalance. In such cases
- * commit could be retried after the rebalance is completed with the {@link #poll(Duration)} call.
+ * commit could be retried after the rebalance is completed with the {@link KafkaConsumer#poll(Duration)} call.
  * @throws org.apache.kafka.common.errors.WakeupException if {@link KafkaConsumer#wakeup()} is called before or while this
  * function is called
  * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
@@ -1315,7 +1315,7 @@ public void onFailure(RuntimeException e) {
     protected static class Generation {
         public static final Generation NO_GENERATION = new Generation(
                 OffsetCommitRequest.DEFAULT_GENERATION_ID,
-                JoinGroupResponse.UNKNOWN_MEMBER_ID,
+                JoinGroupRequest.UNKNOWN_MEMBER_ID,
                 null);
 
         public final int generationId;
@@ -36,6 +36,7 @@
 import org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.UnstableOffsetCommitException;
 import org.apache.kafka.common.errors.RebalanceInProgressException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -54,7 +55,6 @@
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.JoinGroupRequest;
-import org.apache.kafka.common.requests.JoinGroupResponse;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.OffsetFetchRequest;
@@ -168,7 +168,7 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig,
         this.pendingAsyncCommits = new AtomicInteger();
         this.asyncCommitFenced = new AtomicBoolean(false);
         this.groupMetadata = new ConsumerGroupMetadata(rebalanceConfig.groupId,
-                JoinGroupResponse.UNKNOWN_GENERATION_ID, JoinGroupRequest.UNKNOWN_MEMBER_ID, rebalanceConfig.groupInstanceId);
+                JoinGroupRequest.UNKNOWN_GENERATION_ID, JoinGroupRequest.UNKNOWN_MEMBER_ID, rebalanceConfig.groupInstanceId);
 
         if (autoCommitEnabled)
             this.nextAutoCommitTimer = time.timer(autoCommitIntervalMs);
@@ -810,7 +810,6 @@ public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(final Set<To
             } else {
                 future = sendOffsetFetchRequest(partitions);
                 pendingCommittedOffsetRequest = new PendingCommittedOffsetRequest(partitions, generationForOffsetRequest, future);
-
             }
             client.poll(future, timer);
 
@@ -1221,8 +1220,8 @@ private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchReq
 
         log.debug("Fetching committed offsets for partitions: {}", partitions);
         // construct the request
-        OffsetFetchRequest.Builder requestBuilder = new OffsetFetchRequest.Builder(this.rebalanceConfig.groupId,
-                new ArrayList<>(partitions));
+        OffsetFetchRequest.Builder requestBuilder =
+                new OffsetFetchRequest.Builder(this.rebalanceConfig.groupId, true, new ArrayList<>(partitions));
 
         // send the request with a callback
         return client.send(coordinator, requestBuilder)
@@ -1253,11 +1252,12 @@ public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartitio
 
             Set<String> unauthorizedTopics = null;
             Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(response.responseData().size());
+            Set<TopicPartition> unstableTxnOffsetTopicPartitions = new HashSet<>();
             for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                 TopicPartition tp = entry.getKey();
-                OffsetFetchResponse.PartitionData data = entry.getValue();
-                if (data.hasError()) {
-                    Errors error = data.error;
+                OffsetFetchResponse.PartitionData partitionData = entry.getValue();
+                if (partitionData.hasError()) {
+                    Errors error = partitionData.error;
                     log.debug("Failed to fetch offset for partition {}: {}", tp, error.message());
 
                     if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
@@ -1268,15 +1268,17 @@ public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartitio
                             unauthorizedTopics = new HashSet<>();
                         }
                         unauthorizedTopics.add(tp.topic());
+                    } else if (error == Errors.UNSTABLE_OFFSET_COMMIT) {
+                        unstableTxnOffsetTopicPartitions.add(tp);
                     } else {
                         future.raise(new KafkaException("Unexpected error in fetch offset response for partition " +
                                 tp + ": " + error.message()));
                         return;
                     }
-                } else if (data.offset >= 0) {
+                } else if (partitionData.offset >= 0) {
                     // record the position with the offset (-1 indicates no committed offset to fetch);
                     // if there's no committed offset, record as null
-                    offsets.put(tp, new OffsetAndMetadata(data.offset, data.leaderEpoch, data.metadata));
+                    offsets.put(tp, new OffsetAndMetadata(partitionData.offset, partitionData.leaderEpoch, partitionData.metadata));
                 } else {
                     log.info("Found no committed offset for partition {}", tp);
                     offsets.put(tp, null);
@@ -1285,6 +1287,14 @@ public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartitio
 
             if (unauthorizedTopics != null) {
                 future.raise(new TopicAuthorizationException(unauthorizedTopics));
+            } else if (!unstableTxnOffsetTopicPartitions.isEmpty()) {
+                // just retry
+                log.info("The following partitions still have unstable offsets " +
+                        "which are not cleared on the broker side: {}" +
+                        ", this could be either " +
+                        "transactional offsets waiting for completion, or " +
+                        "normal offsets waiting for replication after appending to local log", unstableTxnOffsetTopicPartitions);
+                future.raise(new UnstableOffsetCommitException("There are unstable offsets for the requested topic partitions"));
             } else {
                 future.complete(offsets);
             }
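Because UnstableOffsetCommitException (the new class added further down) extends RetriableException, the coordinator treats unstable offsets as a transient condition. Depending on timing, a caller of committed() may see the exception directly or a TimeoutException once the API timeout expires; a hedged retry sketch covering both, with illustrative timeout and backoff:

```java
import java.time.Duration;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnstableOffsetCommitException;

public class StableOffsetLookup {
    // Retries a committed-offset lookup while the broker reports unstable (pending) offsets.
    static Map<TopicPartition, OffsetAndMetadata> committedWhenStable(
            Consumer<?, ?> consumer, Set<TopicPartition> partitions) throws InterruptedException {
        while (true) {
            try {
                return consumer.committed(partitions, Duration.ofSeconds(5));
            } catch (UnstableOffsetCommitException | TimeoutException e) {
                // Offsets still awaiting transaction completion or replication; back off and retry.
                Thread.sleep(500);
            }
        }
    }
}
```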
@@ -548,6 +548,8 @@ private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestamp
                 remainingToSearch.keySet().retainAll(value.partitionsToRetry);
             } else if (!future.isRetriable()) {
                 throw future.exception();
+            } else {
+                metadata.requestUpdate();
             }
 
             if (metadata.updateRequested())
@@ -1356,7 +1358,7 @@ public void clearBufferedDataForUnassignedTopics(Collection<String> assignedTopi
         clearBufferedDataForUnassignedPartitions(currentTopicPartitions);
     }
 
-    // Visibilty for testing
+    // Visible for testing
     protected FetchSessionHandler sessionHandler(int node) {
         return sessionHandlers.get(node);
     }
@@ -23,6 +23,7 @@
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.Metrics;
@@ -55,6 +56,7 @@ public class BufferPool {
     private final Metrics metrics;
     private final Time time;
     private final Sensor waitTime;
+    private boolean closed;
 
     /**
      * Create a new buffer pool
@@ -82,6 +84,7 @@ public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, Str
                 metricGrpName,
                 "The total time an appender waits for space allocation.");
         this.waitTime.add(new Meter(TimeUnit.NANOSECONDS, rateMetricName, totalMetricName));
+        this.closed = false;
     }
 
     /**
@@ -104,6 +107,12 @@ public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedEx
 
         ByteBuffer buffer = null;
         this.lock.lock();
+
+        if (this.closed) {
+            this.lock.unlock();
+            throw new KafkaException("Producer closed while allocating memory");
+        }
+
         try {
             // check if we have a free buffer of the right size pooled
             if (size == poolableSize && !this.free.isEmpty())
@@ -138,6 +147,9 @@ public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedEx
                     recordWaitTime(timeNs);
                 }
 
+                if (this.closed)
+                    throw new KafkaException("Producer closed while allocating memory");
+
                 if (waitingTimeElapsed) {
                     throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                 }
@@ -316,4 +328,19 @@ public long totalMemory() {
     Deque<Condition> waiters() {
         return this.waiters;
     }
+
+    /**
+     * Closes the buffer pool. Memory will be prevented from being allocated, but may be deallocated. All allocations
+     * awaiting available memory will be notified to abort.
+     */
+    public void close() {
+        this.lock.lock();
+        this.closed = true;
+        try {
+            for (Condition waiter : this.waiters)
+                waiter.signal();
+        } finally {
+            this.lock.unlock();
+        }
+    }
 }
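The new close() flips the flag and signals every waiter, so a thread blocked in allocate() wakes up and hits the closed check instead of stalling until its timeout. A hedged standalone sketch of that interaction; BufferPool is an internal class, and the sizing and sleep here are purely illustrative:

```java
import java.nio.ByteBuffer;

import org.apache.kafka.clients.producer.internals.BufferPool;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;

public class BufferPoolCloseDemo {
    public static void main(String[] args) throws InterruptedException {
        BufferPool pool = new BufferPool(1024, 1024, new Metrics(), Time.SYSTEM, "demo");
        ByteBuffer held = pool.allocate(1024, 100); // takes the pool's entire memory

        Thread blocked = new Thread(() -> {
            try {
                pool.allocate(1024, 60_000); // blocks: no memory is free
            } catch (KafkaException e) {
                System.out.println("Aborted: " + e.getMessage()); // "Producer closed while allocating memory"
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        blocked.start();

        Thread.sleep(200); // assumption: long enough for the thread to enter the wait loop
        pool.close();      // wakes the waiter, which now throws instead of timing out
        blocked.join();
    }
}
```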
@@ -802,6 +802,7 @@ public void unmutePartition(TopicPartition tp, long throttleUntilTimeMs) {
      */
     public void close() {
         this.closed = true;
+        this.free.close();
     }
 
     /*
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Exception thrown when there are unstable offsets for the requested topic partitions.
+ */
+public class UnstableOffsetCommitException extends RetriableException {
+
+    private static final long serialVersionUID = 1L;
+
+    public UnstableOffsetCommitException(String message) {
+        super(message);
+    }
+}
@@ -33,7 +33,6 @@ public class MetricConfig {
     private Sensor.RecordingLevel recordingLevel;
 
     public MetricConfig() {
-        super();
         this.quota = null;
         this.samples = 2;
         this.eventWindow = Long.MAX_VALUE;
