From 0ba15ad4d1bd7318d626430035a1666eb16f844d Mon Sep 17 00:00:00 2001
From: Edoardo Comar
Date: Thu, 23 May 2024 17:17:56 +0100
Subject: [PATCH] =?UTF-8?q?KAFKA-15905=20Restarts=20of=20MirrorCheckpointT?=
 =?UTF-8?q?ask=20should=20not=20permanently=20i=E2=80=A6=20(#15910)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently interrupt offset translation

MirrorCheckpointTask now reloads the last checkpoints at start, and OffsetSyncStore stores OffsetSyncs before reading to the log end. If the CheckpointTask cannot read checkpoints at startup, it falls back to the previous OffsetSyncStore load logic and logs a warning about degraded offset translation.

Also addresses KAFKA-16622 (MirrorMaker2 first Checkpoint not emitted until the consumer group fully catches up once), because the OffsetSyncStore is now populated before reading to the log end.

Co-Authored-By: Adrian Preston

Reviewers: Greg Harris
---
 .../kafka/connect/mirror/Checkpoint.java | 14 ++ .../kafka/connect/mirror/CheckpointStore.java | 203 ++++++++++++++++++ .../mirror/MirrorCheckpointConfig.java | 1 + .../connect/mirror/MirrorCheckpointTask.java | 51 ++--- .../kafka/connect/mirror/OffsetSyncStore.java | 22 +- .../connect/mirror/CheckpointStoreTest.java | 97 +++++++++ .../mirror/MirrorCheckpointTaskTest.java | 125 +++++++++-- .../connect/mirror/OffsetSyncStoreTest.java | 108 +++++++--- .../kafka/connect/util/KafkaBasedLog.java | 12 +- 9 files changed, 562 insertions(+), 71 deletions(-) create mode 100644 connect/mirror/src/main/java/org/apache/kafka/connect/mirror/CheckpointStore.java create mode 100644 connect/mirror/src/test/java/org/apache/kafka/connect/mirror/CheckpointStoreTest.java
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java index 8f186400dd29..353d2eedb959 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.HashMap; import java.nio.ByteBuffer; +import java.util.Objects; /** Checkpoint records emitted from MirrorCheckpointConnector. Encodes remote consumer group state. */ public class Checkpoint { @@ -180,5 +181,18 @@ byte[] recordKey() { byte[] recordValue() { return serializeValue(VERSION).array(); } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Checkpoint that = (Checkpoint) o; + return upstreamOffset == that.upstreamOffset && downstreamOffset == that.downstreamOffset && Objects.equals(consumerGroupId, that.consumerGroupId) && Objects.equals(topicPartition, that.topicPartition) && Objects.equals(metadata, that.metadata); + } + + @Override + public int hashCode() { + return Objects.hash(consumerGroupId, topicPartition, upstreamOffset, downstreamOffset, metadata); + } }
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/CheckpointStore.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/CheckpointStore.java new file mode 100644 index 000000000000..cbe76efecb38 --- /dev/null +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/CheckpointStore.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.mirror; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.protocol.types.SchemaException; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.KafkaBasedLog; +import org.apache.kafka.connect.util.TopicAdmin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.apache.kafka.connect.mirror.MirrorCheckpointConfig.CHECKPOINTS_TARGET_CONSUMER_ROLE; + +/** + * Reads the checkpoints Kafka log once and populates a map of + * checkpoints per consumer group. + * + * The Kafka log is closed after the initial load; only the in-memory map is + * used after start.
+ */ +public class CheckpointStore implements AutoCloseable { + + private static final Logger log = LoggerFactory.getLogger(CheckpointStore.class); + + private final MirrorCheckpointTaskConfig config; + private final Set<String> consumerGroups; + + private TopicAdmin cpAdmin = null; + private KafkaBasedLog<byte[], byte[]> backingStore = null; + // accessible for testing + Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup; + + private volatile boolean loadSuccess = false; + private volatile boolean isInitialized = false; + + public CheckpointStore(MirrorCheckpointTaskConfig config, Set<String> consumerGroups) { + this.config = config; + this.consumerGroups = new HashSet<>(consumerGroups); + } + + // constructor for testing only + CheckpointStore(Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup) { + this.config = null; //ignored by tests + this.consumerGroups = null; //ignored by tests + this.checkpointsPerConsumerGroup = checkpointsPerConsumerGroup; + isInitialized = true; + loadSuccess = true; + } + + // potentially long running + public boolean start() { + checkpointsPerConsumerGroup = readCheckpoints(); + isInitialized = true; + if (log.isTraceEnabled()) { + log.trace("CheckpointStore started, load success={}, map={}", loadSuccess, checkpointsPerConsumerGroup); + } else { + log.debug("CheckpointStore started, load success={}, map.size={}", loadSuccess, checkpointsPerConsumerGroup.size()); + } + return loadSuccess; + } + + public boolean isInitialized() { + return isInitialized; + } + + public void update(String group, Map<TopicPartition, Checkpoint> newCheckpoints) { + Map<TopicPartition, Checkpoint> oldCheckpoints = checkpointsPerConsumerGroup.computeIfAbsent(group, ignored -> new HashMap<>()); + oldCheckpoints.putAll(newCheckpoints); + } + + public Map<TopicPartition, Checkpoint> get(String group) { + Map<TopicPartition, Checkpoint> result = checkpointsPerConsumerGroup.get(group); + return result == null ? null : Collections.unmodifiableMap(result); + } + + public Map<String, Map<TopicPartition, OffsetAndMetadata>> computeConvertedUpstreamOffset() { + Map<String, Map<TopicPartition, OffsetAndMetadata>> result = new HashMap<>(); + + for (Map.Entry<String, Map<TopicPartition, Checkpoint>> entry : checkpointsPerConsumerGroup.entrySet()) { + String consumerId = entry.getKey(); + Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = new HashMap<>(); + for (Checkpoint checkpoint : entry.getValue().values()) { + convertedUpstreamOffset.put(checkpoint.topicPartition(), checkpoint.offsetAndMetadata()); + } + result.put(consumerId, convertedUpstreamOffset); + } + return result; + } + + @Override + public void close() { + releaseResources(); + } + + private void releaseResources() { + Utils.closeQuietly(backingStore != null ? backingStore::stop : null, "backing store for previous Checkpoints"); + Utils.closeQuietly(cpAdmin, "admin client for previous Checkpoints"); + cpAdmin = null; + backingStore = null; + } + + // read the checkpoints topic to initialize the checkpointsPerConsumerGroup state + // the callback may only handle errors thrown by consumer.poll in KafkaBasedLog + // e.g. unauthorized to read from topic (non-retriable) + // if any are encountered, treat the loading of Checkpoints as failed.
+ private Map<String, Map<TopicPartition, Checkpoint>> readCheckpoints() { + Map<String, Map<TopicPartition, Checkpoint>> checkpoints = new HashMap<>(); + Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = (error, cpRecord) -> { + if (error != null) { + // if there is no authorization to READ from the topic, we must throw an error + // to stop the KafkaBasedLog forever looping attempting to read to end + checkpoints.clear(); + if (error instanceof RuntimeException) { + throw (RuntimeException) error; + } else { + throw new RuntimeException(error); + } + } else { + try { + Checkpoint cp = Checkpoint.deserializeRecord(cpRecord); + if (consumerGroups.contains(cp.consumerGroupId())) { + Map<TopicPartition, Checkpoint> cps = checkpoints.computeIfAbsent(cp.consumerGroupId(), ignored1 -> new HashMap<>()); + cps.put(cp.topicPartition(), cp); + } + } catch (SchemaException ex) { + log.warn("Ignored invalid checkpoint record at offset {}", cpRecord.offset(), ex); + } + } + }; + + try { + long startTime = System.currentTimeMillis(); + readCheckpointsImpl(config, consumedCallback); + log.debug("starting+stopping KafkaBasedLog took {}ms", System.currentTimeMillis() - startTime); + loadSuccess = true; + } catch (Exception error) { + loadSuccess = false; + if (error instanceof AuthorizationException) { + log.warn("Not authorized to access checkpoints topic {} - " + + "this may degrade offset translation as only checkpoints " + + "for offsets which were mirrored after the task started will be emitted", + config.checkpointsTopic(), error); + } else { + log.info("Exception encountered loading checkpoints topic {} - " + + "this may degrade offset translation as only checkpoints " + + "for offsets which were mirrored after the task started will be emitted", + config.checkpointsTopic(), error); + } + } + return checkpoints; + } + + // accessible for testing + void readCheckpointsImpl(MirrorCheckpointTaskConfig config, Callback<ConsumerRecord<byte[], byte[]>> consumedCallback) { + try { + cpAdmin = new TopicAdmin( + config.targetAdminConfig("checkpoint-target-admin"), + config.forwardingAdmin(config.targetAdminConfig("checkpoint-target-admin"))); + + backingStore = KafkaBasedLog.withExistingClients( + config.checkpointsTopic(), + MirrorUtils.newConsumer(config.targetConsumerConfig(CHECKPOINTS_TARGET_CONSUMER_ROLE)), + null, + cpAdmin, + consumedCallback, + Time.SYSTEM, + ignored -> { + }, + topicPartition -> topicPartition.partition() == 0); + + backingStore.start(true); + backingStore.stop(); + } finally { + releaseResources(); + } + } +}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java index e37cee4a79b8..8be52a9c9be9 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java @@ -75,6 +75,7 @@ public class MirrorCheckpointConfig extends MirrorConnectorConfig { public static final Class GROUP_FILTER_CLASS_DEFAULT = DefaultGroupFilter.class; public static final String OFFSET_SYNCS_SOURCE_CONSUMER_ROLE = "offset-syncs-source-consumer"; public static final String OFFSET_SYNCS_TARGET_CONSUMER_ROLE = "offset-syncs-target-consumer"; + public static final String CHECKPOINTS_TARGET_CONSUMER_ROLE = "checkpoints-target-consumer"; public static final String OFFSET_SYNCS_SOURCE_ADMIN_ROLE = "offset-syncs-source-admin"; public static final String OFFSET_SYNCS_TARGET_ADMIN_ROLE = "offset-syncs-target-admin"; diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java index 96c287add5f1..7f446efea5df 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java @@ -69,21 +69,25 @@ public class MirrorCheckpointTask extends SourceTask { private MirrorCheckpointMetrics metrics; private Scheduler scheduler; private Map> idleConsumerGroupsOffset; - private Map> checkpointsPerConsumerGroup; + private CheckpointStore checkpointStore; + public MirrorCheckpointTask() {} // for testing MirrorCheckpointTask(String sourceClusterAlias, String targetClusterAlias, - ReplicationPolicy replicationPolicy, OffsetSyncStore offsetSyncStore, + ReplicationPolicy replicationPolicy, OffsetSyncStore offsetSyncStore, Set consumerGroups, Map> idleConsumerGroupsOffset, - Map> checkpointsPerConsumerGroup) { + CheckpointStore checkpointStore) { this.sourceClusterAlias = sourceClusterAlias; this.targetClusterAlias = targetClusterAlias; this.replicationPolicy = replicationPolicy; this.offsetSyncStore = offsetSyncStore; + this.consumerGroups = consumerGroups; this.idleConsumerGroupsOffset = idleConsumerGroupsOffset; - this.checkpointsPerConsumerGroup = checkpointsPerConsumerGroup; + this.checkpointStore = checkpointStore; this.topicFilter = topic -> true; + this.interval = Duration.ofNanos(1); + this.pollTimeout = Duration.ofNanos(1); } @Override @@ -103,15 +107,18 @@ public void start(Map props) { targetAdminClient = config.forwardingAdmin(config.targetAdminConfig("checkpoint-target-admin")); metrics = config.metrics(); idleConsumerGroupsOffset = new HashMap<>(); - checkpointsPerConsumerGroup = new HashMap<>(); + checkpointStore = new CheckpointStore(config, consumerGroups); scheduler = new Scheduler(getClass(), config.entityLabel(), config.adminTimeout()); scheduler.execute(() -> { - offsetSyncStore.start(); + // loading the stores are potentially long running operations, so they run asynchronously + // to avoid blocking task::start (until a task has completed starting it cannot be stopped) + boolean checkpointsReadOk = checkpointStore.start(); + offsetSyncStore.start(!checkpointsReadOk); scheduler.scheduleRepeating(this::refreshIdleConsumerGroupOffset, config.syncGroupOffsetsInterval(), "refreshing idle consumers group offsets at target cluster"); scheduler.scheduleRepeatingDelayed(this::syncGroupOffset, config.syncGroupOffsetsInterval(), "sync idle consumer group offset from source to target"); - }, "starting offset sync store"); + }, "starting checkpoint and offset sync stores"); log.info("{} checkpointing {} consumer groups {}->{}: {}.", Thread.currentThread().getName(), consumerGroups.size(), sourceClusterAlias, config.targetClusterAlias(), consumerGroups); } @@ -126,6 +133,7 @@ public void stop() { long start = System.currentTimeMillis(); stopping = true; Utils.closeQuietly(topicFilter, "topic filter"); + Utils.closeQuietly(checkpointStore, "checkpoints store"); Utils.closeQuietly(offsetSyncStore, "offset sync store"); Utils.closeQuietly(sourceAdminClient, "source admin client"); Utils.closeQuietly(targetAdminClient, "target admin client"); @@ -146,8 +154,8 @@ public List poll() throws InterruptedException { while (!stopping && System.currentTimeMillis() < deadline) { Thread.sleep(pollTimeout.toMillis()); } - if (stopping) { - // we are stopping, return early. 
+ if (stopping || !checkpointStore.isInitialized()) { + // we are stopping, or not fully initialized, return early. return null; } List records = new ArrayList<>(); @@ -166,14 +174,13 @@ public List poll() throws InterruptedException { } } - - private List sourceRecordsForGroup(String group) throws InterruptedException { + // visible for testing + List sourceRecordsForGroup(String group) throws InterruptedException { try { long timestamp = System.currentTimeMillis(); Map upstreamGroupOffsets = listConsumerGroupOffsets(group); Map newCheckpoints = checkpointsForGroup(upstreamGroupOffsets, group); - Map oldCheckpoints = checkpointsPerConsumerGroup.computeIfAbsent(group, ignored -> new HashMap<>()); - oldCheckpoints.putAll(newCheckpoints); + checkpointStore.update(group, newCheckpoints); return newCheckpoints.values().stream() .map(x -> checkpointRecord(x, timestamp)) .collect(Collectors.toList()); @@ -195,7 +202,7 @@ Map checkpointsForGroup(Map checkpoints = checkpointsPerConsumerGroup.get(checkpoint.consumerGroupId()); + Map checkpoints = checkpointStore.get(checkpoint.consumerGroupId()); if (checkpoints == null) { log.trace("Emitting {} (first for this group)", checkpoint); return true; @@ -314,7 +321,7 @@ Map> syncGroupOffset() throws Exe Map> offsetToSyncAll = new HashMap<>(); // first, sync offsets for the idle consumers at target - for (Entry> group : getConvertedUpstreamOffset().entrySet()) { + for (Entry> group : checkpointStore.computeConvertedUpstreamOffset().entrySet()) { String consumerGroupId = group.getKey(); // for each idle consumer at target, read the checkpoints (converted upstream offset) // from the pre-populated map @@ -391,18 +398,4 @@ void syncGroupOffset(String consumerGroupId, Map> getConvertedUpstreamOffset() { - Map> result = new HashMap<>(); - - for (Entry> entry : checkpointsPerConsumerGroup.entrySet()) { - String consumerId = entry.getKey(); - Map convertedUpstreamOffset = new HashMap<>(); - for (Checkpoint checkpoint : entry.getValue().values()) { - convertedUpstreamOffset.put(checkpoint.topicPartition(), checkpoint.offsetAndMetadata()); - } - result.put(consumerId, convertedUpstreamOffset); - } - return result; - } } diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java index eca5cc68f997..16038044ddd2 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java @@ -54,7 +54,7 @@ * started after the position of the consumer group, or if relevant offset syncs for the topic were potentially used as * for translation in an earlier generation of the sync store. */ -class OffsetSyncStore implements AutoCloseable { +public class OffsetSyncStore implements AutoCloseable { private static final Logger log = LoggerFactory.getLogger(OffsetSyncStore.class); // Store one offset sync for each bit of the topic offset. 
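As a rough illustration of the start/poll sequencing introduced by the MirrorCheckpointTask hunks above (a sketch, not code from this patch): the store loads run on a background thread so the framework's start() call is not blocked, and poll() emits nothing until the checkpoint store has been read. The class below and its BooleanSupplier/Consumer stand-ins for CheckpointStore.start() and OffsetSyncStore.start(boolean) are hypothetical; only the ordering reflects the diff.

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

public class StartupOrderingSketch {

    private final AtomicBoolean checkpointStoreInitialized = new AtomicBoolean(false);

    // checkpointStoreStart stands in for CheckpointStore.start();
    // offsetSyncStoreStart stands in for OffsetSyncStore.start(boolean initializationMustReadToEnd).
    void start(BooleanSupplier checkpointStoreStart, Consumer<Boolean> offsetSyncStoreStart) {
        // Run the potentially long loads off the caller's thread, as the task now does via its
        // Scheduler, because a task cannot be stopped until it has finished starting.
        CompletableFuture.runAsync(() -> {
            boolean checkpointsReadOk = checkpointStoreStart.getAsBoolean();
            checkpointStoreInitialized.set(true);
            // If the previous checkpoints could not be read, fall back to the old behaviour:
            // the offset sync store must first read to the end of the offset-syncs topic.
            offsetSyncStoreStart.accept(!checkpointsReadOk);
        });
    }

    List<String> poll() {
        if (!checkpointStoreInitialized.get()) {
            return null; // mirrors MirrorCheckpointTask.poll() returning early before initialization
        }
        return Collections.singletonList("checkpoint records would be emitted from here");
    }
}

The real task keeps this state in CheckpointStore.isInitialized() rather than a local flag, as shown in the poll() hunk above.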
@@ -63,8 +63,10 @@ class OffsetSyncStore implements AutoCloseable { private final KafkaBasedLog backingStore; private final Map offsetSyncs = new ConcurrentHashMap<>(); private final TopicAdmin admin; + protected volatile boolean initializationMustReadToEnd = true; protected volatile boolean readToEnd = false; + // package access to avoid Java 21 "this-escape" warning OffsetSyncStore(MirrorCheckpointConfig config) { Consumer consumer = null; TopicAdmin admin = null; @@ -97,6 +99,7 @@ private KafkaBasedLog createBackingStore(MirrorCheckpointConfig ); } + // for testing OffsetSyncStore() { this.admin = null; this.backingStore = null; @@ -105,12 +108,19 @@ private KafkaBasedLog createBackingStore(MirrorCheckpointConfig /** * Start the OffsetSyncStore, blocking until all previous Offset Syncs have been read from backing storage. */ - public void start() { - backingStore.start(); + public void start(boolean initializationMustReadToEnd) { + this.initializationMustReadToEnd = initializationMustReadToEnd; + log.debug("OffsetSyncStore starting - must read to OffsetSync end = {}", initializationMustReadToEnd); + backingStoreStart(); readToEnd = true; } - OptionalLong translateDownstream(String group, TopicPartition sourceTopicPartition, long upstreamOffset) { + // overridable for testing + void backingStoreStart() { + backingStore.start(false); + } + + public OptionalLong translateDownstream(String group, TopicPartition sourceTopicPartition, long upstreamOffset) { if (!readToEnd) { // If we have not read to the end of the syncs topic at least once, decline to translate any offsets. // This prevents emitting stale offsets while initially reading the offset syncs topic. @@ -214,7 +224,9 @@ private void updateSyncArray(OffsetSync[] syncs, OffsetSync[] original, OffsetSy // While reading to the end of the topic, ensure that our earliest sync is later than // any earlier sync that could have been used for translation, to preserve monotonicity // If the upstream offset rewinds, all previous offsets are invalid, so overwrite them all. - if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) { + boolean onlyLoadLastOffset = !readToEnd && initializationMustReadToEnd; + boolean upstreamRewind = upstreamOffset < syncs[0].upstreamOffset(); + if (onlyLoadLastOffset || upstreamRewind) { clearSyncArray(syncs, offsetSync); return; } diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/CheckpointStoreTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/CheckpointStoreTest.java new file mode 100644 index 000000000000..b7b3904899f1 --- /dev/null +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/CheckpointStoreTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.mirror; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.connect.util.Callback; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class CheckpointStoreTest { + + @Test + public void testReadCheckpointsTopic() { + Set consumerGroups = new HashSet<>(); + consumerGroups.add("group1"); + + MirrorCheckpointTaskConfig config = mock(MirrorCheckpointTaskConfig.class); + when(config.checkpointsTopic()).thenReturn("checkpoint.topic"); + + try (CheckpointStore store = new CheckpointStore(config, consumerGroups) { + @Override + void readCheckpointsImpl(MirrorCheckpointTaskConfig config, Callback> consumedCallback) { + consumedCallback.onCompletion(null, newCheckpointRecord("group1", "t1", 0, 0, 0)); + // this record must be ignored as not part of consumerGroups for task + consumedCallback.onCompletion(null, newCheckpointRecord("group2", "t1", 0, 0, 0)); + // this record must be ignored as malformed + consumedCallback.onCompletion(null, + new ConsumerRecord<>("checkpoint.topic", 0, 0L, new byte[0], new byte[0])); + consumedCallback.onCompletion(null, newCheckpointRecord("group1", "t1", 0, 1, 1)); + } + }) { + assertFalse(store.isInitialized()); + + assertTrue(store.start(), "expected start to return success"); + assertTrue(store.isInitialized()); + + Map> expected = new HashMap<>(); + expected.put("group1", Collections.singletonMap(new TopicPartition("t1", 0), + new Checkpoint("group1", new TopicPartition("t1", 0), 1, 1, ""))); + assertEquals(expected, store.checkpointsPerConsumerGroup); + } + } + + @Test + public void testReadCheckpointsTopicError() { + Set consumerGroups = new HashSet<>(); + consumerGroups.add("group1"); + + MirrorCheckpointTaskConfig config = mock(MirrorCheckpointTaskConfig.class); + when(config.checkpointsTopic()).thenReturn("checkpoint.topic"); + + try (CheckpointStore store = new CheckpointStore(config, consumerGroups) { + @Override + void readCheckpointsImpl(MirrorCheckpointTaskConfig config, Callback> consumedCallback) { + consumedCallback.onCompletion(null, newCheckpointRecord("group1", "topic", 1, 0, 0)); + consumedCallback.onCompletion(new TopicAuthorizationException("test"), null); + } + }) { + + assertFalse(store.start(), "expected start to return failure"); + assertTrue(store.isInitialized()); + assertTrue(store.checkpointsPerConsumerGroup.isEmpty()); + } + } + + ConsumerRecord newCheckpointRecord(String gid, String topic, int partition, long upo, long dwo) { + Checkpoint cp = new Checkpoint(gid, new TopicPartition(topic, partition), upo, dwo, ""); + return new ConsumerRecord<>("checkpoint.topic", 0, 0L, cp.recordKey(), cp.recordValue()); + } +} diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java index c7aec3e54788..0afc4f74f2f9 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java +++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.connect.mirror; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Collections; import java.util.Optional; @@ -32,14 +33,19 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class MirrorCheckpointTaskTest { @Test public void testDownstreamTopicRenaming() { MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2", - new DefaultReplicationPolicy(), null, Collections.emptyMap(), Collections.emptyMap()); + new DefaultReplicationPolicy(), null, Collections.emptySet(), Collections.emptyMap(), + new CheckpointStore(Collections.emptyMap())); assertEquals(new TopicPartition("source1.topic3", 4), mirrorCheckpointTask.renameTopicPartition(new TopicPartition("topic3", 4)), "Renaming source1.topic3 failed"); @@ -58,9 +64,10 @@ public void testCheckpoint() { long t2UpstreamOffset = 7L; long t2DownstreamOffset = 8L; OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore(); - offsetSyncStore.start(); + offsetSyncStore.start(true); MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2", - new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap()); + new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptySet(), + Collections.emptyMap(), new CheckpointStore(Collections.emptyMap())); offsetSyncStore.sync(new TopicPartition("topic1", 2), t1UpstreamOffset, t1DownstreamOffset); offsetSyncStore.sync(new TopicPartition("target2.topic5", 6), t2UpstreamOffset, t2DownstreamOffset); Optional optionalCheckpoint1 = mirrorCheckpointTask.checkpoint("group9", new TopicPartition("topic1", 2), @@ -160,7 +167,8 @@ public void testSyncOffset() throws ExecutionException, InterruptedException { checkpointsPerConsumerGroup.put(consumer2, checkpointMapC2); MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2", - new DefaultReplicationPolicy(), null, idleConsumerGroupsOffset, checkpointsPerConsumerGroup); + new DefaultReplicationPolicy(), null, Collections.emptySet(), idleConsumerGroupsOffset, + new CheckpointStore(checkpointsPerConsumerGroup)); Map> output = mirrorCheckpointTask.syncGroupOffset(); @@ -190,7 +198,8 @@ public void testSyncOffsetForTargetGroupWithNullOffsetAndMetadata() throws Execu checkpointsPerConsumerGroup.put(consumer, checkpointMap); MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source", "target", - new DefaultReplicationPolicy(), null, idleConsumerGroupsOffset, checkpointsPerConsumerGroup); + new DefaultReplicationPolicy(), null, Collections.emptySet(), idleConsumerGroupsOffset, + new CheckpointStore(checkpointsPerConsumerGroup)); Map> output = mirrorCheckpointTask.syncGroupOffset(); @@ -200,9 +209,10 @@ public void testSyncOffsetForTargetGroupWithNullOffsetAndMetadata() throws Execu @Test public void testNoCheckpointForTopicWithoutOffsetSyncs() { OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore(); - offsetSyncStore.start(); + 
offsetSyncStore.start(true); MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2", - new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap()); + new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptySet(), Collections.emptyMap(), + new CheckpointStore(Collections.emptyMap())); offsetSyncStore.sync(new TopicPartition("topic1", 0), 3L, 4L); Optional checkpoint1 = mirrorCheckpointTask.checkpoint("group9", new TopicPartition("topic1", 1), @@ -216,9 +226,10 @@ public void testNoCheckpointForTopicWithoutOffsetSyncs() { @Test public void testNoCheckpointForTopicWithNullOffsetAndMetadata() { OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore(); - offsetSyncStore.start(); + offsetSyncStore.start(true); MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2", - new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap()); + new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptySet(), Collections.emptyMap(), + new CheckpointStore(Collections.emptyMap())); offsetSyncStore.sync(new TopicPartition("topic1", 0), 1L, 3L); Optional checkpoint = mirrorCheckpointTask.checkpoint("g1", new TopicPartition("topic1", 0), null); assertFalse(checkpoint.isPresent()); @@ -227,10 +238,11 @@ public void testNoCheckpointForTopicWithNullOffsetAndMetadata() { @Test public void testCheckpointRecordsMonotonicIfStoreRewinds() { OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore(); - offsetSyncStore.start(); + offsetSyncStore.start(true); Map> checkpointsPerConsumerGroup = new HashMap<>(); MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2", - new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), checkpointsPerConsumerGroup); + new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptySet(), Collections.emptyMap(), + new CheckpointStore(checkpointsPerConsumerGroup)); TopicPartition tp = new TopicPartition("topic1", 0); TopicPartition targetTP = new TopicPartition("source1.topic1", 0); @@ -252,7 +264,7 @@ public void testCheckpointRecordsMonotonicIfStoreRewinds() { offsetSyncStore.sync(tp, upstream++, downstream++); offsetSyncStore.sync(tp, upstream++, downstream++); offsetSyncStore.sync(tp, upstream++, downstream++); - offsetSyncStore.sync(tp, upstream++, downstream++); + offsetSyncStore.sync(tp, upstream, downstream); // The OffsetSyncStore will change its translation of the same offset assertNotEquals(OptionalLong.of(expectedDownstreamOffset), offsetSyncStore.translateDownstream("g1", tp, consumerGroupOffset)); // But the task will filter this out and not emit a checkpoint @@ -271,4 +283,93 @@ private Map assertCheckpointForTopic( assertEquals(truth, checkpoints.containsKey(remoteTp), "should" + (truth ? 
"" : " not") + " emit offset sync"); return checkpoints; } + + @Test + public void testCheckpointsTaskRestartUsesExistingCheckpoints() { + TopicPartition t1p0 = new TopicPartition("t1", 0); + TopicPartition sourceT1p0 = new TopicPartition("source1.t1", 0); + OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore() { + @Override + void backingStoreStart() { + // OffsetSyncStore contains entries for: 100->100, 200->200, 300->300 + for (int i = 100; i <= 300; i += 100) { + sync(t1p0, i, i); + } + } + }; + offsetSyncStore.start(false); + + MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2", + new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptySet(), Collections.emptyMap(), + new CheckpointStore(Collections.emptyMap())); + + // Generate a checkpoint for upstream offset 250, and assert it maps to downstream 201 + // (as nearest mapping in OffsetSyncStore is 200->200) + Map upstreamGroupOffsets = new HashMap<>(); + upstreamGroupOffsets.put(t1p0, new OffsetAndMetadata(250)); + Map checkpoints = mirrorCheckpointTask.checkpointsForGroup(upstreamGroupOffsets, "group1"); + assertEquals(1, checkpoints.size()); + assertEquals(new Checkpoint("group1", sourceT1p0, 250, 201, ""), checkpoints.get(sourceT1p0)); + + // Simulate task restart, during which more offsets are added to the sync topic, and thus the + // corresponding OffsetSyncStore no longer has a mapping for 100->100 + // Now OffsetSyncStore contains entries for: 175->175, 375->375, 475->475 + OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore2 = new OffsetSyncStoreTest.FakeOffsetSyncStore() { + @Override + void backingStoreStart() { + for (int i = 175; i <= 475; i += 100) { + sync(t1p0, i, i); + } + } + }; + offsetSyncStore2.start(false); + + // Simulate loading existing checkpoints into checkpointsPerConsumerGroup (250->201) + Map> checkpointsPerConsumerGroup = new HashMap<>(); + checkpointsPerConsumerGroup.put("group1", checkpoints); + MirrorCheckpointTask mirrorCheckpointTask2 = new MirrorCheckpointTask("source1", "target2", + new DefaultReplicationPolicy(), offsetSyncStore2, Collections.emptySet(), Collections.emptyMap(), + new CheckpointStore(checkpointsPerConsumerGroup)); + + // Upstream offsets 250 and 370 now have the closest downstream value of 176, but this is + // earlier than the downstream value of the last checkpoint (201) - so they are not emitted. 
+ assertEquals(OptionalLong.of(176), offsetSyncStore2.translateDownstream(null, t1p0, 250)); + assertEquals(OptionalLong.of(176), offsetSyncStore2.translateDownstream(null, t1p0, 370)); + upstreamGroupOffsets.put(t1p0, new OffsetAndMetadata(250)); + assertTrue(mirrorCheckpointTask2.checkpointsForGroup(upstreamGroupOffsets, "group1").isEmpty()); + upstreamGroupOffsets.put(t1p0, new OffsetAndMetadata(370)); + assertTrue(mirrorCheckpointTask2.checkpointsForGroup(upstreamGroupOffsets, "group1").isEmpty()); + + // Upstream offset 400 has a closes downstream value of 376, and is emitted because it has + // a later downstream offset than the last checkpoint's downstream (201) + upstreamGroupOffsets.put(t1p0, new OffsetAndMetadata(400)); + Map checkpoints2 = mirrorCheckpointTask2.checkpointsForGroup(upstreamGroupOffsets, "group1"); + assertEquals(1, checkpoints2.size()); + assertEquals(new Checkpoint("group1", sourceT1p0, 400, 376, ""), checkpoints2.get(sourceT1p0)); + } + + @Test + public void testCheckpointStoreInitialized() throws InterruptedException { + CheckpointStore checkpointStore = mock(CheckpointStore.class); + + MirrorCheckpointTask task = new MirrorCheckpointTask("source1", "target2", + new DefaultReplicationPolicy(), + new OffsetSyncStoreTest.FakeOffsetSyncStore(), + Collections.singleton("group"), + Collections.emptyMap(), + checkpointStore) { + + @Override + List sourceRecordsForGroup(String group) { + SourceRecord sr = new SourceRecord(Collections.emptyMap(), Collections.emptyMap(), "", 0, null, null); + return Collections.singletonList(sr); + } + }; + + assertNull(task.poll()); + + when(checkpointStore.isInitialized()).thenReturn(true); + List polled = task.poll(); + assertEquals(1, polled.size()); + } } diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java index bc76a1994db9..3f2ddbc62e93 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java @@ -34,18 +34,22 @@ public class OffsetSyncStoreTest { static TopicPartition tp = new TopicPartition("topic1", 2); static class FakeOffsetSyncStore extends OffsetSyncStore { + private boolean startCalled = false; - FakeOffsetSyncStore() { - super(); + @Override + public void start(boolean initializationMustReadToEnd) { + startCalled = true; + super.start(initializationMustReadToEnd); } @Override - public void start() { - // do not call super to avoid NPE without a KafkaBasedLog. 
- readToEnd = true; + void backingStoreStart() { + // do not start KafkaBasedLog } + // simulate OffsetSync load as from KafkaBasedLog void sync(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) { + assertTrue(startCalled); // sync in tests should only be called after store.start OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset); byte[] key = offsetSync.recordKey(); byte[] value = offsetSync.recordValue(); @@ -57,7 +61,7 @@ void sync(TopicPartition topicPartition, long upstreamOffset, long downstreamOff @Test public void testOffsetTranslation() { try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) { - store.start(); + store.start(true); // Emit synced downstream offset without dead-reckoning store.sync(tp, 100, 200); @@ -82,20 +86,24 @@ public void testOffsetTranslation() { @Test public void testNoTranslationIfStoreNotStarted() { - try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) { + try (FakeOffsetSyncStore store = new FakeOffsetSyncStore() { + @Override + void backingStoreStart() { + // read a sync during startup + sync(tp, 100, 200); + assertEquals(OptionalLong.empty(), translateDownstream(null, tp, 0)); + assertEquals(OptionalLong.empty(), translateDownstream(null, tp, 100)); + assertEquals(OptionalLong.empty(), translateDownstream(null, tp, 200)); + } + }) { // no offsets exist and store is not started assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0)); assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100)); assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200)); - // read a sync during startup - store.sync(tp, 100, 200); - assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0)); - assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100)); - assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200)); - // After the store is started all offsets are visible - store.start(); + store.start(true); + assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0)); assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100)); assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200)); @@ -105,26 +113,29 @@ public void testNoTranslationIfStoreNotStarted() { @Test public void testNoTranslationIfNoOffsetSync() { try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) { - store.start(); + store.start(true); assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0)); } } @Test public void testPastOffsetTranslation() { - try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) { - int maxOffsetLag = 10; - int offset = 0; - for (; offset <= 1000; offset += maxOffsetLag) { - store.sync(tp, offset, offset); - assertSparseSyncInvariant(store, tp); + int maxOffsetLag = 10; + try (FakeOffsetSyncStore store = new FakeOffsetSyncStore() { + @Override + void backingStoreStart() { + for (int offset = 0; offset <= 1000; offset += maxOffsetLag) { + sync(tp, offset, offset); + assertSparseSyncInvariant(this, tp); + } } - store.start(); + }) { + store.start(true); // After starting but before seeing new offsets, only the latest startup offset can be translated assertSparseSync(store, 1000, -1); - for (; offset <= 10000; offset += maxOffsetLag) { + for (int offset = 1000 + maxOffsetLag; offset <= 10000; offset += maxOffsetLag) { store.sync(tp, offset, offset); assertSparseSyncInvariant(store, tp); } @@ -155,6 +166,55 @@ public void 
testPastOffsetTranslation() { } } + // this test has been written knowing the exact offsets syncs stored + @Test + public void testPastOffsetTranslationWithoutInitializationReadToEnd() { + final int maxOffsetLag = 10; + + try (FakeOffsetSyncStore store = new FakeOffsetSyncStore() { + @Override + void backingStoreStart() { + for (int offset = 0; offset <= 1000; offset += maxOffsetLag) { + sync(tp, offset, offset); + assertSparseSyncInvariant(this, tp); + } + } + }) { + + store.start(false); + + // After starting but before seeing new offsets + assertSparseSync(store, 480, 0); + assertSparseSync(store, 720, 480); + assertSparseSync(store, 1000, 990); + + for (int offset = 1000; offset <= 10000; offset += maxOffsetLag) { + store.sync(tp, offset, offset); + assertSparseSyncInvariant(store, tp); + } + + // After seeing new offsets, 1000 was kicked out of the store, so + // offsets before 3840 can only be translated to 1, only previously stored offset is 0 + assertSparseSync(store, 3840, 0); + assertSparseSync(store, 7680, 3840); + assertSparseSync(store, 8640, 7680); + assertSparseSync(store, 9120, 8640); + assertSparseSync(store, 9600, 9120); + assertSparseSync(store, 9840, 9600); + assertSparseSync(store, 9900, 9840); + assertSparseSync(store, 9960, 9900); + assertSparseSync(store, 9990, 9960); + assertSparseSync(store, 10000, 9990); + + // Rewinding upstream offsets should clear all historical syncs + store.sync(tp, 1500, 11000); + assertSparseSyncInvariant(store, tp); + assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 1499)); + assertEquals(OptionalLong.of(11000), store.translateDownstream(null, tp, 1500)); + assertEquals(OptionalLong.of(11001), store.translateDownstream(null, tp, 2000)); + } + } + @Test public void testConsistentlySpacedSyncs() { // Under normal operation, the incoming syncs will be regularly spaced and the store should keep a set of syncs @@ -215,7 +275,7 @@ public void testDroppedSyncsSpacing() { */ private void assertSyncSpacingHasBoundedExpirations(long firstOffset, LongStream steps, int maximumExpirations) { try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) { - store.start(); + store.start(true); store.sync(tp, firstOffset, firstOffset); PrimitiveIterator.OfLong iterator = steps.iterator(); long offset = firstOffset; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java index 36203399766c..c1f19e334910 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java @@ -107,6 +107,8 @@ public class KafkaBasedLog { private boolean stopRequested; private final Queue> readLogEndOffsetCallbacks; private final java.util.function.Consumer initializer; + // initialized as false for backward compatibility + private volatile boolean reportErrorsToCallback = false; /** * Create a new KafkaBasedLog object. This does not start reading the log and writing is not permitted until @@ -243,7 +245,12 @@ public void stop() { } public void start() { - log.info("Starting KafkaBasedLog with topic " + topic); + start(false); + } + + public void start(boolean reportErrorsToCallback) { + this.reportErrorsToCallback = reportErrorsToCallback; + log.info("Starting KafkaBasedLog with topic {} reportErrorsToCallback={}", topic, reportErrorsToCallback); // Create the topic admin client and initialize the topic ... 
admin = topicAdminSupplier.get(); // may be null @@ -468,6 +475,9 @@ private void poll(long timeoutMs) { throw e; } catch (KafkaException e) { log.error("Error polling: " + e); + if (reportErrorsToCallback) { + consumedCallback.onCompletion(e, null); + } } }
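To close, a hedged sketch of how the new KafkaBasedLog.start(boolean reportErrorsToCallback) overload is intended to be used by a one-shot reader such as CheckpointStore.readCheckpointsImpl above. Everything other than the KafkaBasedLog/Callback calls shown in this patch (the class, the readOnce method, the recordHandler parameter) is illustrative, and the construction of the consumer and TopicAdmin is assumed to happen elsewhere (e.g. via MirrorUtils.newConsumer).

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;

public class CheckpointsReadSketch {

    // One-shot read of a checkpoints topic, modelled on CheckpointStore.readCheckpointsImpl above.
    static void readOnce(String checkpointsTopic,
                         Consumer<byte[], byte[]> consumer,
                         TopicAdmin admin,
                         Callback<ConsumerRecord<byte[], byte[]>> recordHandler) {
        Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = (error, record) -> {
            if (error != null) {
                // with start(true), consumer.poll failures are forwarded here instead of only being
                // logged; rethrowing stops KafkaBasedLog from looping forever while reading to the end
                throw error instanceof RuntimeException ? (RuntimeException) error : new RuntimeException(error);
            }
            recordHandler.onCompletion(null, record);
        };

        KafkaBasedLog<byte[], byte[]> log = KafkaBasedLog.withExistingClients(
                checkpointsTopic, consumer, null, admin, consumedCallback, Time.SYSTEM,
                ignored -> { }, tp -> tp.partition() == 0);
        log.start(true);   // new overload added by this patch: report consumer errors to the callback
        log.stop();        // the log is only needed for the initial load
    }
}

Calling the existing no-argument start() (or start(false)) keeps the previous behaviour, where poll errors are only logged; this is why reportErrorsToCallback defaults to false for backward compatibility.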