Skip to content

Commit

Permalink
Debezium integration tests
Browse files Browse the repository at this point in the history
  - check for flush lsn updates
  • Loading branch information
dlg99 authored and eolivelli committed Jul 13, 2021
1 parent 836ba20 commit 5445344
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ public abstract class SourceTester<ServiceContainerT extends GenericContainer> {
protected final String sourceType;
protected final Map<String, Object> sourceConfig;

public final static Set<String> DEBEZIUM_FIELD_SET = new HashSet<String>() {{
protected int numEntriesToInsert = 1;

public static final Set<String> DEBEZIUM_FIELD_SET = new HashSet<String>() {{
add("before");
add("after");
add("source");
Expand Down Expand Up @@ -85,11 +87,27 @@ public Map<String, Object> sourceConfig() {

public void validateSourceResult(Consumer consumer, int number,
String eventType, String converterClassName) throws Exception {
doPreValidationCheck(eventType);
if (converterClassName.endsWith("AvroConverter")) {
validateSourceResultAvro(consumer, number, eventType);
} else {
validateSourceResultJson(consumer, number, eventType);
}
doPostValidationCheck(eventType);
}

/**
* Execute before regular validation to check database-specific state.
*/
public void doPreValidationCheck(String eventType) {
log.info("pre-validation of {}", eventType);
}

/**
* Execute after regular validation to check database-specific state.
*/
public void doPostValidationCheck(String eventType) {
log.info("post-validation of {}", eventType);
}

public void validateSourceResultJson(Consumer<KeyValue<byte[], byte[]>> consumer, int number, String eventType) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,20 @@

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.tests.integration.containers.DebeziumPostgreSqlContainer;
import org.apache.pulsar.tests.integration.containers.PulsarContainer;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.io.sources.SourceTester;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testng.Assert;

import java.io.Closeable;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
* A tester for testing Debezium Postgresql source.
Expand All @@ -49,9 +56,19 @@ public class DebeziumPostgreSqlSourceTester extends SourceTester<DebeziumPostgre

private final PulsarCluster pulsarCluster;

private final AtomicReference<String> confirmedFlushLsn = new AtomicReference<>("not read yet");

public DebeziumPostgreSqlSourceTester(PulsarCluster cluster) {
super(NAME);
this.pulsarCluster = cluster;
/*
todo (possibly solvable by debezium upgrade?): figure out why last message is lost with larger numEntriesToInsert.
I.e. numEntriesToInsert = 100 results in 99 events from debezium 1.0.0, 300 results in 299 events.
10 is handled ok.
Likely this is related to https://issues.redhat.com/browse/DBZ-2288
*/
this.numEntriesToInsert = 10;

pulsarServiceUrl = "pulsar://pulsar-proxy:" + PulsarContainer.BROKER_PORT;

sourceConfig.put("database.hostname", DebeziumPostgreSqlContainer.NAME);
Expand Down Expand Up @@ -81,31 +98,56 @@ public void prepareSource() {

@Override
public void prepareInsertEvent() throws Exception {
this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c",
"psql -h 127.0.0.1 -U postgres -d postgres -c \"select * from inventory.products;\"");
this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c",
"psql -h 127.0.0.1 -U postgres -d postgres " +
"-c \"insert into inventory.products(name, description, weight) " +
"values('test-debezium', 'description', 10);\"");
this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c",
"psql -h 127.0.0.1 -U postgres -d postgres "+
"-c \"select count(1), max(id) from inventory.products where name='test-debezium' and weight=10;\"");
}

@Override
public void prepareDeleteEvent() throws Exception {
this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c",
"psql -h 127.0.0.1 -U postgres -d postgres -c \"select * from inventory.products;\"");
this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c",
"psql -h 127.0.0.1 -U postgres -d postgres " +
"-c \"delete from inventory.products where name='test-debezium';\"");
this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c",
"psql -h 127.0.0.1 -U postgres -d postgres -c \"select count(1) from inventory.products where name='test-debezium';\"");
}

@Override
public void prepareUpdateEvent() throws Exception {
this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c",
"psql -h 127.0.0.1 -U postgres -d postgres -c \"select * from inventory.products;\"");
this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c",
"psql -h 127.0.0.1 -U postgres -d postgres " +
"-c \"update inventory.products " +
"set description='test-update-description', weight='20' where name='test-debezium';\"");
this.debeziumPostgresqlContainer.execCmd("/bin/bash", "-c",
"psql -h 127.0.0.1 -U postgres -d postgres -c " +
"\"select count(1) from inventory.products where name='test-debezium' and weight=20;\"");
}

