Skip to content

Commit

Permalink
Merge pull request #728 from apache/trunk
Browse files Browse the repository at this point in the history
CONFLUENT: Sync from apache/kafka trunk to confluentinc/kafka master (13 Jun 2022)

apache/trunk: (7 commits)
KAFKA-13891: reset generation when syncgroup failed with REBALANCE_IN…(apache#12140)
KAFKA-10000: Exactly-once source tasks (apache#11780)
KAFKA-13436: Omitted BrokerTopicMetrics metrics in the documentation (apache#11473)
MINOR: Use Exit.addShutdownHook instead of directly adding hooks to R…(apache#12283)
KAFKA-13846: Adding overloaded metricOrElseCreate method (apache#12121)
KAFKA-13935 Fix static usages of IBP in KRaft mode (apache#12250)
HOTFIX: null check keys of ProducerRecord when computing sizeInBytes (apache#12288)


Conflicts:
None
  • Loading branch information
ableegoldman committed Jun 14, 2022
2 parents 947318c + f421c00 commit 4eef5bd
Show file tree
Hide file tree
Showing 113 changed files with 7,196 additions and 2,113 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Expand Up @@ -438,7 +438,8 @@ subprojects {
"**/ErrorHandlingTaskTest.*", "**/KafkaConfigBackingStoreTest.*", "**/KafkaOffsetBackingStoreTest.*",
"**/KafkaBasedLogTest.*", "**/OffsetStorageWriterTest.*", "**/StandaloneHerderTest.*",
"**/SourceTaskOffsetCommitterTest.*", "**/WorkerConfigTransformerTest.*", "**/WorkerGroupMemberTest.*",
"**/WorkerSinkTaskTest.*", "**/WorkerSinkTaskThreadedTest.*", "**/WorkerSourceTaskTest.*",
"**/WorkerTest.*", "**/WorkerSinkTaskTest.*", "**/WorkerSinkTaskThreadedTest.*",
"**/WorkerSourceTaskTest.*", "**/AbstractWorkerSourceTaskTest.*", "**/ExactlyOnceWorkerSourceTaskTest.*",
"**/WorkerTaskTest.*",
// streams tests
"**/KafkaStreamsTest.*"
Expand Down
Expand Up @@ -2716,7 +2716,11 @@ private static Map<String, LogDirDescription> logDirDescriptions(DescribeLogDirs
new ReplicaInfo(p.partitionSize(), p.offsetLag(), p.isFutureKey()));
}
}
result.put(logDirResult.logDir(), new LogDirDescription(Errors.forCode(logDirResult.errorCode()).exception(), replicaInfoMap));
result.put(logDirResult.logDir(), new LogDirDescription(
Errors.forCode(logDirResult.errorCode()).exception(),
replicaInfoMap,
logDirResult.totalBytes(),
logDirResult.usableBytes()));
}
return result;
}
Expand Down
Expand Up @@ -20,19 +20,29 @@
import org.apache.kafka.common.errors.ApiException;

import java.util.Map;
import java.util.OptionalLong;

import static java.util.Collections.unmodifiableMap;
import static org.apache.kafka.common.requests.DescribeLogDirsResponse.UNKNOWN_VOLUME_BYTES;

/**
* A description of a log directory on a particular broker.
*/
public class LogDirDescription {
private final Map<TopicPartition, ReplicaInfo> replicaInfos;
private final ApiException error;
private final OptionalLong totalBytes;
private final OptionalLong usableBytes;

public LogDirDescription(ApiException error, Map<TopicPartition, ReplicaInfo> replicaInfos) {
this(error, replicaInfos, UNKNOWN_VOLUME_BYTES, UNKNOWN_VOLUME_BYTES);
}

public LogDirDescription(ApiException error, Map<TopicPartition, ReplicaInfo> replicaInfos, long totalBytes, long usableBytes) {
this.error = error;
this.replicaInfos = replicaInfos;
this.totalBytes = (totalBytes == UNKNOWN_VOLUME_BYTES) ? OptionalLong.empty() : OptionalLong.of(totalBytes);
this.usableBytes = (usableBytes == UNKNOWN_VOLUME_BYTES) ? OptionalLong.empty() : OptionalLong.of(usableBytes);
}

/**
Expand All @@ -54,11 +64,29 @@ public Map<TopicPartition, ReplicaInfo> replicaInfos() {
return unmodifiableMap(replicaInfos);
}

/**
* The total size of the volume this log directory is on or empty if the broker did not return a value.
* For volumes larger than Long.MAX_VALUE, Long.MAX_VALUE is returned.
*/
public OptionalLong totalBytes() {
return totalBytes;
}

/**
* The usable size on the volume this log directory is on or empty if the broker did not return a value.
* For usable sizes larger than Long.MAX_VALUE, Long.MAX_VALUE is returned.
*/
public OptionalLong usableBytes() {
return usableBytes;
}

@Override
public String toString() {
return "LogDirDescription(" +
"replicaInfos=" + replicaInfos +
", error=" + error +
", totalBytes=" + totalBytes +
", usableBytes=" + usableBytes +
')';
}
}
Expand Up @@ -807,6 +807,9 @@ public void handle(SyncGroupResponse syncResponse,
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. " +
"Sent generation was {}", sentGeneration);
// consumer didn't get assignment in this generation, so we need to reset generation
// to avoid joinGroup with out-of-data ownedPartitions in cooperative rebalance
resetStateOnResponseError(ApiKeys.SYNC_GROUP, error, false);
future.raise(error);
} else if (error == Errors.FENCED_INSTANCE_ID) {
// for sync-group request, even if the generation has changed we would not expect the instance id
Expand Down
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

public class IneligibleReplicaException extends ApiException {
public IneligibleReplicaException(String message) {
super(message);
}
}
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

public class NewLeaderElectedException extends ApiException {
public NewLeaderElectedException(String message) {
super(message);
}
}
40 changes: 36 additions & 4 deletions clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
Expand Up @@ -509,7 +509,10 @@ public void addMetric(MetricName metricName, MetricConfig config, MetricValuePro
Objects.requireNonNull(metricValueProvider),
config == null ? this.config : config,
time);
registerMetric(m);
KafkaMetric existingMetric = registerMetric(m);
if (existingMetric != null) {
throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one.");
}
}

/**
Expand All @@ -524,6 +527,26 @@ public void addMetric(MetricName metricName, MetricValueProvider<?> metricValueP
addMetric(metricName, null, metricValueProvider);
}

/**
* Create or get an existing metric to monitor an object that implements MetricValueProvider.
* This metric won't be associated with any sensor. This is a way to expose existing values as metrics.
* This method takes care of synchronisation while updating/accessing metrics by concurrent threads.
*
* @param metricName The name of the metric
* @param metricValueProvider The metric value provider associated with this metric
* @return Existing KafkaMetric if already registered or else a newly created one
*/
public KafkaMetric addMetricIfAbsent(MetricName metricName, MetricConfig config, MetricValueProvider<?> metricValueProvider) {
KafkaMetric metric = new KafkaMetric(new Object(),
Objects.requireNonNull(metricName),
Objects.requireNonNull(metricValueProvider),
config == null ? this.config : config,
time);

KafkaMetric existingMetric = registerMetric(metric);
return existingMetric == null ? metric : existingMetric;
}

/**
* Remove a metric if it exists and return it. Return null otherwise. If a metric is removed, `metricRemoval`
* will be invoked for each reporter.
Expand Down Expand Up @@ -563,10 +586,18 @@ public synchronized void removeReporter(MetricsReporter reporter) {
}
}

synchronized void registerMetric(KafkaMetric metric) {
/**
* Register a metric if not present or return an already existing metric otherwise.
* When a metric is newly registered, this method returns null
*
* @param metric The KafkaMetric to register
* @return KafkaMetric if the metric already exists, null otherwise
*/
synchronized KafkaMetric registerMetric(KafkaMetric metric) {
MetricName metricName = metric.metricName();
if (this.metrics.containsKey(metricName))
throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one.");
if (this.metrics.containsKey(metricName)) {
return this.metrics.get(metricName);
}
this.metrics.put(metricName, metric);
for (MetricsReporter reporter : reporters) {
try {
Expand All @@ -576,6 +607,7 @@ synchronized void registerMetric(KafkaMetric metric) {
}
}
log.trace("Registered metric named {}", metricName);
return null;
}

/**
Expand Down
Expand Up @@ -297,7 +297,10 @@ public synchronized boolean add(CompoundStat stat, MetricConfig config) {
for (NamedMeasurable m : stat.stats()) {
final KafkaMetric metric = new KafkaMetric(lock, m.name(), m.stat(), statConfig, time);
if (!metrics.containsKey(metric.metricName())) {
registry.registerMetric(metric);
KafkaMetric existingMetric = registry.registerMetric(metric);
if (existingMetric != null) {
throw new IllegalArgumentException("A metric named '" + metric.metricName() + "' already exists, can't register another one.");
}
metrics.put(metric.metricName(), metric);
}
}
Expand Down Expand Up @@ -336,7 +339,10 @@ public synchronized boolean add(final MetricName metricName, final MeasurableSta
statConfig,
time
);
registry.registerMetric(metric);
KafkaMetric existingMetric = registry.registerMetric(metric);
if (existingMetric != null) {
throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one.");
}
metrics.put(metric.metricName(), metric);
stats.add(new StatAndConfig(Objects.requireNonNull(stat), metric::config));
return true;
Expand Down
Expand Up @@ -52,6 +52,7 @@
import org.apache.kafka.common.errors.InconsistentTopicIdException;
import org.apache.kafka.common.errors.InconsistentVoterSetException;
import org.apache.kafka.common.errors.InconsistentClusterIdException;
import org.apache.kafka.common.errors.IneligibleReplicaException;
import org.apache.kafka.common.errors.InvalidCommitOffsetSizeException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidFetchSessionEpochException;
Expand All @@ -77,6 +78,7 @@
import org.apache.kafka.common.errors.LogDirNotFoundException;
import org.apache.kafka.common.errors.MemberIdRequiredException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.NewLeaderElectedException;
import org.apache.kafka.common.errors.NoReassignmentInProgressException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.errors.NotCoordinatorException;
Expand Down Expand Up @@ -364,7 +366,9 @@ public enum Errors {
INCONSISTENT_TOPIC_ID(103, "The log's topic ID did not match the topic ID in the request", InconsistentTopicIdException::new),
INCONSISTENT_CLUSTER_ID(104, "The clusterId in the request does not match that found on the server", InconsistentClusterIdException::new),
TRANSACTIONAL_ID_NOT_FOUND(105, "The transactionalId could not be found", TransactionalIdNotFoundException::new),
FETCH_SESSION_TOPIC_ID_ERROR(106, "The fetch session encountered inconsistent topic ID usage", FetchSessionTopicIdException::new);
FETCH_SESSION_TOPIC_ID_ERROR(106, "The fetch session encountered inconsistent topic ID usage", FetchSessionTopicIdException::new),
INELIGIBLE_REPLICA(107, "The new ISR contains at least one ineligible replica.", IneligibleReplicaException::new),
NEW_LEADER_ELECTED(108, "The AlterPartition request successfully updated the partition state but the leader has changed.", NewLeaderElectedException::new);

private static final Logger log = LoggerFactory.getLogger(Errors.class);

Expand Down
Expand Up @@ -45,8 +45,8 @@ public AlterPartitionRequestData data() {
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
return new AlterPartitionResponse(new AlterPartitionResponseData()
.setThrottleTimeMs(throttleTimeMs)
.setErrorCode(Errors.forException(e).code()));
.setThrottleTimeMs(throttleTimeMs)
.setErrorCode(Errors.forException(e).code()));
}

public static AlterPartitionRequest parse(ByteBuffer buffer, short version) {
Expand All @@ -57,8 +57,21 @@ public static class Builder extends AbstractRequest.Builder<AlterPartitionReques

private final AlterPartitionRequestData data;

public Builder(AlterPartitionRequestData data) {
super(ApiKeys.ALTER_PARTITION);
/**
* Constructs a builder for AlterPartitionRequest.
*
* @param data The data to be sent. Note that because the version of the
* request is not known at this time, it is expected that all
* topics have a topic id and a topic name set.
* @param canUseTopicIds True if version 2 and above can be used.
*/
public Builder(AlterPartitionRequestData data, boolean canUseTopicIds) {
super(
ApiKeys.ALTER_PARTITION,
ApiKeys.ALTER_PARTITION.oldestVersion(),
// Version 1 is the maximum version that can be used without topic ids.
canUseTopicIds ? ApiKeys.ALTER_PARTITION.latestVersion() : 1
);
this.data = data;
}

Expand Down
Expand Up @@ -31,6 +31,7 @@
public class DescribeLogDirsResponse extends AbstractResponse {

public static final long INVALID_OFFSET_LAG = -1L;
public static final long UNKNOWN_VOLUME_BYTES = -1L;

private final DescribeLogDirsResponseData data;

Expand Down
Expand Up @@ -18,16 +18,21 @@
"type": "request",
"listeners": ["zkBroker", "controller"],
"name": "AlterPartitionRequest",
"validVersions": "0-1",
// Version 1 adds LeaderRecoveryState field (KIP-704).
//
// Version 2 adds TopicId field to replace TopicName field (KIP-841).
"validVersions": "0-2",
"flexibleVersions": "0+",
"fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
"about": "The ID of the requesting broker" },
{ "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
"about": "The epoch of the requesting broker" },
{ "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
{ "name": "TopicName", "type": "string", "versions": "0-1", "ignorable": true, "entityType": "topicName",
"about": "The name of the topic to alter ISRs for" },
{ "name": "TopicId", "type": "uuid", "versions": "2+", "ignorable": true,
"about": "The ID of the topic to alter ISRs for" },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index" },
Expand Down
Expand Up @@ -17,16 +17,22 @@
"apiKey": 56,
"type": "response",
"name": "AlterPartitionResponse",
"validVersions": "0-1",
// Version 1 adds LeaderRecoveryState field (KIP-704).
//
// Version 2 adds TopicId field to replace TopicName field, can return the following new errors:
// INELIGIBLE_REPLICA, NEW_LEADER_ELECTED and UNKNOWN_TOPIC_ID (KIP-841).
"validVersions": "0-2",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The top level response error code" },
{ "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
{ "name": "TopicName", "type": "string", "versions": "0-1", "ignorable": true, "entityType": "topicName",
"about": "The name of the topic" },
{ "name": "TopicId", "type": "uuid", "versions": "2+", "ignorable": true,
"about": "The ID of the topic" },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index" },
Expand Down
Expand Up @@ -19,9 +19,10 @@
"listeners": ["zkBroker", "broker"],
"name": "DescribeLogDirsRequest",
// Version 1 is the same as version 0.
"validVersions": "0-3",
"validVersions": "0-4",
// Version 2 is the first flexible version.
// Version 3 is the same as version 2 (new field in response).
// Version 4 is the same as version 2 (new fields in response).
"flexibleVersions": "2+",
"fields": [
{ "name": "Topics", "type": "[]DescribableLogDirTopic", "versions": "0+", "nullableVersions": "0+",
Expand Down
Expand Up @@ -18,9 +18,10 @@
"type": "response",
"name": "DescribeLogDirsResponse",
// Starting in version 1, on quota violation, brokers send out responses before throttling.
"validVersions": "0-3",
"validVersions": "0-4",
// Version 2 is the first flexible version.
// Version 3 adds the top-level ErrorCode field
// Version 4 adds the TotalBytes and UsableBytes fields
"flexibleVersions": "2+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
Expand All @@ -47,7 +48,13 @@
{ "name": "IsFutureKey", "type": "bool", "versions": "0+",
"about": "True if this log is created by AlterReplicaLogDirsRequest and will replace the current log of the replica in the future." }
]}
]}
]},
{ "name": "TotalBytes", "type": "int64", "versions": "4+", "ignorable": true, "default": "-1",
"about": "The total size in bytes of the volume the log directory is in."
},
{ "name": "UsableBytes", "type": "int64", "versions": "4+", "ignorable": true, "default": "-1",
"about": "The usable size in bytes of the volume the log directory is in."
}
]}
]
}

0 comments on commit 4eef5bd

Please sign in to comment.