Skip to content

Commit

Permalink
KAFKA-12819: Add assert messages to MirrorMaker tests plus other qual…
Browse files Browse the repository at this point in the history
…ity of life improvements (#10762)

Reviewers: Ryanne Dolan <ryannedolan@gmail.com>, Konstantine Karantasis <k.karantasis@gmail.com>
  • Loading branch information
mdedetrich committed May 27, 2021
1 parent db288e4 commit 56d9482
Show file tree
Hide file tree
Showing 12 changed files with 180 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@ public void testSerde() {
byte[] value = checkpoint.recordValue();
ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("any-topic", 7, 8, key, value);
Checkpoint deserialized = Checkpoint.deserializeRecord(record);
assertEquals(checkpoint.consumerGroupId(), deserialized.consumerGroupId());
assertEquals(checkpoint.topicPartition(), deserialized.topicPartition());
assertEquals(checkpoint.upstreamOffset(), deserialized.upstreamOffset());
assertEquals(checkpoint.downstreamOffset(), deserialized.downstreamOffset());
assertEquals(checkpoint.consumerGroupId(), deserialized.consumerGroupId(),
"Failure on checkpoint consumerGroupId serde");
assertEquals(checkpoint.topicPartition(), deserialized.topicPartition(),
"Failure on checkpoint topicPartition serde");
assertEquals(checkpoint.upstreamOffset(), deserialized.upstreamOffset(),
"Failure on checkpoint upstreamOffset serde");
assertEquals(checkpoint.downstreamOffset(), deserialized.downstreamOffset(),
"Failure on checkpoint downstreamOffset serde");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ public void testSerde() {
byte[] value = heartbeat.recordValue();
ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("any-topic", 6, 7, key, value);
Heartbeat deserialized = Heartbeat.deserializeRecord(record);
assertEquals(heartbeat.sourceClusterAlias(), deserialized.sourceClusterAlias());
assertEquals(heartbeat.targetClusterAlias(), deserialized.targetClusterAlias());
assertEquals(heartbeat.timestamp(), deserialized.timestamp());
assertEquals(heartbeat.sourceClusterAlias(), deserialized.sourceClusterAlias(),
"Failure on heartbeat sourceClusterAlias serde");
assertEquals(heartbeat.targetClusterAlias(), deserialized.targetClusterAlias(),
"Failure on heartbeat targetClusterAlias serde");
assertEquals(heartbeat.timestamp(), deserialized.timestamp(),
"Failure on heartbeat timestamp serde");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,22 @@

public class MirrorCheckpointConnectorTest {

private static final String CONSUMER_GROUP = "consumer-group-1";

@Test
public void testMirrorCheckpointConnectorDisabled() {
// disable the checkpoint emission
MirrorConnectorConfig config = new MirrorConnectorConfig(
makeProps("emit.checkpoints.enabled", "false"));

List<String> knownConsumerGroups = new ArrayList<>();
knownConsumerGroups.add("consumer-group-1");
knownConsumerGroups.add(CONSUMER_GROUP);
// MirrorCheckpointConnector as minimum to run taskConfig()
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(knownConsumerGroups,
config);
List<Map<String, String>> output = connector.taskConfigs(1);
// expect no task will be created
assertEquals(0, output.size());
assertEquals(0, output.size(), "MirrorCheckpointConnector not disabled");
}

@Test
Expand All @@ -61,14 +63,16 @@ public void testMirrorCheckpointConnectorEnabled() {
makeProps("emit.checkpoints.enabled", "true"));

List<String> knownConsumerGroups = new ArrayList<>();
knownConsumerGroups.add("consumer-group-1");
knownConsumerGroups.add(CONSUMER_GROUP);
// MirrorCheckpointConnector as minimum to run taskConfig()
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(knownConsumerGroups,
config);
List<Map<String, String>> output = connector.taskConfigs(1);
// expect 1 task will be created
assertEquals(1, output.size());
assertEquals("consumer-group-1", output.get(0).get(MirrorConnectorConfig.TASK_CONSUMER_GROUPS));
assertEquals(1, output.size(),
"MirrorCheckpointConnectorEnabled for " + CONSUMER_GROUP + " has incorrect size");
assertEquals(CONSUMER_GROUP, output.get(0).get(MirrorConnectorConfig.TASK_CONSUMER_GROUPS),
"MirrorCheckpointConnectorEnabled for " + CONSUMER_GROUP + " failed");
}

@Test
Expand All @@ -77,7 +81,7 @@ public void testNoConsumerGroup() {
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(new ArrayList<>(), config);
List<Map<String, String>> output = connector.taskConfigs(1);
// expect no task will be created
assertEquals(0, output.size());
assertEquals(0, output.size(), "ConsumerGroup shouldn't exist");
}

@Test
Expand All @@ -86,12 +90,12 @@ public void testReplicationDisabled() {
MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps("enabled", "false"));

List<String> knownConsumerGroups = new ArrayList<>();
knownConsumerGroups.add("consumer-group-1");
knownConsumerGroups.add(CONSUMER_GROUP);
// MirrorCheckpointConnector as minimum to run taskConfig()
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(knownConsumerGroups, config);
List<Map<String, String>> output = connector.taskConfigs(1);
// expect no task will be created
assertEquals(0, output.size());
assertEquals(0, output.size(), "Replication isn't disabled");
}

@Test
Expand All @@ -100,13 +104,14 @@ public void testReplicationEnabled() {
MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps("enabled", "true"));

List<String> knownConsumerGroups = new ArrayList<>();
knownConsumerGroups.add("consumer-group-1");
knownConsumerGroups.add(CONSUMER_GROUP);
// MirrorCheckpointConnector as minimum to run taskConfig()
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(knownConsumerGroups, config);
List<Map<String, String>> output = connector.taskConfigs(1);
// expect 1 task will be created
assertEquals(1, output.size());
assertEquals("consumer-group-1", output.get(0).get(MirrorConnectorConfig.TASK_CONSUMER_GROUPS));
assertEquals(1, output.size(), "Replication for consumer-group-1 has incorrect size");
assertEquals(CONSUMER_GROUP, output.get(0).get(MirrorConnectorConfig.TASK_CONSUMER_GROUPS),
"Replication for consumer-group-1 failed");
}

@Test
Expand All @@ -123,7 +128,8 @@ public void testFindConsumerGroups() throws Exception {
List<String> groupFound = connector.findConsumerGroups();

Set<String> expectedGroups = groups.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toSet());
assertEquals(expectedGroups, new HashSet<>(groupFound));
assertEquals(expectedGroups, new HashSet<>(groupFound),
"Expected groups are not the same as findConsumerGroups");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,14 @@ public void testDownstreamTopicRenaming() {
MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
new DefaultReplicationPolicy(), null, Collections.emptyMap(), Collections.emptyMap());
assertEquals(new TopicPartition("source1.topic3", 4),
mirrorCheckpointTask.renameTopicPartition(new TopicPartition("topic3", 4)));
mirrorCheckpointTask.renameTopicPartition(new TopicPartition("topic3", 4)),
"Renaming source1.topic3 failed");
assertEquals(new TopicPartition("topic3", 5),
mirrorCheckpointTask.renameTopicPartition(new TopicPartition("target2.topic3", 5)));
mirrorCheckpointTask.renameTopicPartition(new TopicPartition("target2.topic3", 5)),
"Renaming target2.topic3 failed");
assertEquals(new TopicPartition("source1.source6.topic7", 8),
mirrorCheckpointTask.renameTopicPartition(new TopicPartition("source6.topic7", 8)));
mirrorCheckpointTask.renameTopicPartition(new TopicPartition("source6.topic7", 8)),
"Renaming source1.source6.topic7 failed");
}

