Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-2288 Events in exported snapshot no longer filtered by LSN #1697

Merged
merged 5 commits into from Jul 16, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -363,7 +363,7 @@ public String getPostgresPluginName() {
DECODERBUFS("decoderbufs") {
@Override
public MessageDecoder messageDecoder(MessageDecoderConfig config) {
return new PgProtoMessageDecoder();
return new PgProtoMessageDecoder(config);
}

@Override
Expand All @@ -374,7 +374,7 @@ public String getPostgresPluginName() {
WAL2JSON_STREAMING("wal2json_streaming") {
@Override
public MessageDecoder messageDecoder(MessageDecoderConfig config) {
return new StreamingWal2JsonMessageDecoder();
return new StreamingWal2JsonMessageDecoder(config);
}

@Override
Expand All @@ -395,7 +395,7 @@ public boolean sendsNullToastedValuesInOld() {
WAL2JSON_RDS_STREAMING("wal2json_rds_streaming") {
@Override
public MessageDecoder messageDecoder(MessageDecoderConfig config) {
return new StreamingWal2JsonMessageDecoder();
return new StreamingWal2JsonMessageDecoder(config);
}

@Override
Expand All @@ -421,7 +421,7 @@ public boolean sendsNullToastedValuesInOld() {
WAL2JSON("wal2json") {
@Override
public MessageDecoder messageDecoder(MessageDecoderConfig config) {
return new NonStreamingWal2JsonMessageDecoder();
return new NonStreamingWal2JsonMessageDecoder(config);
}

@Override
Expand All @@ -442,7 +442,7 @@ public boolean sendsNullToastedValuesInOld() {
WAL2JSON_RDS("wal2json_rds") {
@Override
public MessageDecoder messageDecoder(MessageDecoderConfig config) {
return new NonStreamingWal2JsonMessageDecoder();
return new NonStreamingWal2JsonMessageDecoder(config);
}

@Override
Expand Down
Expand Up @@ -113,9 +113,10 @@ public ChangeEventSourceCoordinator start(Configuration config) {
ReplicationConnection replicationConnection = null;
SlotCreationResult slotCreatedInfo = null;
if (snapshotter.shouldStream()) {
boolean shouldExport = snapshotter.exportSnapshot();
final boolean shouldExport = snapshotter.exportSnapshot();
final boolean doSnapshot = snapshotter.shouldSnapshot();
replicationConnection = createReplicationConnection(this.taskContext, shouldExport,
connectorConfig.maxRetries(), connectorConfig.retryDelay());
doSnapshot, connectorConfig.maxRetries(), connectorConfig.retryDelay());

// we need to create the slot before we start streaming if it doesn't exist
// otherwise we can't stream back changes happening while the snapshot is taking place
Expand Down Expand Up @@ -199,14 +200,14 @@ public ChangeEventSourceCoordinator start(Configuration config) {
}

public ReplicationConnection createReplicationConnection(PostgresTaskContext taskContext, boolean shouldExport,
int maxRetries, Duration retryDelay)
boolean doSnapshot, int maxRetries, Duration retryDelay)
throws ConnectException {
final Metronome metronome = Metronome.parker(retryDelay, Clock.SYSTEM);
short retryCount = 0;
ReplicationConnection replicationConnection = null;
while (retryCount <= maxRetries) {
try {
return taskContext.createReplicationConnection(shouldExport);
return taskContext.createReplicationConnection(shouldExport, doSnapshot);
}
catch (SQLException ex) {
retryCount++;
Expand Down
Expand Up @@ -99,7 +99,7 @@ private SlotState getCurrentSlotState(PostgresConnection connection) throws SQLE
return connection.getReplicationSlotState(config.slotName(), config.plugin().getPostgresPluginName());
}

protected ReplicationConnection createReplicationConnection(boolean exportSnapshot) throws SQLException {
protected ReplicationConnection createReplicationConnection(boolean exportSnapshot, boolean doSnapshot) throws SQLException {
final boolean dropSlotOnStop = config.dropSlotOnStop();
if (dropSlotOnStop) {
LOGGER.warn(
Expand All @@ -119,6 +119,7 @@ protected ReplicationConnection createReplicationConnection(boolean exportSnapsh
.statusUpdateInterval(config.statusUpdateInterval())
.withTypeRegistry(schema.getTypeRegistry())
.exportSnapshotOnCreate(exportSnapshot)
.doSnapshot(doSnapshot)
.withSchema(schema)
.build();
}
Expand Down
Expand Up @@ -23,6 +23,15 @@ public abstract class AbstractMessageDecoder implements MessageDecoder {

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractMessageDecoder.class);

private final boolean filterBasedOnLsn;

public AbstractMessageDecoder(MessageDecoderConfig config) {
// To provide seamless snapshot to streaming transition in exported mode it is necessary
// to not filter out events based on LSN number as the filtering is done on replication
// slot level
filterBasedOnLsn = !(config.exportedSnapshot() && config.doSnapshot());
}

@Override
public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException {
// if message is empty pass control right to ReplicationMessageProcessor to update WAL position info
Expand All @@ -42,8 +51,13 @@ public boolean shouldMessageBeSkipped(ByteBuffer buffer, Long lastReceivedLsn, L
// the lsn we started from is inclusive, so we need to avoid sending back the same message twice
// but for the first record seen ever it is possible we received the same LSN as the one obtained from replication slot
if (startLsn.compareTo(lastReceivedLsn) > 0 || (startLsn.equals(lastReceivedLsn) && skipFirstFlushRecord)) {
LOGGER.info("Streaming requested from LSN {} but received LSN {} that is same or smaller so skipping the message", startLsn, lastReceivedLsn);
return true;
if (filterBasedOnLsn) {
LOGGER.info("Streaming requested from LSN {} but received LSN {} that is same or smaller so skipping the message", startLsn, lastReceivedLsn);
return true;
}
else {
LOGGER.trace("Streaming requested from LSN {} but received LSN {} that is same or smaller so skipping the message", startLsn, lastReceivedLsn);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But are you skipping it in this case?

}
}
return false;
}
Expand Down
Expand Up @@ -18,11 +18,15 @@ public class MessageDecoderConfig {
private final Configuration configuration;
private final PostgresSchema schema;
private final String publicationName;
private final boolean exportedSnapshot;
private final boolean doSnapshot;

public MessageDecoderConfig(Configuration configuration, PostgresSchema schema, String publicationName) {
public MessageDecoderConfig(Configuration configuration, PostgresSchema schema, String publicationName, boolean exportedSnapshot, boolean doSnapshot) {
this.configuration = configuration;
this.schema = schema;
this.publicationName = publicationName;
this.exportedSnapshot = exportedSnapshot;
this.doSnapshot = doSnapshot;
}

public Configuration getConfiguration() {
Expand All @@ -36,4 +40,12 @@ public PostgresSchema getSchema() {
public String getPublicationName() {
return publicationName;
}

public boolean exportedSnapshot() {
return exportedSnapshot;
}

public boolean doSnapshot() {
return doSnapshot;
}
}
Expand Up @@ -88,6 +88,7 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep
* @param dropSlotOnClose whether the replication slot should be dropped once the connection is closed
* @param statusUpdateInterval the interval at which the replication connection should periodically send status
* @param exportSnapshot whether the replication should export a snapshot when created
* @param exportSnapshot whether the connector is doing snapshot
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* @param exportSnapshot whether the connector is doing snapshot
* @param doSnapshot whether the connector is doing snapshot

* @param typeRegistry registry with PostgreSQL types
* @param streamParams additional parameters to pass to the replication stream
* @param schema the schema; must not be null
Expand All @@ -102,6 +103,7 @@ private PostgresReplicationConnection(Configuration config,
PostgresConnectorConfig.LogicalDecoder plugin,
boolean dropSlotOnClose,
boolean exportSnapshot,
boolean doSnapshot,
Duration statusUpdateInterval,
TypeRegistry typeRegistry,
Properties streamParams,
Expand All @@ -117,7 +119,7 @@ private PostgresReplicationConnection(Configuration config,
this.dropSlotOnClose = dropSlotOnClose;
this.statusUpdateInterval = statusUpdateInterval;
this.exportSnapshot = exportSnapshot;
this.messageDecoder = plugin.messageDecoder(new MessageDecoderConfig(config, schema, publicationName));
this.messageDecoder = plugin.messageDecoder(new MessageDecoderConfig(config, schema, publicationName, exportSnapshot, doSnapshot));
this.typeRegistry = typeRegistry;
this.streamParams = streamParams;
this.slotCreationInfo = null;
Expand Down Expand Up @@ -475,6 +477,7 @@ public boolean readPending(ReplicationMessageProcessor processor) throws SQLExce

private void deserializeMessages(ByteBuffer buffer, ReplicationMessageProcessor processor) throws SQLException, InterruptedException {
lastReceivedLsn = stream.getLastReceiveLSN();
LOGGER.trace("Received message at LSN {}", lastReceivedLsn);
messageDecoder.processMessage(buffer, processor, typeRegistry);
}

Expand Down Expand Up @@ -616,6 +619,7 @@ protected static class ReplicationConnectionBuilder implements Builder {
private boolean dropSlotOnClose = DEFAULT_DROP_SLOT_ON_CLOSE;
private Duration statusUpdateIntervalVal;
private boolean exportSnapshot = DEFAULT_EXPORT_SNAPSHOT;
private boolean doSnapshot;
private TypeRegistry typeRegistry;
private PostgresSchema schema;
private Properties slotStreamParams = new Properties();
Expand Down Expand Up @@ -696,11 +700,17 @@ public Builder exportSnapshotOnCreate(boolean exportSnapshot) {
return this;
}

@Override
public Builder doSnapshot(boolean doSnapshot) {
this.doSnapshot = doSnapshot;
return this;
}

@Override
public ReplicationConnection build() {
assert plugin != null : "Decoding plugin name is not set";
return new PostgresReplicationConnection(config, slotName, publicationName, tableFilter, publicationAutocreateMode, plugin, dropSlotOnClose, exportSnapshot,
statusUpdateIntervalVal, typeRegistry, slotStreamParams, schema);
doSnapshot, statusUpdateIntervalVal, typeRegistry, slotStreamParams, schema);
}

@Override
Expand Down
Expand Up @@ -206,6 +206,14 @@ interface Builder {
*/
Builder exportSnapshotOnCreate(final boolean exportSnapshot);

/**
* Whether or not the snapshot is executed
* @param doSnapshot true if a snapshot should is going to be executed, false if otherwise
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* @param doSnapshot true if a snapshot should is going to be executed, false if otherwise
* @param doSnapshot true if a snapshot is going to be executed, false if otherwise

* @return this instance
* @see #DEFAULT_EXPORT_SNAPSHOT
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't apply here?

*/
Builder doSnapshot(final boolean doSnapshot);

/**
* Creates a new {@link ReplicationConnection} instance
* @return a connection, never null
Expand Down
Expand Up @@ -102,6 +102,7 @@ public static MessageType forType(char type) {
}

public PgOutputMessageDecoder(MessageDecoderConfig config) {
super(config);
this.config = config;
}

Expand Down
Expand Up @@ -19,6 +19,7 @@

import io.debezium.connector.postgresql.TypeRegistry;
import io.debezium.connector.postgresql.connection.AbstractMessageDecoder;
import io.debezium.connector.postgresql.connection.MessageDecoderConfig;
import io.debezium.connector.postgresql.connection.ReplicationStream.ReplicationMessageProcessor;
import io.debezium.connector.postgresql.proto.PgProto;
import io.debezium.connector.postgresql.proto.PgProto.Op;
Expand All @@ -39,6 +40,10 @@ public class PgProtoMessageDecoder extends AbstractMessageDecoder {

private boolean warnedOnUnkownOp = false;

public PgProtoMessageDecoder(MessageDecoderConfig config) {
super(config);
}

@Override
public void processNotEmptyMessage(final ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry)
throws SQLException, InterruptedException {
Expand Down
Expand Up @@ -19,6 +19,7 @@

import io.debezium.connector.postgresql.TypeRegistry;
import io.debezium.connector.postgresql.connection.AbstractMessageDecoder;
import io.debezium.connector.postgresql.connection.MessageDecoderConfig;
import io.debezium.connector.postgresql.connection.ReplicationMessage.Operation;
import io.debezium.connector.postgresql.connection.ReplicationStream.ReplicationMessageProcessor;
import io.debezium.connector.postgresql.connection.TransactionMessage;
Expand All @@ -44,6 +45,10 @@ public class NonStreamingWal2JsonMessageDecoder extends AbstractMessageDecoder {
private final DateTimeFormat dateTime = DateTimeFormat.get();
private boolean containsMetadata = false;

public NonStreamingWal2JsonMessageDecoder(MessageDecoderConfig config) {
super(config);
}

@Override
public void processNotEmptyMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException {
try {
Expand Down
Expand Up @@ -18,6 +18,7 @@

import io.debezium.connector.postgresql.TypeRegistry;
import io.debezium.connector.postgresql.connection.AbstractMessageDecoder;
import io.debezium.connector.postgresql.connection.MessageDecoderConfig;
import io.debezium.connector.postgresql.connection.ReplicationMessage.NoopMessage;
import io.debezium.connector.postgresql.connection.ReplicationMessage.Operation;
import io.debezium.connector.postgresql.connection.ReplicationStream.ReplicationMessageProcessor;
Expand Down Expand Up @@ -108,6 +109,10 @@ public class StreamingWal2JsonMessageDecoder extends AbstractMessageDecoder {

private Instant commitTime;

public StreamingWal2JsonMessageDecoder(MessageDecoderConfig config) {
super(config);
}

@Override
public void processNotEmptyMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException {
try {
Expand Down