@Override
public void doPostValidationCheck(String eventType) {
super.doPostValidationCheck(eventType);
/*
confirmed_flush_lsn in pg_replication_slots table has to change,
otherwise postgres won't truncate WAL and the disk space will grow.
I.e. upgrade from debezium 1.0.0 to 1.0.3 resulted in confirmed_flush_lsn
not updating in insert-heavy load.
*/
try {
ContainerExecResult res = debeziumPostgresqlContainer.execCmd("/bin/bash", "-c",
"psql -h 127.0.0.1 -U postgres -d postgres -c \"select confirmed_flush_lsn from pg_replication_slots;\"");
res.assertNoStderr();
String lastConfirmedFlushLsn = res.getStdout();
log.info("Current confirmedFlushLsn: \n{} \nLast confirmedFlushLsn: \n{}",
confirmedFlushLsn.get(), lastConfirmedFlushLsn);
org.junit.Assert.assertNotEquals(confirmedFlushLsn.get(), lastConfirmedFlushLsn);
confirmedFlushLsn.set(lastConfirmedFlushLsn);
} catch (Exception e) {
Assert.fail("failed to get flush lsn", e);
}
}

@Override
Expand All @@ -117,7 +159,7 @@ public Map<String, String> produceSourceMessages(int numMessages) {
@Override
public void close() {
if (pulsarCluster != null) {
pulsarCluster.stopService(DebeziumPostgreSqlContainer.NAME, debeziumPostgresqlContainer);
PulsarCluster.stopService(DebeziumPostgreSqlContainer.NAME, debeziumPostgresqlContainer);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
@Slf4j
public class PulsarDebeziumSourcesTest extends PulsarIOTestBase {

protected final AtomicInteger testId = new AtomicInteger(0);
protected final AtomicInteger testId = new AtomicInteger(0);

@Test(groups = "source")
public void testDebeziumMySqlSourceJson() throws Exception {
Expand Down Expand Up @@ -104,21 +104,20 @@ private void testDebeziumMySqlConnect(String converterClassName, boolean jsonWit
sourceTester.setServiceContainer(mySQLContainer);

PulsarIODebeziumSourceRunner runner = new PulsarIODebeziumSourceRunner(pulsarCluster, functionRuntimeType.toString(),
converterClassName, tenant, namespace, sourceName, outputTopicName, numMessages, jsonWithEnvelope,
consumeTopicName, client);
converterClassName, tenant, namespace, sourceName, outputTopicName, numMessages, jsonWithEnvelope,
consumeTopicName, client);

runner.testSource(sourceTester);
}

private void testDebeziumPostgreSqlConnect(String converterClassName, boolean jsonWithEnvelope) throws Exception {
private void testDebeziumPostgreSqlConnect(String converterClassName, boolean jsonWithEnvelope) throws Exception {

final String tenant = TopicName.PUBLIC_TENANT;
final String namespace = TopicName.DEFAULT_NAMESPACE;
final String outputTopicName = "debe-output-topic-name-" + testId.getAndIncrement();
final String consumeTopicName = "debezium/postgresql/dbserver1.inventory.products";
final String sourceName = "test-source-debezium-postgersql-" + functionRuntimeType + "-" + randomName(8);


// This is the binlog count that contained in postgresql container.
final int numMessages = 26;

Expand All @@ -143,13 +142,13 @@ private void testDebeziumPostgreSqlConnect(String converterClassName, boolean js
sourceTester.setServiceContainer(postgreSqlContainer);

PulsarIODebeziumSourceRunner runner = new PulsarIODebeziumSourceRunner(pulsarCluster, functionRuntimeType.toString(),
converterClassName, tenant, namespace, sourceName, outputTopicName, numMessages, jsonWithEnvelope,
consumeTopicName, client);
converterClassName, tenant, namespace, sourceName, outputTopicName, numMessages, jsonWithEnvelope,
consumeTopicName, client);

runner.testSource(sourceTester);
}

private void testDebeziumMongoDbConnect(String converterClassName, boolean jsonWithEnvelope) throws Exception {
private void testDebeziumMongoDbConnect(String converterClassName, boolean jsonWithEnvelope) throws Exception {

final String tenant = TopicName.PUBLIC_TENANT;
final String namespace = TopicName.DEFAULT_NAMESPACE;
Expand Down Expand Up @@ -182,8 +181,8 @@ private void testDebeziumMongoDbConnect(String converterClassName, boolean jsonW
sourceTester.setServiceContainer(mongoDbContainer);

PulsarIODebeziumSourceRunner runner = new PulsarIODebeziumSourceRunner(pulsarCluster, functionRuntimeType.toString(),
converterClassName, tenant, namespace, sourceName, outputTopicName, numMessages, jsonWithEnvelope,
consumeTopicName, client);
converterClassName, tenant, namespace, sourceName, outputTopicName, numMessages, jsonWithEnvelope,
consumeTopicName, client);

runner.testSource(sourceTester);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.tests.integration.io.sources.debezium;

import com.google.common.base.Preconditions;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
Expand All @@ -34,35 +35,35 @@
@Slf4j
public class PulsarIODebeziumSourceRunner extends PulsarIOSourceRunner {

private String converterClassName;
private String tenant;
private String namespace;
private String sourceName;
private String outputTopicName;
private String consumeTopicName;
private int numMessages;
private boolean jsonWithEnvelope;
private PulsarClient client;
public PulsarIODebeziumSourceRunner(PulsarCluster cluster, String functionRuntimeType, String converterClassName,
String tenant, String ns, String sourceName, String outputTopic, int numMessages, boolean jsonWithEnvelope,
String consumeTopicName, PulsarClient client) {
super(cluster, functionRuntimeType);
this.converterClassName = converterClassName;
this.tenant = tenant;
this.namespace = ns;
this.sourceName = sourceName;
this.outputTopicName = outputTopic;
this.numMessages = numMessages;
this.jsonWithEnvelope = jsonWithEnvelope;
this.consumeTopicName = consumeTopicName;
this.client = client;
}

@Override
@SuppressWarnings({ "rawtypes", "unchecked" })
public <T extends GenericContainer> void testSource(SourceTester<T> sourceTester) throws Exception {
// prepare the testing environment for source
private String converterClassName;
private String tenant;
private String namespace;
private String sourceName;
private String outputTopicName;
private String consumeTopicName;
private int numMessages;
private boolean jsonWithEnvelope;
private PulsarClient client;
public PulsarIODebeziumSourceRunner(PulsarCluster cluster, String functionRuntimeType, String converterClassName,
String tenant, String ns, String sourceName, String outputTopic, int numMessages, boolean jsonWithEnvelope,
String consumeTopicName, PulsarClient client) {
super(cluster, functionRuntimeType);
this.converterClassName = converterClassName;
this.tenant = tenant;
this.namespace = ns;
this.sourceName = sourceName;
this.outputTopicName = outputTopic;
this.numMessages = numMessages;
this.jsonWithEnvelope = jsonWithEnvelope;
this.consumeTopicName = consumeTopicName;
this.client = client;
}

@Override
@SuppressWarnings({ "rawtypes", "unchecked" })
public <T extends GenericContainer> void testSource(SourceTester<T> sourceTester) throws Exception {
// prepare the testing environment for source
prepareSource(sourceTester);

// submit the source connector
Expand Down Expand Up @@ -90,28 +91,33 @@ public <T extends GenericContainer> void testSource(SourceTester<T> sourceTester
// validate the source result
sourceTester.validateSourceResult(consumer, 9, null, converterClassName);

// prepare insert event
sourceTester.prepareInsertEvent();
final int numEntriesToInsert = sourceTester.getNumEntriesToInsert();
Preconditions.checkArgument(numEntriesToInsert >= 1);

// validate the source insert event
sourceTester.validateSourceResult(consumer, 1, SourceTester.INSERT, converterClassName);
for (int i = 1; i <= numEntriesToInsert; i++) {
// prepare insert event
sourceTester.prepareInsertEvent();
log.info("inserted entry {} of {}", i, numEntriesToInsert);
// validate the source insert event
sourceTester.validateSourceResult(consumer, 1, SourceTester.INSERT, converterClassName);
}

// prepare update event
sourceTester.prepareUpdateEvent();

// validate the source update event
sourceTester.validateSourceResult(consumer, 1, SourceTester.UPDATE, converterClassName);
sourceTester.validateSourceResult(consumer, numEntriesToInsert, SourceTester.UPDATE, converterClassName);

// prepare delete event
sourceTester.prepareDeleteEvent();

// validate the source delete event
sourceTester.validateSourceResult(consumer, 1, SourceTester.DELETE, converterClassName);
sourceTester.validateSourceResult(consumer, numEntriesToInsert, SourceTester.DELETE, converterClassName);

// delete the source
deleteSource(tenant, namespace, sourceName);

// get source info (source should be deleted)
getSourceInfoNotFound(tenant, namespace, sourceName);
}
}
}

0 comments on commit 5445344

Please sign in to comment.