
KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag #13178
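
Summary (from the commit list and diffs below): MM2's checkpoint task used to dead-reckon consumer group offsets past the most recent offset sync, so a translated offset could land beyond the downstream log-end offset and the group would report negative lag (KAFKA-12468). This PR caps translation at one offset past the latest sync, reads the offset-syncs topic to the end before translating anything (KAFKA-13659), and deflakes the related MM2 tests (KAFKA-12566). A back-of-the-envelope sketch of the failure mode, with hypothetical offsets that are not taken from this PR:

```java
public class NegativeLagSketch {
    public static void main(String[] args) {
        long syncUpstream = 100, syncDownstream = 200; // latest offset sync pair
        long groupUpstream = 150;                      // group's committed upstream offset

        // Old behavior: dead-reckon a 1:1 upstream->downstream mapping past the sync.
        long oldTranslated = syncDownstream + (groupUpstream - syncUpstream); // 250

        // If some upstream records were not (or not yet) replicated, the downstream
        // log-end offset can be smaller than the translated offset.
        long downstreamLogEnd = 230;
        System.out.println("old lag = " + (downstreamLogEnd - oldTranslated)); // -20, negative

        // New behavior: advance at most one offset past the sync.
        long newTranslated = syncDownstream + (groupUpstream == syncUpstream ? 0 : 1); // 201
        System.out.println("new lag = " + (downstreamLogEnd - newTranslated)); // 29, non-negative
    }
}
```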

Merged into trunk from mm2-negative-offsets on Feb 17, 2023 · 41 commits

Commits (41)
ba945c9
MINOR: Add MM2 integration test which uses transactional source topics
gharris1727 Jan 31, 2023
7e63785
MINOR: Close leaked consumers in tests that prevent MM2 from syncing …
gharris1727 Jan 31, 2023
0727e53
MINOR: Use consistent consumer group id in MM2 test
gharris1727 Jan 31, 2023
c1ae043
MINOR: invert wait condition in MM2Base::testNoCheckpoints after reco…
gharris1727 Jan 31, 2023
64562e6
MINOR: Strictly assert that all offsets are synced in MM2 tests
gharris1727 Jan 31, 2023
40397d9
MINOR: remove checkpoint assertion from test which does not have upst…
gharris1727 Jan 31, 2023
51d5d4d
KAFKA-12566: refactor waitForAnyCheckpoint to prevent flakey mm2 tests
gharris1727 Jan 30, 2023
fd4ebd8
KAFKA-14663: Implement fair offset sync queueing to prevent unfair st…
gharris1727 Jan 27, 2023
21434c4
KAFKA-12468: Change MM2 offset translation to prevent negative consum…
gharris1727 Jan 27, 2023
9520def
Revert "KAFKA-14663: Implement fair offset sync queueing to prevent u…
gharris1727 Feb 6, 2023
7674973
Merge branch 'trunk' into mm2-negative-offsets
gharris1727 Feb 6, 2023
db1908b
fixup: flush offsets more often to trigger offset syncs
gharris1727 Feb 6, 2023
0a2aa60
fixup: add comment for offset translation logic
gharris1727 Feb 8, 2023
e073e9b
fixup: rename waitForAnyCheckpoint
gharris1727 Feb 8, 2023
beac218
fixup: use try-with-resources to prevent leaked producers
gharris1727 Feb 8, 2023
31e2725
fixup: rename variables
gharris1727 Feb 8, 2023
a95bf9a
fixup: document the produce() override hook
gharris1727 Feb 8, 2023
fce1b4c
fixup: checkstyle
gharris1727 Feb 8, 2023
f315265
fixup: code-golf
gharris1727 Feb 8, 2023
195bbf3
fixup: typo causing test failures
gharris1727 Feb 8, 2023
ba72948
fixup: re-add failing assertion and make it not fail
gharris1727 Feb 8, 2023
cb963ff
KAFKA-13659: Add tests which assert that checkpoints are monotonic
gharris1727 Feb 9, 2023
6dae716
KAFKA-13659: Ensure all offset syncs are read by checkpoint task befo…
gharris1727 Feb 8, 2023
88e3519
KAFKA:13659: fixup: FakeOffsetSyncStore using test constructor
gharris1727 Feb 10, 2023
64f5d8b
KAFKA-13659: fixup: Synchronize OffsetSyncStore as callbacks are now …
gharris1727 Feb 10, 2023
3716b99
KAFKA-13659: fixup: Monotonicity is only preserved within a consumer …
gharris1727 Feb 10, 2023
ecbfd05
KAFKA-13659: Use ConcurrentHashMap instead of synchronization, replac…
gharris1727 Feb 13, 2023
860e596
Merge remote-tracking branch 'upstream/trunk' into mm2-negative-offsets
gharris1727 Feb 13, 2023
336ab34
KAFKA-13659: fixup: replicated topic names in identity policy test
gharris1727 Feb 13, 2023
a0a6762
KAFKA-13659: fixup: Subclass KafkaBasedLog instead of providing a new…
gharris1727 Feb 13, 2023
587152c
KAFKA-13659: fixup: review comments; refactor to remove duplicate fin…
gharris1727 Feb 14, 2023
a730588
KAFKA-13659: fixup: Review comments, fix fake offset store, replace i…
gharris1727 Feb 15, 2023
73199ef
fixup: improve testNoCheckpointsIfNoRecordsAreMirrored
gharris1727 Feb 15, 2023
460a25a
KAFKA-13659: fixup: remove branch from main code that is untestable
gharris1727 Feb 15, 2023
b6b46a2
fixup: Change MirrorSourceTask::commit to drain sync messages before …
gharris1727 Feb 16, 2023
e1320c1
Revert "fixup: Change MirrorSourceTask::commit to drain sync messages…
gharris1727 Feb 16, 2023
aaf3e16
MINOR: Enable periodic offset commits for EOS source tasks when no re…
gharris1727 Feb 16, 2023
218c8e7
Revert "MINOR: Enable periodic offset commits for EOS source tasks wh…
gharris1727 Feb 16, 2023
283d37d
Merge remote-tracking branch 'upstream/trunk' into mm2-negative-offsets
gharris1727 Feb 17, 2023
487b4f6
fixup: change waitForConsumerGroupFullSync to consume records to disc…
gharris1727 Feb 17, 2023
d5c7dda
KAFKA-13659: revert whitespace change to KafkaBasedLog
gharris1727 Feb 17, 2023
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java
@@ -150,6 +150,12 @@ Map<String, Object> offsetSyncsTopicConsumerConfig() {
: targetConsumerConfig();
}

Map<String, Object> offsetSyncsTopicAdminConfig() {
return SOURCE_CLUSTER_ALIAS_DEFAULT.equals(offsetSyncsTopicLocation())
? sourceAdminConfig()
: targetAdminConfig();
}

Duration consumerPollTimeout() {
return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
}
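The new `offsetSyncsTopicAdminConfig()` mirrors the consumer-config selector directly above it: both must resolve to whichever cluster hosts the offset-syncs topic (the source cluster by default). For reference, this is how the OffsetSyncStore constructor further down consumes it; `config` is a `MirrorCheckpointConfig`:

```java
Map<String, Object> adminProps = config.offsetSyncsTopicAdminConfig();
TopicAdmin admin = new TopicAdmin(
        adminProps.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
        config.forwardingAdmin(adminProps));
```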
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -102,10 +102,13 @@ public void start(Map<String, String> props) {
idleConsumerGroupsOffset = new HashMap<>();
checkpointsPerConsumerGroup = new HashMap<>();
scheduler = new Scheduler(MirrorCheckpointTask.class, config.adminTimeout());
scheduler.scheduleRepeating(this::refreshIdleConsumerGroupOffset, config.syncGroupOffsetsInterval(),
"refreshing idle consumers group offsets at target cluster");
scheduler.scheduleRepeatingDelayed(this::syncGroupOffset, config.syncGroupOffsetsInterval(),
"sync idle consumer group offset from source to target");
scheduler.execute(() -> {
offsetSyncStore.start();
scheduler.scheduleRepeating(this::refreshIdleConsumerGroupOffset, config.syncGroupOffsetsInterval(),
"refreshing idle consumers group offsets at target cluster");
scheduler.scheduleRepeatingDelayed(this::syncGroupOffset, config.syncGroupOffsetsInterval(),
"sync idle consumer group offset from source to target");
}, "starting offset sync store");
}

@Override
@@ -136,7 +139,11 @@ public List<SourceRecord> poll() throws InterruptedException {
try {
long deadline = System.currentTimeMillis() + interval.toMillis();
while (!stopping && System.currentTimeMillis() < deadline) {
offsetSyncStore.update(pollTimeout);
Thread.sleep(pollTimeout.toMillis());
}
if (stopping) {
// we are stopping, return early.
return null;
}
List<SourceRecord> records = new ArrayList<>();
for (String group : consumerGroups) {
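Two threading changes here are easy to miss. First, `start(...)` defers the blocking `offsetSyncStore.start()` (a read-to-end of the syncs topic) onto the scheduler thread, so the task itself starts promptly. Second, `poll()` no longer drives consumption: the KafkaBasedLog introduced below consumes on its own thread, so the loop only sleeps in `pollTimeout`-sized slices, which keeps shutdown latency bounded by roughly one `pollTimeout` rather than the full emit interval. A minimal sketch of the lifecycle this enforces, with a hypothetical caller (`config` is a `MirrorCheckpointConfig`):

```java
try (OffsetSyncStore store = new OffsetSyncStore(config)) {
    TopicPartition tp = new TopicPartition("topic1", 0);

    // Before start(), translation always declines: readToEnd is still false.
    OptionalLong early = store.translateDownstream(tp, 42); // OptionalLong.empty()

    store.start(); // blocks until the offset-syncs topic has been read to the end

    OptionalLong late = store.translateDownstream(tp, 42);  // may now be present
}
```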
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
@@ -16,65 +16,124 @@
*/
package org.apache.kafka.connect.mirror;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;

import java.util.Map;
import java.util.HashMap;
import java.util.Collections;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;

/** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
class OffsetSyncStore implements AutoCloseable {
private final KafkaConsumer<byte[], byte[]> consumer;
private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
private final TopicPartition offsetSyncTopicPartition;
private final KafkaBasedLog<byte[], byte[]> backingStore;
private final Map<TopicPartition, OffsetSync> offsetSyncs = new ConcurrentHashMap<>();
private final TopicAdmin admin;
protected volatile boolean readToEnd = false;

OffsetSyncStore(MirrorCheckpointConfig config) {
consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
new ByteArrayDeserializer(), new ByteArrayDeserializer());
offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);
consumer.assign(Collections.singleton(offsetSyncTopicPartition));
Consumer<byte[], byte[]> consumer = null;
TopicAdmin admin = null;
KafkaBasedLog<byte[], byte[]> store;
try {
consumer = MirrorUtils.newConsumer(config.offsetSyncsTopicConsumerConfig());
admin = new TopicAdmin(
config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
config.forwardingAdmin(config.offsetSyncsTopicAdminConfig()));
store = createBackingStore(config, consumer, admin);
} catch (Throwable t) {
Utils.closeQuietly(consumer, "consumer for offset syncs");
Utils.closeQuietly(admin, "admin client for offset syncs");
throw t;
}
this.admin = admin;
this.backingStore = store;
}

// for testing
OffsetSyncStore(KafkaConsumer<byte[], byte[]> consumer, TopicPartition offsetSyncTopicPartition) {
this.consumer = consumer;
this.offsetSyncTopicPartition = offsetSyncTopicPartition;
private KafkaBasedLog<byte[], byte[]> createBackingStore(MirrorCheckpointConfig config, Consumer<byte[], byte[]> consumer, TopicAdmin admin) {
return new KafkaBasedLog<byte[], byte[]>(
config.offsetSyncsTopic(),
Collections.emptyMap(),
Collections.emptyMap(),
() -> admin,
(error, record) -> this.handleRecord(record),
Time.SYSTEM,
ignored -> {
}
) {
@Override
protected Producer<byte[], byte[]> createProducer() {
return null;
}

@Override
protected Consumer<byte[], byte[]> createConsumer() {
return consumer;
}

@Override
protected boolean readPartition(TopicPartition topicPartition) {
return topicPartition.partition() == 0;
}
};
}

OffsetSyncStore() {
this.admin = null;
this.backingStore = null;
}

/**
* Start the OffsetSyncStore, blocking until all previous Offset Syncs have been read from backing storage.
*/
public void start() {
backingStore.start();
readToEnd = true;
}

OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstreamOffset) {
if (!readToEnd) {
// If we have not read to the end of the syncs topic at least once, decline to translate any offsets.
// This prevents emitting stale offsets while initially reading the offset syncs topic.
return OptionalLong.empty();
}
Optional<OffsetSync> offsetSync = latestOffsetSync(sourceTopicPartition);
if (offsetSync.isPresent()) {
if (offsetSync.get().upstreamOffset() > upstreamOffset) {
// Offset is too far in the past to translate accurately
return OptionalLong.of(-1L);
}
long upstreamStep = upstreamOffset - offsetSync.get().upstreamOffset();
// If the consumer group is ahead of the offset sync, we can translate the upstream offset only 1
// downstream offset past the offset sync itself. This is because we know that future records must appear
// ahead of the offset sync, but we cannot estimate how many offsets from the upstream topic
// will be written vs dropped. If we overestimate, then we may skip the correct offset and have data loss.
// This also handles consumer groups at the end of a topic whose offsets point past the last valid record.
// This may cause re-reading of records depending on the age of the offset sync.
// s=offset sync pair, ?=record may or may not be replicated, g=consumer group offset, r=re-read record
// source |-s?????r???g-|
// | ______/
// | /
// vv
// target |-sg----r-----|
long upstreamStep = upstreamOffset == offsetSync.get().upstreamOffset() ? 0 : 1;
return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep);
} else {
return OptionalLong.empty();
}
}

// poll and handle records
synchronized void update(Duration pollTimeout) {
try {
consumer.poll(pollTimeout).forEach(this::handleRecord);
} catch (WakeupException e) {
// swallow
}
}

public synchronized void close() {
consumer.wakeup();
Utils.closeQuietly(consumer, "offset sync store consumer");
@Override
public void close() {
Utils.closeQuietly(backingStore != null ? backingStore::stop : null, "backing store for offset syncs");
Utils.closeQuietly(admin, "admin client for offset syncs");
}

protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
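The heart of the fix is the capped `upstreamStep` above. A self-contained restatement of the rule, for illustration only (the `readToEnd` gate and per-partition sync lookup are elided; the values match the updated unit tests below):

```java
import java.util.OptionalLong;

final class TranslationRuleSketch {
    static OptionalLong translate(long syncUpstream, long syncDownstream, long upstreamOffset) {
        if (syncUpstream > upstreamOffset) {
            return OptionalLong.of(-1L);                    // older than the latest sync: not translatable
        }
        long step = upstreamOffset == syncUpstream ? 0 : 1; // cap dead reckoning at one offset
        return OptionalLong.of(syncDownstream + step);
    }

    public static void main(String[] args) {
        System.out.println(translate(100, 200, 100)); // OptionalLong[200] -- exact match
        System.out.println(translate(100, 200, 150)); // OptionalLong[201] -- capped at +1
        System.out.println(translate(100, 200, 5));   // OptionalLong[-1]  -- too old
    }
}
```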
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
@@ -52,11 +52,16 @@ public void testDownstreamTopicRenaming() {

@Test
public void testCheckpoint() {
long t1UpstreamOffset = 3L;
long t1DownstreamOffset = 4L;
long t2UpstreamOffset = 7L;
long t2DownstreamOffset = 8L;
OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
offsetSyncStore.start();
MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
offsetSyncStore.sync(new TopicPartition("topic1", 2), 3L, 4L);
offsetSyncStore.sync(new TopicPartition("target2.topic5", 6), 7L, 8L);
offsetSyncStore.sync(new TopicPartition("topic1", 2), t1UpstreamOffset, t1DownstreamOffset);
offsetSyncStore.sync(new TopicPartition("target2.topic5", 6), t2UpstreamOffset, t2DownstreamOffset);
Optional<Checkpoint> optionalCheckpoint1 = mirrorCheckpointTask.checkpoint("group9", new TopicPartition("topic1", 2),
new OffsetAndMetadata(10, null));
assertTrue(optionalCheckpoint1.isPresent());
@@ -70,7 +75,7 @@ public void testCheckpoint() {
"checkpoint group9 sourcePartition failed");
assertEquals(10, checkpoint1.upstreamOffset(),
"checkpoint group9 upstreamOffset failed");
assertEquals(11, checkpoint1.downstreamOffset(),
assertEquals(t1DownstreamOffset + 1, checkpoint1.downstreamOffset(),
"checkpoint group9 downstreamOffset failed");
assertEquals(123L, sourceRecord1.timestamp().longValue(),
"checkpoint group9 timestamp failed");
@@ -87,10 +92,27 @@ public void testCheckpoint() {
"checkpoint group11 sourcePartition failed");
assertEquals(12, checkpoint2.upstreamOffset(),
"checkpoint group11 upstreamOffset failed");
assertEquals(13, checkpoint2.downstreamOffset(),
assertEquals(t2DownstreamOffset + 1, checkpoint2.downstreamOffset(),
"checkpoint group11 downstreamOffset failed");
assertEquals(234L, sourceRecord2.timestamp().longValue(),
"checkpoint group11 timestamp failed");
Optional<Checkpoint> optionalCheckpoint3 = mirrorCheckpointTask.checkpoint("group13", new TopicPartition("target2.topic5", 6),
new OffsetAndMetadata(7, null));
assertTrue(optionalCheckpoint3.isPresent());
Checkpoint checkpoint3 = optionalCheckpoint3.get();
SourceRecord sourceRecord3 = mirrorCheckpointTask.checkpointRecord(checkpoint3, 234L);
assertEquals(new TopicPartition("topic5", 6), checkpoint3.topicPartition(),
"checkpoint group13 topic5 failed");
assertEquals("group13", checkpoint3.consumerGroupId(),
"checkpoint group13 consumerGroupId failed");
assertEquals("group13", Checkpoint.unwrapGroup(sourceRecord3.sourcePartition()),
"checkpoint group13 sourcePartition failed");
assertEquals(t2UpstreamOffset, checkpoint3.upstreamOffset(),
"checkpoint group13 upstreamOffset failed");
assertEquals(t2DownstreamOffset, checkpoint3.downstreamOffset(),
"checkpoint group13 downstreamOffset failed");
assertEquals(234L, sourceRecord3.timestamp().longValue(),
"checkpoint group13 timestamp failed");
}

@Test
@@ -150,6 +172,7 @@ public void testSyncOffset() {
@Test
public void testNoCheckpointForTopicWithoutOffsetSyncs() {
OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
offsetSyncStore.start();
MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
offsetSyncStore.sync(new TopicPartition("topic1", 0), 3L, 4L);
@@ -165,6 +188,7 @@ public void testNoCheckpointForTopicWithoutOffsetSyncs() {
@Test
public void testNoCheckpointForTopicWithNullOffsetAndMetadata() {
OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
offsetSyncStore.start();
MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
offsetSyncStore.sync(new TopicPartition("topic1", 0), 1L, 3L);
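The new group13 assertions pin down the exact-match case: a group committed at precisely the sync's upstream offset (7) translates to the sync's downstream offset (8), with no +1 step. One way to see why the cap matters for KAFKA-13659: against a fixed sync pair, the translated offset is non-decreasing in the upstream offset, so a checkpoint cannot move backwards between two reads of the same sync (a later sync can still legitimately reset it, as the "Downstream offsets reset" test below shows). A hypothetical property check, not taken from this PR:

```java
long syncUp = 7, syncDown = 8;
long previous = Long.MIN_VALUE;
for (long up = syncUp; up < syncUp + 100; up++) {
    long translated = syncDown + (up == syncUp ? 0 : 1);
    if (translated < previous) throw new AssertionError("checkpoint moved backwards");
    previous = translated;
}
```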
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
@@ -21,6 +21,8 @@

import org.junit.jupiter.api.Test;

import java.util.OptionalLong;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class OffsetSyncStoreTest {
@@ -30,7 +32,13 @@ public class OffsetSyncStoreTest {
static class FakeOffsetSyncStore extends OffsetSyncStore {

FakeOffsetSyncStore() {
super(null, null);
super();
}

@Override
public void start() {
// do not call super to avoid NPE without a KafkaBasedLog.
readToEnd = true;
}

void sync(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) {
@@ -44,29 +52,57 @@ void sync(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) {

@Test
public void testOffsetTranslation() {
FakeOffsetSyncStore store = new FakeOffsetSyncStore();

store.sync(tp, 100, 200);
assertEquals(250L, store.translateDownstream(tp, 150).getAsLong(),
"Failure in translating downstream offset 250");

// Translate exact offsets
store.sync(tp, 150, 251);
assertEquals(251L, store.translateDownstream(tp, 150).getAsLong(),
"Failure in translating exact downstream offset 251");

// Use old offset (5) prior to any sync -> can't translate
assertEquals(-1, store.translateDownstream(tp, 5).getAsLong(),
"Expected old offset to not translate");

// Downstream offsets reset
store.sync(tp, 200, 10);
assertEquals(10L, store.translateDownstream(tp, 200).getAsLong(),
"Failure in resetting translation of downstream offset");

// Upstream offsets reset
store.sync(tp, 20, 20);
assertEquals(20L, store.translateDownstream(tp, 20).getAsLong(),
"Failure in resetting translation of upstream offset");
try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
store.start();

// Emit synced downstream offset without dead-reckoning
store.sync(tp, 100, 200);
assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150));

// Translate exact offsets
store.sync(tp, 150, 251);
assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150));

// Use old offset (5) prior to any sync -> can't translate
assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5));

// Downstream offsets reset
store.sync(tp, 200, 10);
assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200));

// Upstream offsets reset
store.sync(tp, 20, 20);
assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20));
}
}

@Test
public void testNoTranslationIfStoreNotStarted() {
try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
// no offsets exist and store is not started
assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));

// read a sync during startup
store.sync(tp, 100, 200);
assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));

// After the store is started all offsets are visible
store.start();
assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0));
assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100));
assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200));
}
}

@Test
public void testNoTranslationIfNoOffsetSync() {
try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
store.start();
assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
}
}
}