@Test
Expand All @@ -53,21 +56,33 @@ public void testCheckpoint() {
Checkpoint checkpoint1 = mirrorCheckpointTask.checkpoint("group9", new TopicPartition("topic1", 2),
new OffsetAndMetadata(10, null));
SourceRecord sourceRecord1 = mirrorCheckpointTask.checkpointRecord(checkpoint1, 123L);
assertEquals(new TopicPartition("source1.topic1", 2), checkpoint1.topicPartition());
assertEquals("group9", checkpoint1.consumerGroupId());
assertEquals("group9", Checkpoint.unwrapGroup(sourceRecord1.sourcePartition()));
assertEquals(10, checkpoint1.upstreamOffset());
assertEquals(11, checkpoint1.downstreamOffset());
assertEquals(123L, sourceRecord1.timestamp().longValue());
assertEquals(new TopicPartition("source1.topic1", 2), checkpoint1.topicPartition(),
"checkpoint group9 source1.topic1 failed");
assertEquals("group9", checkpoint1.consumerGroupId(),
"checkpoint group9 consumerGroupId failed");
assertEquals("group9", Checkpoint.unwrapGroup(sourceRecord1.sourcePartition()),
"checkpoint group9 sourcePartition failed");
assertEquals(10, checkpoint1.upstreamOffset(),
"checkpoint group9 upstreamOffset failed");
assertEquals(11, checkpoint1.downstreamOffset(),
"checkpoint group9 downstreamOffset failed");
assertEquals(123L, sourceRecord1.timestamp().longValue(),
"checkpoint group9 timestamp failed");
Checkpoint checkpoint2 = mirrorCheckpointTask.checkpoint("group11", new TopicPartition("target2.topic5", 6),
new OffsetAndMetadata(12, null));
SourceRecord sourceRecord2 = mirrorCheckpointTask.checkpointRecord(checkpoint2, 234L);
assertEquals(new TopicPartition("topic5", 6), checkpoint2.topicPartition());
assertEquals("group11", checkpoint2.consumerGroupId());
assertEquals("group11", Checkpoint.unwrapGroup(sourceRecord2.sourcePartition()));
assertEquals(12, checkpoint2.upstreamOffset());
assertEquals(13, checkpoint2.downstreamOffset());
assertEquals(234L, sourceRecord2.timestamp().longValue());
assertEquals(new TopicPartition("topic5", 6), checkpoint2.topicPartition(),
"checkpoint group11 topic5 failed");
assertEquals("group11", checkpoint2.consumerGroupId(),
"checkpoint group11 consumerGroupId failed");
assertEquals("group11", Checkpoint.unwrapGroup(sourceRecord2.sourcePartition()),
"checkpoint group11 sourcePartition failed");
assertEquals(12, checkpoint2.upstreamOffset(),
"checkpoint group11 upstreamOffset failed");
assertEquals(13, checkpoint2.downstreamOffset(),
"checkpoint group11 downstreamOffset failed");
assertEquals(234L, sourceRecord2.timestamp().longValue(),
"checkpoint group11 timestamp failed");
}

@Test
Expand Down Expand Up @@ -118,7 +133,9 @@ public void testSyncOffset() {

Map<String, Map<TopicPartition, OffsetAndMetadata>> output = mirrorCheckpointTask.syncGroupOffset();

assertEquals(101, output.get(consumer1).get(t1p0).offset());
assertEquals(51, output.get(consumer2).get(t2p0).offset());
assertEquals(101, output.get(consumer1).get(t1p0).offset(),
"Consumer 1 " + topic1 + " failed");
assertEquals(51, output.get(consumer2).get(t2p0).offset(),
"Consumer 2 " + topic2 + " failed");
}
}
Loading

0 comments on commit 56d9482

Please sign in to comment.