From 1d7af47295444c35da19b60ef91683af32aa40b8 Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Tue, 14 Jul 2020 12:39:23 +0200 Subject: [PATCH 1/5] DBZ-2288 Events in exported snapshot no longer filtered by LSN --- .../postgresql/PostgresConnectorConfig.java | 10 ++-- .../postgresql/PostgresConnectorTask.java | 9 ++-- .../postgresql/PostgresTaskContext.java | 3 +- .../connection/AbstractMessageDecoder.java | 18 ++++++- .../connection/MessageDecoderConfig.java | 14 +++++- .../PostgresReplicationConnection.java | 14 +++++- .../connection/ReplicationConnection.java | 8 +++ .../pgoutput/PgOutputMessageDecoder.java | 1 + .../pgproto/PgProtoMessageDecoder.java | 5 ++ .../NonStreamingWal2JsonMessageDecoder.java | 5 ++ .../StreamingWal2JsonMessageDecoder.java | 5 ++ .../postgresql/PostgresConnectorIT.java | 50 +++++++++++++++++++ .../postgresql/PostgresConnectorTaskIT.java | 4 +- .../connector/postgresql/TestHelper.java | 12 +++++ .../embedded/AbstractConnectorTest.java | 19 +++++++ 15 files changed, 160 insertions(+), 17 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java index e6fbd5e4fc4..438ce1529b3 100755 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java @@ -363,7 +363,7 @@ public String getPostgresPluginName() { DECODERBUFS("decoderbufs") { @Override public MessageDecoder messageDecoder(MessageDecoderConfig config) { - return new PgProtoMessageDecoder(); + return new PgProtoMessageDecoder(config); } @Override @@ -374,7 +374,7 @@ public String getPostgresPluginName() { WAL2JSON_STREAMING("wal2json_streaming") { @Override public MessageDecoder messageDecoder(MessageDecoderConfig config) { - return new StreamingWal2JsonMessageDecoder(); + return new StreamingWal2JsonMessageDecoder(config); } @Override @@ -395,7 +395,7 @@ public boolean sendsNullToastedValuesInOld() { WAL2JSON_RDS_STREAMING("wal2json_rds_streaming") { @Override public MessageDecoder messageDecoder(MessageDecoderConfig config) { - return new StreamingWal2JsonMessageDecoder(); + return new StreamingWal2JsonMessageDecoder(config); } @Override @@ -421,7 +421,7 @@ public boolean sendsNullToastedValuesInOld() { WAL2JSON("wal2json") { @Override public MessageDecoder messageDecoder(MessageDecoderConfig config) { - return new NonStreamingWal2JsonMessageDecoder(); + return new NonStreamingWal2JsonMessageDecoder(config); } @Override @@ -442,7 +442,7 @@ public boolean sendsNullToastedValuesInOld() { WAL2JSON_RDS("wal2json_rds") { @Override public MessageDecoder messageDecoder(MessageDecoderConfig config) { - return new NonStreamingWal2JsonMessageDecoder(); + return new NonStreamingWal2JsonMessageDecoder(config); } @Override diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java index 7f9363a749e..6ecd1ada7ff 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java @@ -113,9 +113,10 @@ public ChangeEventSourceCoordinator start(Configuration config) { ReplicationConnection replicationConnection = null; SlotCreationResult slotCreatedInfo = null; if (snapshotter.shouldStream()) { - boolean shouldExport = snapshotter.exportSnapshot(); + final boolean shouldExport = snapshotter.exportSnapshot(); + final boolean doSnapshot = snapshotter.shouldSnapshot(); replicationConnection = createReplicationConnection(this.taskContext, shouldExport, - connectorConfig.maxRetries(), connectorConfig.retryDelay()); + doSnapshot, connectorConfig.maxRetries(), connectorConfig.retryDelay()); // we need to create the slot before we start streaming if it doesn't exist // otherwise we can't stream back changes happening while the snapshot is taking place @@ -199,14 +200,14 @@ public ChangeEventSourceCoordinator start(Configuration config) { } public ReplicationConnection createReplicationConnection(PostgresTaskContext taskContext, boolean shouldExport, - int maxRetries, Duration retryDelay) + boolean doSnapshot, int maxRetries, Duration retryDelay) throws ConnectException { final Metronome metronome = Metronome.parker(retryDelay, Clock.SYSTEM); short retryCount = 0; ReplicationConnection replicationConnection = null; while (retryCount <= maxRetries) { try { - return taskContext.createReplicationConnection(shouldExport); + return taskContext.createReplicationConnection(shouldExport, doSnapshot); } catch (SQLException ex) { retryCount++; diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java index b9b866dfa48..b97c1d885e8 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java @@ -99,7 +99,7 @@ private SlotState getCurrentSlotState(PostgresConnection connection) throws SQLE return connection.getReplicationSlotState(config.slotName(), config.plugin().getPostgresPluginName()); } - protected ReplicationConnection createReplicationConnection(boolean exportSnapshot) throws SQLException { + protected ReplicationConnection createReplicationConnection(boolean exportSnapshot, boolean doSnapshot) throws SQLException { final boolean dropSlotOnStop = config.dropSlotOnStop(); if (dropSlotOnStop) { LOGGER.warn( @@ -119,6 +119,7 @@ protected ReplicationConnection createReplicationConnection(boolean exportSnapsh .statusUpdateInterval(config.statusUpdateInterval()) .withTypeRegistry(schema.getTypeRegistry()) .exportSnapshotOnCreate(exportSnapshot) + .doSnapshot(doSnapshot) .withSchema(schema) .build(); } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/AbstractMessageDecoder.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/AbstractMessageDecoder.java index f17518ea835..9aaea633175 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/AbstractMessageDecoder.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/AbstractMessageDecoder.java @@ -23,6 +23,15 @@ public abstract class AbstractMessageDecoder implements MessageDecoder { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractMessageDecoder.class); + private final boolean filterBasedOnLsn; + + public AbstractMessageDecoder(MessageDecoderConfig config) { + // To provide seamless snapshot to streaming transition in exported mode it is necessary + // to not filter out events based on LSN number as the filtering is done on replication + // slot level + filterBasedOnLsn = !(config.exportedSnapshot() && config.doSnapshot()); + } + @Override public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException { // if message is empty pass control right to ReplicationMessageProcessor to update WAL position info @@ -42,8 +51,13 @@ public boolean shouldMessageBeSkipped(ByteBuffer buffer, Long lastReceivedLsn, L // the lsn we started from is inclusive, so we need to avoid sending back the same message twice // but for the first record seen ever it is possible we received the same LSN as the one obtained from replication slot if (startLsn.compareTo(lastReceivedLsn) > 0 || (startLsn.equals(lastReceivedLsn) && skipFirstFlushRecord)) { - LOGGER.info("Streaming requested from LSN {} but received LSN {} that is same or smaller so skipping the message", startLsn, lastReceivedLsn); - return true; + if (filterBasedOnLsn) { + LOGGER.info("Streaming requested from LSN {} but received LSN {} that is same or smaller so skipping the message", startLsn, lastReceivedLsn); + return true; + } + else { + LOGGER.trace("Streaming requested from LSN {} but received LSN {} that is same or smaller so skipping the message", startLsn, lastReceivedLsn); + } } return false; } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/MessageDecoderConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/MessageDecoderConfig.java index 236923cce33..1da286cc292 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/MessageDecoderConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/MessageDecoderConfig.java @@ -18,11 +18,15 @@ public class MessageDecoderConfig { private final Configuration configuration; private final PostgresSchema schema; private final String publicationName; + private final boolean exportedSnapshot; + private final boolean doSnapshot; - public MessageDecoderConfig(Configuration configuration, PostgresSchema schema, String publicationName) { + public MessageDecoderConfig(Configuration configuration, PostgresSchema schema, String publicationName, boolean exportedSnapshot, boolean doSnapshot) { this.configuration = configuration; this.schema = schema; this.publicationName = publicationName; + this.exportedSnapshot = exportedSnapshot; + this.doSnapshot = doSnapshot; } public Configuration getConfiguration() { @@ -36,4 +40,12 @@ public PostgresSchema getSchema() { public String getPublicationName() { return publicationName; } + + public boolean exportedSnapshot() { + return exportedSnapshot; + } + + public boolean doSnapshot() { + return doSnapshot; + } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java index 9392a3008e3..13f3665e0df 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java @@ -88,6 +88,7 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep * @param dropSlotOnClose whether the replication slot should be dropped once the connection is closed * @param statusUpdateInterval the interval at which the replication connection should periodically send status * @param exportSnapshot whether the replication should export a snapshot when created + * @param exportSnapshot whether the connector is doing snapshot * @param typeRegistry registry with PostgreSQL types * @param streamParams additional parameters to pass to the replication stream * @param schema the schema; must not be null @@ -102,6 +103,7 @@ private PostgresReplicationConnection(Configuration config, PostgresConnectorConfig.LogicalDecoder plugin, boolean dropSlotOnClose, boolean exportSnapshot, + boolean doSnapshot, Duration statusUpdateInterval, TypeRegistry typeRegistry, Properties streamParams, @@ -117,7 +119,7 @@ private PostgresReplicationConnection(Configuration config, this.dropSlotOnClose = dropSlotOnClose; this.statusUpdateInterval = statusUpdateInterval; this.exportSnapshot = exportSnapshot; - this.messageDecoder = plugin.messageDecoder(new MessageDecoderConfig(config, schema, publicationName)); + this.messageDecoder = plugin.messageDecoder(new MessageDecoderConfig(config, schema, publicationName, exportSnapshot, doSnapshot)); this.typeRegistry = typeRegistry; this.streamParams = streamParams; this.slotCreationInfo = null; @@ -475,6 +477,7 @@ public boolean readPending(ReplicationMessageProcessor processor) throws SQLExce private void deserializeMessages(ByteBuffer buffer, ReplicationMessageProcessor processor) throws SQLException, InterruptedException { lastReceivedLsn = stream.getLastReceiveLSN(); + LOGGER.trace("Received message at LSN {}", lastReceivedLsn); messageDecoder.processMessage(buffer, processor, typeRegistry); } @@ -616,6 +619,7 @@ protected static class ReplicationConnectionBuilder implements Builder { private boolean dropSlotOnClose = DEFAULT_DROP_SLOT_ON_CLOSE; private Duration statusUpdateIntervalVal; private boolean exportSnapshot = DEFAULT_EXPORT_SNAPSHOT; + private boolean doSnapshot; private TypeRegistry typeRegistry; private PostgresSchema schema; private Properties slotStreamParams = new Properties(); @@ -696,11 +700,17 @@ public Builder exportSnapshotOnCreate(boolean exportSnapshot) { return this; } + @Override + public Builder doSnapshot(boolean doSnapshot) { + this.doSnapshot = doSnapshot; + return this; + } + @Override public ReplicationConnection build() { assert plugin != null : "Decoding plugin name is not set"; return new PostgresReplicationConnection(config, slotName, publicationName, tableFilter, publicationAutocreateMode, plugin, dropSlotOnClose, exportSnapshot, - statusUpdateIntervalVal, typeRegistry, slotStreamParams, schema); + doSnapshot, statusUpdateIntervalVal, typeRegistry, slotStreamParams, schema); } @Override diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationConnection.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationConnection.java index 8ffcaf209c7..8dcc58b3507 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationConnection.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationConnection.java @@ -206,6 +206,14 @@ interface Builder { */ Builder exportSnapshotOnCreate(final boolean exportSnapshot); + /** + * Whether or not the snapshot is executed + * @param doSnapshot true if a snapshot should is going to be executed, false if otherwise + * @return this instance + * @see #DEFAULT_EXPORT_SNAPSHOT + */ + Builder doSnapshot(final boolean doSnapshot); + /** * Creates a new {@link ReplicationConnection} instance * @return a connection, never null diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java index 0a6cad78b97..c10f090d4d0 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java @@ -102,6 +102,7 @@ public static MessageType forType(char type) { } public PgOutputMessageDecoder(MessageDecoderConfig config) { + super(config); this.config = config; } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoMessageDecoder.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoMessageDecoder.java index 574f58cad73..659085bdbe5 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoMessageDecoder.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoMessageDecoder.java @@ -19,6 +19,7 @@ import io.debezium.connector.postgresql.TypeRegistry; import io.debezium.connector.postgresql.connection.AbstractMessageDecoder; +import io.debezium.connector.postgresql.connection.MessageDecoderConfig; import io.debezium.connector.postgresql.connection.ReplicationStream.ReplicationMessageProcessor; import io.debezium.connector.postgresql.proto.PgProto; import io.debezium.connector.postgresql.proto.PgProto.Op; @@ -39,6 +40,10 @@ public class PgProtoMessageDecoder extends AbstractMessageDecoder { private boolean warnedOnUnkownOp = false; + public PgProtoMessageDecoder(MessageDecoderConfig config) { + super(config); + } + @Override public void processNotEmptyMessage(final ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException { diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/NonStreamingWal2JsonMessageDecoder.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/NonStreamingWal2JsonMessageDecoder.java index e8ee9645961..4ecf7a0f67d 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/NonStreamingWal2JsonMessageDecoder.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/NonStreamingWal2JsonMessageDecoder.java @@ -19,6 +19,7 @@ import io.debezium.connector.postgresql.TypeRegistry; import io.debezium.connector.postgresql.connection.AbstractMessageDecoder; +import io.debezium.connector.postgresql.connection.MessageDecoderConfig; import io.debezium.connector.postgresql.connection.ReplicationMessage.Operation; import io.debezium.connector.postgresql.connection.ReplicationStream.ReplicationMessageProcessor; import io.debezium.connector.postgresql.connection.TransactionMessage; @@ -44,6 +45,10 @@ public class NonStreamingWal2JsonMessageDecoder extends AbstractMessageDecoder { private final DateTimeFormat dateTime = DateTimeFormat.get(); private boolean containsMetadata = false; + public NonStreamingWal2JsonMessageDecoder(MessageDecoderConfig config) { + super(config); + } + @Override public void processNotEmptyMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException { try { diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/StreamingWal2JsonMessageDecoder.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/StreamingWal2JsonMessageDecoder.java index e97236f0418..db0a76059d0 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/StreamingWal2JsonMessageDecoder.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/StreamingWal2JsonMessageDecoder.java @@ -18,6 +18,7 @@ import io.debezium.connector.postgresql.TypeRegistry; import io.debezium.connector.postgresql.connection.AbstractMessageDecoder; +import io.debezium.connector.postgresql.connection.MessageDecoderConfig; import io.debezium.connector.postgresql.connection.ReplicationMessage.NoopMessage; import io.debezium.connector.postgresql.connection.ReplicationMessage.Operation; import io.debezium.connector.postgresql.connection.ReplicationStream.ReplicationMessageProcessor; @@ -108,6 +109,10 @@ public class StreamingWal2JsonMessageDecoder extends AbstractMessageDecoder { private Instant commitTime; + public StreamingWal2JsonMessageDecoder(MessageDecoderConfig config) { + super(config); + } + @Override public void processNotEmptyMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException { try { diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java index 0280ab60190..ed85b45edc0 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java @@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; import java.util.stream.IntStream; @@ -1118,6 +1119,55 @@ public void shouldAllowForExportedSnapshot() throws Exception { VerifyRecord.isValidInsert(s2recs.get(0), PK_FIELD, 3); } + @Test + @FixFor("DBZ-2288") + public void exportedSnapshotShouldNotSkipRecordOfParallelTx() throws Exception { + TestHelper.dropDefaultReplicationSlot(); + TestHelper.createDefaultReplicationSlot(); + + // Testing.Print.enable(); + TestHelper.execute(SETUP_TABLES_STMT); + TestHelper.execute(INSERT_STMT); + + Configuration config = TestHelper.defaultConfig() + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.EXPORTED.getValue()) + .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE) + .with(PostgresConnectorConfig.MAX_QUEUE_SIZE, 2) + .with(PostgresConnectorConfig.MAX_BATCH_SIZE, 1) + .build(); + final PostgresConnection pgConnection = TestHelper.create(); + pgConnection.setAutoCommit(false); + pgConnection.executeWithoutCommitting(INSERT_STMT); + final AtomicBoolean inserted = new AtomicBoolean(); + start(PostgresConnector.class, config, loggingCompletion(), x -> false, x -> { + if (!inserted.get()) { + TestHelper.execute(INSERT_STMT); + try { + pgConnection.commit(); + } + catch (Exception e) { + e.printStackTrace(); + } + inserted.set(true); + } + }); + assertConnectorIsRunning(); + + // Consume records from the snapshot + SourceRecords actualRecords = consumeRecordsByTopic(4); + + pgConnection.commit(); + + // Consume records from concurrent transactions + actualRecords = consumeRecordsByTopic(4); + + List s1recs = actualRecords.recordsForTopic(topicName("s1.a")); + List s2recs = actualRecords.recordsForTopic(topicName("s1.a")); + s2recs = actualRecords.recordsForTopic(topicName("s2.a")); + assertThat(s1recs.size()).isEqualTo(2); + assertThat(s2recs.size()).isEqualTo(2); + } + @Test @FixFor("DBZ-1437") public void shouldPeformSnapshotOnceForInitialOnlySnapshotMode() throws Exception { diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorTaskIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorTaskIT.java index bb3539eb924..3d6da2ec63f 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorTaskIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorTaskIT.java @@ -35,7 +35,7 @@ public FakeContext(PostgresConnectorConfig postgresConnectorConfig, PostgresSche } @Override - protected ReplicationConnection createReplicationConnection(boolean exportSnapshot) throws SQLException { + protected ReplicationConnection createReplicationConnection(boolean exportSnapshot, boolean doSnapshot) throws SQLException { throw new SQLException("Could not connect"); } } @@ -50,7 +50,7 @@ public void retryOnFailureToCreateConnection() throws Exception { config, null, Charset.forName("UTF-8"), - PostgresTopicSelector.create(config))), true, 3, Duration.ofSeconds(2)); + PostgresTopicSelector.create(config))), true, true, 3, Duration.ofSeconds(2)); // Verify retry happened for 10 seconds long endTime = System.currentTimeMillis(); diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java index b4361ad784c..8d2ce9318a7 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java @@ -265,6 +265,18 @@ protected static SourceInfo sourceInfo() { .build())); } + protected static void createDefaultReplicationSlot() { + try { + execute(String.format( + "SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", + ReplicationConnection.Builder.DEFAULT_SLOT_NAME, + decoderPlugin().getPostgresPluginName())); + } + catch (Exception e) { + LOGGER.debug("Error while dropping default replication slot", e); + } + } + protected static void dropDefaultReplicationSlot() { try { execute("SELECT pg_drop_replication_slot('" + ReplicationConnection.Builder.DEFAULT_SLOT_NAME + "')"); diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java index a18cf4f9881..0b7c6feeeeb 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java @@ -270,6 +270,24 @@ protected void start(Class connectorClass, Configurat */ protected void start(Class connectorClass, Configuration connectorConfig, DebeziumEngine.CompletionCallback callback, Predicate isStopRecord) { + start(connectorClass, connectorConfig, callback, isStopRecord, x -> { + }); + } + + /** + * Start the connector using the supplied connector configuration. + * + * @param connectorClass the connector class; may not be null + * @param connectorConfig the configuration for the connector; may not be null + * @param isStopRecord the function that will be called to determine if the connector should be stopped before processing + * this record; may be null if not needed + * @param callback the function that will be called when the engine fails to start the connector or when the connector + * stops running after completing successfully or due to an error; may be null + * @param recordArrivedListener function invoked when a record arrives and is stored in the queue + */ + protected void start(Class connectorClass, Configuration connectorConfig, + DebeziumEngine.CompletionCallback callback, Predicate isStopRecord, + Consumer recordArrivedListener) { Configuration config = Configuration.copy(connectorConfig) .with(EmbeddedEngine.ENGINE_NAME, "testing-connector") .with(EmbeddedEngine.CONNECTOR_CLASS, connectorClass.getName()) @@ -317,6 +335,7 @@ public void taskStarted() { return; } } + recordArrivedListener.accept(record); }) .using(this.getClass().getClassLoader()) .using(wrapperCallback) From e6379ff5cb42828bd9951531c5ac0c09b1447c8b Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Tue, 14 Jul 2020 14:51:03 +0200 Subject: [PATCH 2/5] DBZ-2288 Test for pgoutput --- .../postgresql/PostgresConnectorIT.java | 57 ++++++++++++++++++- 1 file changed, 56 insertions(+), 1 deletion(-) diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java index ed85b45edc0..0be30dada38 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java @@ -1121,6 +1121,7 @@ public void shouldAllowForExportedSnapshot() throws Exception { @Test @FixFor("DBZ-2288") + @SkipWhenDecoderPluginNameIs(value = SkipWhenDecoderPluginNameIs.DecoderPluginName.PGOUTPUT, reason = "PgOutput needs publication for manually created slot") public void exportedSnapshotShouldNotSkipRecordOfParallelTx() throws Exception { TestHelper.dropDefaultReplicationSlot(); TestHelper.createDefaultReplicationSlot(); @@ -1156,7 +1157,57 @@ public void exportedSnapshotShouldNotSkipRecordOfParallelTx() throws Exception { // Consume records from the snapshot SourceRecords actualRecords = consumeRecordsByTopic(4); - pgConnection.commit(); + // Consume records from concurrent transactions + actualRecords = consumeRecordsByTopic(4); + + List s1recs = actualRecords.recordsForTopic(topicName("s1.a")); + List s2recs = actualRecords.recordsForTopic(topicName("s1.a")); + s2recs = actualRecords.recordsForTopic(topicName("s2.a")); + assertThat(s1recs.size()).isEqualTo(2); + assertThat(s2recs.size()).isEqualTo(2); + + stopConnector(); + TestHelper.dropDefaultReplicationSlot(); + } + + @Test + @FixFor("DBZ-2288") + @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Publication not supported") + public void exportedSnapshotShouldNotSkipRecordOfParallelTxPgoutput() throws Exception { + TestHelper.dropDefaultReplicationSlot(); + TestHelper.createDefaultReplicationSlot(); + TestHelper.execute("CREATE PUBLICATION dbz_publication FOR ALL TABLES;"); + + // Testing.Print.enable(); + TestHelper.execute(SETUP_TABLES_STMT); + TestHelper.execute(INSERT_STMT); + + Configuration config = TestHelper.defaultConfig() + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.EXPORTED.getValue()) + .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE) + .with(PostgresConnectorConfig.MAX_QUEUE_SIZE, 2) + .with(PostgresConnectorConfig.MAX_BATCH_SIZE, 1) + .build(); + final PostgresConnection pgConnection = TestHelper.create(); + pgConnection.setAutoCommit(false); + pgConnection.executeWithoutCommitting(INSERT_STMT); + final AtomicBoolean inserted = new AtomicBoolean(); + start(PostgresConnector.class, config, loggingCompletion(), x -> false, x -> { + if (!inserted.get()) { + TestHelper.execute(INSERT_STMT); + try { + pgConnection.commit(); + } + catch (Exception e) { + e.printStackTrace(); + } + inserted.set(true); + } + }); + assertConnectorIsRunning(); + + // Consume records from the snapshot + SourceRecords actualRecords = consumeRecordsByTopic(4); // Consume records from concurrent transactions actualRecords = consumeRecordsByTopic(4); @@ -1166,6 +1217,10 @@ public void exportedSnapshotShouldNotSkipRecordOfParallelTx() throws Exception { s2recs = actualRecords.recordsForTopic(topicName("s2.a")); assertThat(s1recs.size()).isEqualTo(2); assertThat(s2recs.size()).isEqualTo(2); + + stopConnector(); + TestHelper.dropPublication(); + TestHelper.dropDefaultReplicationSlot(); } @Test From a2fc4a6558f7771ccb2074f2e78ac7003bfccefb Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Wed, 15 Jul 2020 10:44:20 +0200 Subject: [PATCH 3/5] DBZ-2288 Fix copy/paste errors --- .../postgresql/connection/AbstractMessageDecoder.java | 2 +- .../postgresql/connection/PostgresReplicationConnection.java | 2 +- .../connector/postgresql/connection/ReplicationConnection.java | 3 +-- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/AbstractMessageDecoder.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/AbstractMessageDecoder.java index 9aaea633175..6007d5636be 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/AbstractMessageDecoder.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/AbstractMessageDecoder.java @@ -56,7 +56,7 @@ public boolean shouldMessageBeSkipped(ByteBuffer buffer, Long lastReceivedLsn, L return true; } else { - LOGGER.trace("Streaming requested from LSN {} but received LSN {} that is same or smaller so skipping the message", startLsn, lastReceivedLsn); + LOGGER.trace("Streaming requested from LSN {} but received LSN {} that is same or smaller", startLsn, lastReceivedLsn); } } return false; diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java index 13f3665e0df..49b4afcfd99 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java @@ -88,7 +88,7 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep * @param dropSlotOnClose whether the replication slot should be dropped once the connection is closed * @param statusUpdateInterval the interval at which the replication connection should periodically send status * @param exportSnapshot whether the replication should export a snapshot when created - * @param exportSnapshot whether the connector is doing snapshot + * @param doSnapshot whether the connector is doing snapshot * @param typeRegistry registry with PostgreSQL types * @param streamParams additional parameters to pass to the replication stream * @param schema the schema; must not be null diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationConnection.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationConnection.java index 8dcc58b3507..2ba97454790 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationConnection.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationConnection.java @@ -208,9 +208,8 @@ interface Builder { /** * Whether or not the snapshot is executed - * @param doSnapshot true if a snapshot should is going to be executed, false if otherwise + * @param doSnapshot true if a snapshot is going to be executed, false if otherwise * @return this instance - * @see #DEFAULT_EXPORT_SNAPSHOT */ Builder doSnapshot(final boolean doSnapshot); From c4e0a1f12b17a101c82e57454bce9da001c587a7 Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Thu, 16 Jul 2020 08:51:09 +0200 Subject: [PATCH 4/5] DBZ-2288 Rethrow exception in test --- .../io/debezium/connector/postgresql/PostgresConnectorIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java index 0be30dada38..58c18caea05 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java @@ -1147,7 +1147,7 @@ public void exportedSnapshotShouldNotSkipRecordOfParallelTx() throws Exception { pgConnection.commit(); } catch (Exception e) { - e.printStackTrace(); + throw new IllegalStateException(e); } inserted.set(true); } @@ -1199,7 +1199,7 @@ public void exportedSnapshotShouldNotSkipRecordOfParallelTxPgoutput() throws Exc pgConnection.commit(); } catch (Exception e) { - e.printStackTrace(); + throw new IllegalStateException(e); } inserted.set(true); } From 12c94216e3ed3f63a1728f3b865ac4dbda36bd4b Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Thu, 16 Jul 2020 09:00:12 +0200 Subject: [PATCH 5/5] DBZ-2288 Add documentation --- documentation/modules/ROOT/pages/connectors/postgresql.adoc | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/documentation/modules/ROOT/pages/connectors/postgresql.adoc b/documentation/modules/ROOT/pages/connectors/postgresql.adoc index 0f511587676..a259416a483 100644 --- a/documentation/modules/ROOT/pages/connectors/postgresql.adoc +++ b/documentation/modules/ROOT/pages/connectors/postgresql.adoc @@ -346,6 +346,12 @@ The fourth snapshot mode, *initial only*, will perform a database snapshot and t The fifth snapshot mode, *exported*, will perform a database snapshot based on the point in time when the replication slot was created. This mode is an excellent way to perform a snapshot in a lock-free way. +[WARNING] +==== +It is strongly recommended to use *exported* mode as the *initial (only)* and *always* modes can lose few events while switching from snapshot to streaming mode when database is under heavy load. +This is a known issue and the affected modes will be reworked to use *exported* mode under the hood (https://issues.redhat.com/browse/DBZ-2337[DBZ-2337]). +==== + ifdef::community[] The final snapshot mode, *custom*, allows the user to inject their own implementation of the `io.debezium.connector.postgresql.spi.Snapshotter` interface via the `snapshot.custom.class` configuration property, with the class on the classpath of your Kafka Connect cluster (or included in the JAR if using the `EmbeddedEngine`). For more details, see the {link-prefix}:{link-postgresql-connector}#postgresql-custom-snapshot[Custom Snapshot] section.