Squashed commit of the following (during trunk merge):
commit 93238ae
Author: Antoine Pourchet <antoine@responsive.dev>
Date:   Thu May 23 13:45:29 2024 -0600

    KAFKA-15045: (KIP-924 pt. 10) Topic partition rack annotation simplified  (apache#16034)

    This PR uses the new TaskTopicPartition structure to simplify the build
    process for the ApplicationState, which is the input to the new
    TaskAssignor#assign call.

    Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
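
    For illustration only, a rough sketch of what a custom assignor might do with the
    rack information carried on each topic partition. The accessor names below
    (topicPartition(), rackIds()) are assumptions based on KIP-924, not copied from
    this PR, and the RackAnnotatedPartition interface is a local stand-in for
    TaskTopicPartition:

        import org.apache.kafka.common.TopicPartition;

        import java.util.HashMap;
        import java.util.Map;
        import java.util.Optional;
        import java.util.Set;

        public class RackAnnotationSketch {
            // Stand-in for the KIP-924 TaskTopicPartition structure mentioned above.
            interface RackAnnotatedPartition {
                TopicPartition topicPartition();
                Optional<Set<String>> rackIds(); // racks hosting replicas, if known
            }

            // Group the known rack ids by topic partition, e.g. as input to a
            // rack-aware assignment decision.
            static Map<TopicPartition, Set<String>> racksByPartition(Iterable<RackAnnotatedPartition> partitions) {
                Map<TopicPartition, Set<String>> result = new HashMap<>();
                for (RackAnnotatedPartition partition : partitions) {
                    partition.rackIds().ifPresent(racks -> result.put(partition.topicPartition(), racks));
                }
                return result;
            }
        }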

commit 4020307
Author: Kuan-Po (Cooper) Tseng <brandboat@gmail.com>
Date:   Fri May 24 02:51:26 2024 +0800

    KAFKA-16795 Fix broken compatibility in kafka.tools.NoOpMessageFormatter, kafka.tools.DefaultMessageFormatter, and kafka.tools.LoggingMessageFormatter (apache#16020)

    This commit allows users to keep using the Scala versions of these Formatters, but they will receive warning messages about the deprecation.

    This compatibility support will be removed in 4.0.0.

    Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

commit c3018ef
Author: TingIāu "Ting" Kì <51072200+frankvicky@users.noreply.github.com>
Date:   Fri May 24 01:15:56 2024 +0800

    KAFKA-16804: Replace archivesBaseName with archivesName (apache#16016)

    Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Greg Harris <greg.harris@aiven.io>

commit 0ba15ad
Author: Edoardo Comar <ecomar@uk.ibm.com>
Date:   Thu May 23 17:17:56 2024 +0100

    KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… (apache#15910)

    * KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently interrupt offset translation

    MirrorCheckpointTask now reloads the last checkpoints at start, and
    OffsetSyncStore stores OffsetSyncs before reading to the log end.

    If the checkpoint task cannot read the checkpoints at startup, it falls back
    to the previous OffsetSyncStore load logic and logs a warning about
    degraded offset translation.

    Also addresses KAFKA-16622 (MirrorMaker2's first Checkpoint not emitted until
    the consumer group fully catches up once), because the OffsetSyncStore is now
    populated before reading to the log end.

    Co-Authored-By: Adrian Preston <prestona@uk.ibm.com>
    Reviewers: Greg Harris <greg.harris@aiven.io>
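
    A minimal sketch of the fallback described above; the method and parameter
    names are taken from the MirrorCheckpointTask and CheckpointStore diffs further
    down, while the wrapper method itself is hypothetical:

        // Runs inside the task's asynchronous startup action.
        void startStores(CheckpointStore checkpointStore, OffsetSyncStore offsetSyncStore) {
            // Read the checkpoints topic once; returns false if the load failed.
            boolean checkpointsReadOk = checkpointStore.start();
            // On failure, fall back to the previous OffsetSyncStore load behavior,
            // which degrades translation to offsets mirrored after the task started.
            offsetSyncStore.start(!checkpointsReadOk);
        }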

commit 5a48984
Author: Viktor Somogyi-Vass <viktorsomogyi@gmail.com>
Date:   Thu May 23 17:36:39 2024 +0200

    KAFKA-15649: Handle directory failure timeout (apache#15697)

    A broker that is unable to communicate a log directory failure to the controller
    will shut down after the configurable log.dir.failure.timeout.ms.

    The implementation adds a new event to the Kafka EventQueue. This event
    is deferred by the configured timeout and will execute the shutdown
    if the heartbeat communication containing the failed log dir is still
    pending with the controller.

    Reviewers: Igor Soarez <soarez@apple.com>
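
    As a hedged illustration, the new timeout is just a broker configuration; the
    property name comes from the message above, while the 30-second value and the
    surrounding snippet are placeholders:

        import java.util.Properties;

        Properties brokerProps = new Properties();
        // Shut the broker down if a heartbeat carrying a failed log dir is still
        // pending with the controller after 30 seconds (placeholder value).
        brokerProps.setProperty("log.dir.failure.timeout.ms", "30000");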

commit 8d117a1
Author: Mickael Maison <mimaison@users.noreply.github.com>
Date:   Thu May 23 17:03:24 2024 +0200

    KAFKA-16825: Update netty/jetty/jackson/zstd dependencies (apache#16038)

    Reviewers: Luke Chen <showuon@gmail.com>

commit ab0cc72
Author: Mickael Maison <mimaison@users.noreply.github.com>
Date:   Thu May 23 16:01:45 2024 +0200

    MINOR: Move parseCsvList to server-common (apache#16029)

    Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

commit 14b5c4d
Author: Dongnuo Lyu <139248811+dongnuo123@users.noreply.github.com>
Date:   Thu May 23 02:27:00 2024 -0400

    KAFKA-16793; Heartbeat API for upgrading ConsumerGroup (apache#15988)

    This patch implements the heartbeat API for members that use the classic protocol in a ConsumerGroup.

    Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>

commit e692fee
Author: Jeff Kim <kimkb2011@gmail.com>
Date:   Thu May 23 02:24:23 2024 -0400

    MINOR: fix flaky testRecordThreadIdleRatio (apache#15987)

    DelayEventAccumulator should return immediately if there are no events in the queue. Also removed some unused fields inside EventProcessorThread.

    Reviewers: Gaurav Narula <gaurav_narula2@apple.com>, Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <djacot@confluent.io>

commit bef83ce
Author: Nick Telford <nick.telford@gmail.com>
Date:   Thu May 23 05:34:31 2024 +0100

    KAFKA-15541: Add iterator-duration metrics (apache#16028)

    Part of [KIP-989](https://cwiki.apache.org/confluence/x/9KCzDw).

    This new `StateStore` metric tracks the average and maximum amount of
    time between creating and closing Iterators.

    Iterators with very high durations can indicate performance problems
    that users should address.

    If a store reports no data for these metrics, despite the user opening
    Iterators on the store, it suggests those iterators are not being
    closed, and have therefore leaked.

    Reviewers: Matthias J. Sax <matthias@confluent.io>
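
    A small sketch of how an application might surface these metrics; it relies only
    on KafkaStreams#metrics(), and matches metrics by name ("iterator-duration-avg",
    "iterator-duration-max" per KIP-989), which is an assumption about the exact
    metric names:

        import org.apache.kafka.common.Metric;
        import org.apache.kafka.common.MetricName;
        import org.apache.kafka.streams.KafkaStreams;

        import java.util.Map;

        public class IteratorDurationMetricsSketch {
            // Print every iterator-duration metric currently reported by the Streams instance.
            static void printIteratorDurations(KafkaStreams streams) {
                for (Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
                    if (entry.getKey().name().contains("iterator-duration")) {
                        System.out.println(entry.getKey() + " = " + entry.getValue().metricValue());
                    }
                }
            }
        }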
apourchet committed May 23, 2024
1 parent ff9d526 commit 03db088
Showing 58 changed files with 2,090 additions and 404 deletions.
215 changes: 162 additions & 53 deletions build.gradle

Large diffs are not rendered by default.

@@ -27,6 +27,7 @@
import java.util.Map;
import java.util.HashMap;
import java.nio.ByteBuffer;
import java.util.Objects;

/** Checkpoint records emitted from MirrorCheckpointConnector. Encodes remote consumer group state. */
public class Checkpoint {
@@ -180,5 +181,18 @@ byte[] recordKey() {
byte[] recordValue() {
return serializeValue(VERSION).array();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Checkpoint that = (Checkpoint) o;
return upstreamOffset == that.upstreamOffset && downstreamOffset == that.downstreamOffset && Objects.equals(consumerGroupId, that.consumerGroupId) && Objects.equals(topicPartition, that.topicPartition) && Objects.equals(metadata, that.metadata);
}

@Override
public int hashCode() {
return Objects.hash(consumerGroupId, topicPartition, upstreamOffset, downstreamOffset, metadata);
}
}

@@ -0,0 +1,203 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.mirror;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import static org.apache.kafka.connect.mirror.MirrorCheckpointConfig.CHECKPOINTS_TARGET_CONSUMER_ROLE;

/**
* Reads once the Kafka log for checkpoints and populates a map of
* checkpoints per consumer group.
*
* The Kafka log is closed after the initial load and only the in memory map is
* used after start.
*/
public class CheckpointStore implements AutoCloseable {

private static final Logger log = LoggerFactory.getLogger(CheckpointStore.class);

private final MirrorCheckpointTaskConfig config;
private final Set<String> consumerGroups;

private TopicAdmin cpAdmin = null;
private KafkaBasedLog<byte[], byte[]> backingStore = null;
// accessible for testing
Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup;

private volatile boolean loadSuccess = false;
private volatile boolean isInitialized = false;

public CheckpointStore(MirrorCheckpointTaskConfig config, Set<String> consumerGroups) {
this.config = config;
this.consumerGroups = new HashSet<>(consumerGroups);
}

// constructor for testing only
CheckpointStore(Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup) {
this.config = null; //ignored by tests
this.consumerGroups = null; //ignored by tests
this.checkpointsPerConsumerGroup = checkpointsPerConsumerGroup;
isInitialized = true;
loadSuccess = true;
}

// potentially long running
public boolean start() {
checkpointsPerConsumerGroup = readCheckpoints();
isInitialized = true;
if (log.isTraceEnabled()) {
log.trace("CheckpointStore started, load success={}, map={}", loadSuccess, checkpointsPerConsumerGroup);
} else {
log.debug("CheckpointStore started, load success={}, map.size={}", loadSuccess, checkpointsPerConsumerGroup.size());
}
return loadSuccess;
}

public boolean isInitialized() {
return isInitialized;
}

public void update(String group, Map<TopicPartition, Checkpoint> newCheckpoints) {
Map<TopicPartition, Checkpoint> oldCheckpoints = checkpointsPerConsumerGroup.computeIfAbsent(group, ignored -> new HashMap<>());
oldCheckpoints.putAll(newCheckpoints);
}

public Map<TopicPartition, Checkpoint> get(String group) {
Map<TopicPartition, Checkpoint> result = checkpointsPerConsumerGroup.get(group);
return result == null ? null : Collections.unmodifiableMap(result);
}

public Map<String, Map<TopicPartition, OffsetAndMetadata>> computeConvertedUpstreamOffset() {
Map<String, Map<TopicPartition, OffsetAndMetadata>> result = new HashMap<>();

for (Map.Entry<String, Map<TopicPartition, Checkpoint>> entry : checkpointsPerConsumerGroup.entrySet()) {
String consumerId = entry.getKey();
Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = new HashMap<>();
for (Checkpoint checkpoint : entry.getValue().values()) {
convertedUpstreamOffset.put(checkpoint.topicPartition(), checkpoint.offsetAndMetadata());
}
result.put(consumerId, convertedUpstreamOffset);
}
return result;
}

@Override
public void close() {
releaseResources();
}

private void releaseResources() {
Utils.closeQuietly(backingStore != null ? backingStore::stop : null, "backing store for previous Checkpoints");
Utils.closeQuietly(cpAdmin, "admin client for previous Checkpoints");
cpAdmin = null;
backingStore = null;
}

// read the checkpoints topic to initialize the checkpointsPerConsumerGroup state
// the callback may only handle errors thrown by consumer.poll in KafkaBasedLog
// e.g. unauthorized to read from topic (non-retriable)
// if any are encountered, treat the loading of Checkpoints as failed.
private Map<String, Map<TopicPartition, Checkpoint>> readCheckpoints() {
Map<String, Map<TopicPartition, Checkpoint>> checkpoints = new HashMap<>();
Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = (error, cpRecord) -> {
if (error != null) {
// if there is no authorization to READ from the topic, we must throw an error
// to stop the KafkaBasedLog forever looping attempting to read to end
checkpoints.clear();
if (error instanceof RuntimeException) {
throw (RuntimeException) error;
} else {
throw new RuntimeException(error);
}
} else {
try {
Checkpoint cp = Checkpoint.deserializeRecord(cpRecord);
if (consumerGroups.contains(cp.consumerGroupId())) {
Map<TopicPartition, Checkpoint> cps = checkpoints.computeIfAbsent(cp.consumerGroupId(), ignored1 -> new HashMap<>());
cps.put(cp.topicPartition(), cp);
}
} catch (SchemaException ex) {
log.warn("Ignored invalid checkpoint record at offset {}", cpRecord.offset(), ex);
}
}
};

try {
long startTime = System.currentTimeMillis();
readCheckpointsImpl(config, consumedCallback);
log.debug("starting+stopping KafkaBasedLog took {}ms", System.currentTimeMillis() - startTime);
loadSuccess = true;
} catch (Exception error) {
loadSuccess = false;
if (error instanceof AuthorizationException) {
log.warn("Not authorized to access checkpoints topic {} - " +
"this may degrade offset translation as only checkpoints " +
"for offsets which were mirrored after the task started will be emitted",
config.checkpointsTopic(), error);
} else {
log.info("Exception encountered loading checkpoints topic {} - " +
"this may degrade offset translation as only checkpoints " +
"for offsets which were mirrored after the task started will be emitted",
config.checkpointsTopic(), error);
}
}
return checkpoints;
}

// accessible for testing
void readCheckpointsImpl(MirrorCheckpointTaskConfig config, Callback<ConsumerRecord<byte[], byte[]>> consumedCallback) {
try {
cpAdmin = new TopicAdmin(
config.targetAdminConfig("checkpoint-target-admin"),
config.forwardingAdmin(config.targetAdminConfig("checkpoint-target-admin")));

backingStore = KafkaBasedLog.withExistingClients(
config.checkpointsTopic(),
MirrorUtils.newConsumer(config.targetConsumerConfig(CHECKPOINTS_TARGET_CONSUMER_ROLE)),
null,
cpAdmin,
consumedCallback,
Time.SYSTEM,
ignored -> {
},
topicPartition -> topicPartition.partition() == 0);

backingStore.start(true);
backingStore.stop();
} finally {
releaseResources();
}
}
}
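
A test-style usage sketch of the new CheckpointStore above, seeded via the
package-private constructor so that no Kafka cluster or MirrorCheckpointTaskConfig
is needed; the Checkpoint constructor arguments (group, topic partition, upstream
offset, downstream offset, metadata) are assumed from the existing Checkpoint API:

package org.apache.kafka.connect.mirror;

import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class CheckpointStoreSketch {
    public static void main(String[] args) {
        // Seed the store the way the unit tests do, bypassing the checkpoints topic.
        Map<String, Map<TopicPartition, Checkpoint>> seed = new HashMap<>();
        try (CheckpointStore store = new CheckpointStore(seed)) {
            TopicPartition tp = new TopicPartition("topic1", 0);
            // Constructor args assumed: group, partition, upstream offset, downstream offset, metadata.
            Checkpoint cp = new Checkpoint("group1", tp, 100L, 90L, "");
            store.update("group1", Collections.singletonMap(tp, cp));

            System.out.println(store.isInitialized());                  // true: the test constructor marks the store initialized
            System.out.println(store.get("group1"));                    // unmodifiable view of the group's checkpoints
            System.out.println(store.computeConvertedUpstreamOffset()); // group -> (topic partition -> OffsetAndMetadata)
        }
    }
}
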
@@ -75,6 +75,7 @@ public class MirrorCheckpointConfig extends MirrorConnectorConfig {
public static final Class<?> GROUP_FILTER_CLASS_DEFAULT = DefaultGroupFilter.class;
public static final String OFFSET_SYNCS_SOURCE_CONSUMER_ROLE = "offset-syncs-source-consumer";
public static final String OFFSET_SYNCS_TARGET_CONSUMER_ROLE = "offset-syncs-target-consumer";
public static final String CHECKPOINTS_TARGET_CONSUMER_ROLE = "checkpoints-target-consumer";
public static final String OFFSET_SYNCS_SOURCE_ADMIN_ROLE = "offset-syncs-source-admin";
public static final String OFFSET_SYNCS_TARGET_ADMIN_ROLE = "offset-syncs-target-admin";

@@ -69,21 +69,25 @@ public class MirrorCheckpointTask extends SourceTask {
private MirrorCheckpointMetrics metrics;
private Scheduler scheduler;
private Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset;
private Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup;
private CheckpointStore checkpointStore;

public MirrorCheckpointTask() {}

// for testing
MirrorCheckpointTask(String sourceClusterAlias, String targetClusterAlias,
ReplicationPolicy replicationPolicy, OffsetSyncStore offsetSyncStore,
ReplicationPolicy replicationPolicy, OffsetSyncStore offsetSyncStore, Set<String> consumerGroups,
Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset,
Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup) {
CheckpointStore checkpointStore) {
this.sourceClusterAlias = sourceClusterAlias;
this.targetClusterAlias = targetClusterAlias;
this.replicationPolicy = replicationPolicy;
this.offsetSyncStore = offsetSyncStore;
this.consumerGroups = consumerGroups;
this.idleConsumerGroupsOffset = idleConsumerGroupsOffset;
this.checkpointsPerConsumerGroup = checkpointsPerConsumerGroup;
this.checkpointStore = checkpointStore;
this.topicFilter = topic -> true;
this.interval = Duration.ofNanos(1);
this.pollTimeout = Duration.ofNanos(1);
}

@Override
@@ -103,15 +107,18 @@ public void start(Map<String, String> props) {
targetAdminClient = config.forwardingAdmin(config.targetAdminConfig("checkpoint-target-admin"));
metrics = config.metrics();
idleConsumerGroupsOffset = new HashMap<>();
checkpointsPerConsumerGroup = new HashMap<>();
checkpointStore = new CheckpointStore(config, consumerGroups);
scheduler = new Scheduler(getClass(), config.entityLabel(), config.adminTimeout());
scheduler.execute(() -> {
offsetSyncStore.start();
// loading the stores are potentially long running operations, so they run asynchronously
// to avoid blocking task::start (until a task has completed starting it cannot be stopped)
boolean checkpointsReadOk = checkpointStore.start();
offsetSyncStore.start(!checkpointsReadOk);
scheduler.scheduleRepeating(this::refreshIdleConsumerGroupOffset, config.syncGroupOffsetsInterval(),
"refreshing idle consumers group offsets at target cluster");
scheduler.scheduleRepeatingDelayed(this::syncGroupOffset, config.syncGroupOffsetsInterval(),
"sync idle consumer group offset from source to target");
}, "starting offset sync store");
}, "starting checkpoint and offset sync stores");
log.info("{} checkpointing {} consumer groups {}->{}: {}.", Thread.currentThread().getName(),
consumerGroups.size(), sourceClusterAlias, config.targetClusterAlias(), consumerGroups);
}
@@ -126,6 +133,7 @@ public void stop() {
long start = System.currentTimeMillis();
stopping = true;
Utils.closeQuietly(topicFilter, "topic filter");
Utils.closeQuietly(checkpointStore, "checkpoints store");
Utils.closeQuietly(offsetSyncStore, "offset sync store");
Utils.closeQuietly(sourceAdminClient, "source admin client");
Utils.closeQuietly(targetAdminClient, "target admin client");
@@ -146,8 +154,8 @@ public List<SourceRecord> poll() throws InterruptedException {
while (!stopping && System.currentTimeMillis() < deadline) {
Thread.sleep(pollTimeout.toMillis());
}
if (stopping) {
// we are stopping, return early.
if (stopping || !checkpointStore.isInitialized()) {
// we are stopping, or not fully initialized, return early.
return null;
}
List<SourceRecord> records = new ArrayList<>();
@@ -166,14 +174,13 @@ public List<SourceRecord> poll() throws InterruptedException {
}
}


private List<SourceRecord> sourceRecordsForGroup(String group) throws InterruptedException {
// visible for testing
List<SourceRecord> sourceRecordsForGroup(String group) throws InterruptedException {
try {
long timestamp = System.currentTimeMillis();
Map<TopicPartition, OffsetAndMetadata> upstreamGroupOffsets = listConsumerGroupOffsets(group);
Map<TopicPartition, Checkpoint> newCheckpoints = checkpointsForGroup(upstreamGroupOffsets, group);
Map<TopicPartition, Checkpoint> oldCheckpoints = checkpointsPerConsumerGroup.computeIfAbsent(group, ignored -> new HashMap<>());
oldCheckpoints.putAll(newCheckpoints);
checkpointStore.update(group, newCheckpoints);
return newCheckpoints.values().stream()
.map(x -> checkpointRecord(x, timestamp))
.collect(Collectors.toList());
@@ -195,7 +202,7 @@ Map<TopicPartition, Checkpoint> checkpointsForGroup(Map<TopicPartition, OffsetAn
}

private boolean checkpointIsMoreRecent(Checkpoint checkpoint) {
Map<TopicPartition, Checkpoint> checkpoints = checkpointsPerConsumerGroup.get(checkpoint.consumerGroupId());
Map<TopicPartition, Checkpoint> checkpoints = checkpointStore.get(checkpoint.consumerGroupId());
if (checkpoints == null) {
log.trace("Emitting {} (first for this group)", checkpoint);
return true;
@@ -314,7 +321,7 @@ Map<String, Map<TopicPartition, OffsetAndMetadata>> syncGroupOffset() throws Exe
Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetToSyncAll = new HashMap<>();

// first, sync offsets for the idle consumers at target
for (Entry<String, Map<TopicPartition, OffsetAndMetadata>> group : getConvertedUpstreamOffset().entrySet()) {
for (Entry<String, Map<TopicPartition, OffsetAndMetadata>> group : checkpointStore.computeConvertedUpstreamOffset().entrySet()) {
String consumerGroupId = group.getKey();
// for each idle consumer at target, read the checkpoints (converted upstream offset)
// from the pre-populated map
@@ -391,18 +398,4 @@ void syncGroupOffset(String consumerGroupId, Map<TopicPartition, OffsetAndMetada
);
}
}

Map<String, Map<TopicPartition, OffsetAndMetadata>> getConvertedUpstreamOffset() {
Map<String, Map<TopicPartition, OffsetAndMetadata>> result = new HashMap<>();

for (Entry<String, Map<TopicPartition, Checkpoint>> entry : checkpointsPerConsumerGroup.entrySet()) {
String consumerId = entry.getKey();
Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = new HashMap<>();
for (Checkpoint checkpoint : entry.getValue().values()) {
convertedUpstreamOffset.put(checkpoint.topicPartition(), checkpoint.offsetAndMetadata());
}
result.put(consumerId, convertedUpstreamOffset);
}
return result;
}
}
