DBZ-3860 Provide correct boundaries for multi PK tables #2795

Merged: 1 commit, Oct 14, 2021
@@ -7,21 +7,27 @@
package io.debezium.connector.postgresql;

import java.sql.SQLException;
import java.util.Map;

import org.fest.assertions.Assertions;
import org.fest.assertions.MapAssert;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotTest;
import io.debezium.util.Testing;

public class IncrementalSnapshotIT extends AbstractIncrementalSnapshotTest<PostgresConnector> {

private static final String TOPIC_NAME = "test_server.s1.a";

private static final String SETUP_TABLES_STMT = "DROP SCHEMA IF EXISTS s1 CASCADE;" + "CREATE SCHEMA s1; "
+ "CREATE SCHEMA s2; " + "CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));"
+ "CREATE TABLE s1.a4 (pk1 integer, pk2 integer, pk3 integer, pk4 integer, aa integer, PRIMARY KEY(pk1, pk2, pk3, pk4));"
+ "CREATE TABLE s1.debezium_signal (id varchar(64), type varchar(32), data varchar(2048));";

@Before
@@ -79,4 +85,70 @@ protected void waitForConnectorToStart() {
super.waitForConnectorToStart();
TestHelper.waitForDefaultReplicationSlotBeActive();
}

@Test
public void inserts4Pks() throws Exception {
Testing.Print.enable();

populate4PkTable();
startConnector();

sendAdHocSnapshotSignal("s1.a4");

Thread.sleep(5000);
try (JdbcConnection connection = databaseConnection()) {
connection.setAutoCommit(false);
for (int i = 0; i < ROW_COUNT; i++) {
final int id = i + ROW_COUNT + 1;
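// Split the id into its four decimal digits (thousands, hundreds, tens, units)
// so that every key column advances independently.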
final int pk1 = id / 1000;
final int pk2 = (id / 100) % 10;
final int pk3 = (id / 10) % 10;
final int pk4 = id % 10;
connection.executeWithoutCommitting(String.format("INSERT INTO %s (pk1, pk2, pk3, pk4, aa) VALUES (%s, %s, %s, %s, %s)",
"s1.a4",
pk1,
pk2,
pk3,
pk4,
i + ROW_COUNT));
}
connection.commit();
}

final int expectedRecordCount = ROW_COUNT * 2;
final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(
expectedRecordCount,
x -> true,
k -> k.getInt32("pk1") * 1_000 + k.getInt32("pk2") * 100 + k.getInt32("pk3") * 10 + k.getInt32("pk4"),
"test_server.s1.a4",
null);
for (int i = 0; i < expectedRecordCount; i++) {
Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i));
}
}

protected void populate4PkTable(JdbcConnection connection) throws SQLException {
connection.setAutoCommit(false);
for (int i = 0; i < ROW_COUNT; i++) {
final int id = i + 1;
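// Same digit decomposition as in inserts4Pks(): pk1..pk4 carry the thousands,
// hundreds, tens, and units digit of id, respectively.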
final int pk1 = id / 1000;
final int pk2 = (id / 100) % 10;
final int pk3 = (id / 10) % 10;
final int pk4 = id % 10;
connection.executeWithoutCommitting(String.format("INSERT INTO %s (pk1, pk2, pk3, pk4, aa) VALUES (%s, %s, %s, %s, %s)",
"s1.a4",
pk1,
pk2,
pk3,
pk4,
i));
}
connection.commit();
}

protected void populate4PkTable() throws SQLException {
try (final JdbcConnection connection = databaseConnection()) {
populate4PkTable(connection);
}
}
}
@@ -148,13 +148,10 @@ protected String buildChunkQuery(Table table) {
if (context.isNonInitialChunk()) {
final StringBuilder sql = new StringBuilder();
// Window boundaries
addKeyColumnsToCondition(table, sql, " >= ?");
sql.append(" AND NOT (");
addKeyColumnsToCondition(table, sql, " = ?");
sql.append(")");
addLowerBound(table, sql);
// Table boundaries
sql.append(" AND ");
addKeyColumnsToCondition(table, sql, " <= ?");
sql.append(" AND NOT ");
addLowerBound(table, sql);
condition = sql.toString();
}
final String orderBy = table.primaryKeyColumns().stream()
@@ -167,6 +164,41 @@ protected String buildChunkQuery(Table table) {
orderBy);
}

private void addLowerBound(Table table, StringBuilder sql) {
// To make window boundaries work for more than one column, the condition must
// treat the values in each column as increasing independently of the others.
// For one column the condition is (each ? is bound to the last value seen for the given column)
// (k1 > ?)
// For two columns
// (k1 > ?) OR (k1 = ? AND k2 > ?)
// For four columns
// (k1 > ?) OR (k1 = ? AND k2 > ?) OR (k1 = ? AND k2 = ? AND k3 > ?) OR (k1 = ? AND k2 = ? AND k3 = ? AND k4 > ?)
// etc.
final List<Column> pkColumns = table.primaryKeyColumns();
if (pkColumns.size() > 1) {
sql.append('(');
}
for (int i = 0; i < pkColumns.size(); i++) {
final boolean isLastIterationForI = (i == pkColumns.size() - 1);
sql.append('(');
for (int j = 0; j < i + 1; j++) {
final boolean isLastIterationForJ = (i == j);
sql.append(pkColumns.get(j).name());
sql.append(isLastIterationForJ ? " > ?" : " = ?");
if (!isLastIterationForJ) {
sql.append(" AND ");
}
}
sql.append(")");
if (!isLastIterationForI) {
sql.append(" OR ");
}
}
if (pkColumns.size() > 1) {
sql.append(')');
}
}
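// Aside (illustration only, not part of the diff): a minimal, self-contained
// sketch of the same keyset predicate over plain column-name strings instead
// of Debezium's Table/Column types; LowerBoundSketch is a hypothetical class.
import java.util.List;

class LowerBoundSketch {

    // Emits "(k1 > ?)" for one column and
    // "((k1 > ?) OR (k1 = ? AND k2 > ?))" for two, mirroring addLowerBound.
    static String lowerBound(List<String> keyColumns) {
        final StringBuilder sql = new StringBuilder();
        if (keyColumns.size() > 1) {
            sql.append('(');
        }
        for (int i = 0; i < keyColumns.size(); i++) {
            sql.append('(');
            for (int j = 0; j <= i; j++) {
                sql.append(keyColumns.get(j));
                sql.append(j == i ? " > ?" : " = ? AND ");
            }
            sql.append(')');
            if (i < keyColumns.size() - 1) {
                sql.append(" OR ");
            }
        }
        if (keyColumns.size() > 1) {
            sql.append(')');
        }
        return sql.toString();
    }

    public static void main(String[] args) {
        // Prints: (k1 > ?)
        System.out.println(lowerBound(List.of("k1")));
        // Prints: ((k1 > ?) OR (k1 = ? AND k2 > ?))
        System.out.println(lowerBound(List.of("k1", "k2")));
        // Prints the four-column disjunction shown in the comment above.
        System.out.println(lowerBound(List.of("k1", "k2", "k3", "k4")));
    }
}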

protected String buildMaxPrimaryKeyQuery(Table table) {
final String orderBy = table.primaryKeyColumns().stream()
.map(Column::name)
@@ -346,7 +378,7 @@ private void createDataEventsForTable() {
}
context.nextChunkPosition(lastKey);
if (lastRow != null) {
LOGGER.debug("\t Next window will resume from '{}'", context.chunkEndPosititon());
LOGGER.debug("\t Next window will resume from {}", (Object) context.chunkEndPosititon());
}

LOGGER.debug("\t Finished exporting {} records for window of table table '{}'; total duration '{}'", rows,
@@ -376,10 +408,18 @@ protected PreparedStatement readTableChunkStatement(String sql) throws SQLException {
if (context.isNonInitialChunk()) {
final Object[] maximumKey = context.maximumKey().get();
final Object[] chunkEndPosition = context.chunkEndPosititon();
// Fill the window lower-bound placeholders
int pos = 0;
for (int i = 0; i < chunkEndPosition.length; i++) {
statement.setObject(i + 1, chunkEndPosition[i]);
statement.setObject(i + 1 + chunkEndPosition.length, chunkEndPosition[i]);
statement.setObject(i + 1 + 2 * chunkEndPosition.length, maximumKey[i]);
for (int j = 0; j < i + 1; j++) {
statement.setObject(++pos, chunkEndPosition[j]);
}
}
// Fill maximum key placeholders
for (int i = 0; i < chunkEndPosition.length; i++) {
for (int j = 0; j < i + 1; j++) {
statement.setObject(++pos, maximumKey[j]);
}
}
}
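// Note on placeholder arithmetic: for n key columns the predicate emitted by
// addLowerBound() contains 1 + 2 + ... + n = n * (n + 1) / 2 placeholders, and
// the chunk query embeds it twice (once for the chunk end position, once
// negated for the maximum key). For n = 4 that is 10 + 10 = 20 bindings, which
// is exactly what the two nested loops above produce, in emission order.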
return statement;
@@ -42,23 +42,51 @@ public String getConnectorName() {
}

@Test
public void testBuildQuery() {
public void testBuildQueryOnePkColumn() {
final SignalBasedIncrementalSnapshotChangeEventSource<TableId> source = new SignalBasedIncrementalSnapshotChangeEventSource<>(
config(), new JdbcConnection(config().getConfig(), config -> null, "\"", "\""), null, null, null, SnapshotProgressListener.NO_OP,
DataChangeEventListener.NO_OP);
final IncrementalSnapshotContext<TableId> context = new SignalBasedIncrementalSnapshotContext<>();
source.setContext(context);
final Column pk1 = Column.editor().name("pk1").create();
final Column val1 = Column.editor().name("val1").create();
final Column val2 = Column.editor().name("val2").create();
final Table table = Table.editor().tableId(new TableId(null, "s1", "table1"))
.addColumn(pk1)
.addColumn(val1)
.addColumn(val2)
.setPrimaryKeyNames("pk1").create();
Assertions.assertThat(source.buildChunkQuery(table)).isEqualTo("SELECT * FROM \"s1\".\"table1\" ORDER BY pk1 LIMIT 1024");
context.nextChunkPosition(new Object[]{ 1, 5 });
context.maximumKey(new Object[]{ 10, 50 });
Assertions.assertThat(source.buildChunkQuery(table)).isEqualTo(
"SELECT * FROM \"s1\".\"table1\" WHERE (pk1 > ?) AND NOT (pk1 > ?) ORDER BY pk1 LIMIT 1024");
}

@Test
public void testBuildQueryThreePkColumns() {
final SignalBasedIncrementalSnapshotChangeEventSource<TableId> source = new SignalBasedIncrementalSnapshotChangeEventSource<>(
config(), new JdbcConnection(config().getConfig(), config -> null, "\"", "\""), null, null, null, SnapshotProgressListener.NO_OP,
DataChangeEventListener.NO_OP);
final IncrementalSnapshotContext<TableId> context = new SignalBasedIncrementalSnapshotContext<>();
source.setContext(context);
final Column pk1 = Column.editor().name("pk1").create();
final Column pk2 = Column.editor().name("pk2").create();
final Column pk3 = Column.editor().name("pk3").create();
final Column val1 = Column.editor().name("val1").create();
final Column val2 = Column.editor().name("val2").create();
final Table table = Table.editor().tableId(new TableId(null, "s1", "table1")).addColumn(pk1).addColumn(pk2)
.addColumn(val1).addColumn(val2).setPrimaryKeyNames("pk1", "pk2").create();
Assertions.assertThat(source.buildChunkQuery(table)).isEqualTo("SELECT * FROM \"s1\".\"table1\" ORDER BY pk1, pk2 LIMIT 1024");
final Table table = Table.editor().tableId(new TableId(null, "s1", "table1"))
.addColumn(pk1)
.addColumn(pk2)
.addColumn(pk3)
.addColumn(val1)
.addColumn(val2)
.setPrimaryKeyNames("pk1", "pk2", "pk3").create();
Assertions.assertThat(source.buildChunkQuery(table)).isEqualTo("SELECT * FROM \"s1\".\"table1\" ORDER BY pk1, pk2, pk3 LIMIT 1024");
context.nextChunkPosition(new Object[]{ 1, 5 });
context.maximumKey(new Object[]{ 10, 50 });
Assertions.assertThat(source.buildChunkQuery(table)).isEqualTo(
"SELECT * FROM \"s1\".\"table1\" WHERE pk1 >= ? AND pk2 >= ? AND NOT (pk1 = ? AND pk2 = ?) AND pk1 <= ? AND pk2 <= ? ORDER BY pk1, pk2 LIMIT 1024");
"SELECT * FROM \"s1\".\"table1\" WHERE ((pk1 > ?) OR (pk1 = ? AND pk2 > ?) OR (pk1 = ? AND pk2 = ? AND pk3 > ?)) AND NOT ((pk1 > ?) OR (pk1 = ? AND pk2 > ?) OR (pk1 = ? AND pk2 = ? AND pk3 > ?)) ORDER BY pk1, pk2, pk3 LIMIT 1024");
}
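// Reading the expected query: the first disjunction (bound to the chunk end
// position) keeps rows that sort strictly after the last chunk, and the
// negated copy (bound to the maximum key) cuts the window off at the table's
// snapshot ceiling, i.e. chunkEnd < (pk1, pk2, pk3) <= maximumKey in tuple order.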

@Test
@@ -78,11 +78,21 @@ protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int recordCount,
protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int recordCount,
Predicate<Map.Entry<Integer, Integer>> dataCompleted, Consumer<List<SourceRecord>> recordConsumer)
throws InterruptedException {
return consumeMixedWithIncrementalSnapshot(recordCount, dataCompleted, k -> k.getInt32(pkFieldName()), topicName(), recordConsumer);
}

protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(
int recordCount,
Predicate<Map.Entry<Integer, Integer>> dataCompleted,
Function<Struct, Integer> idCalculator,
String topicName,
Consumer<List<SourceRecord>> recordConsumer)
throws InterruptedException {
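// Overload for multi-PK tests: idCalculator maps the (possibly composite) key
// Struct back to a single integer id, and topicName selects the table's topic.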
final Map<Integer, Integer> dbChanges = new HashMap<>();
int noRecords = 0;
for (;;) {
final SourceRecords records = consumeRecordsByTopic(1);
final List<SourceRecord> dataRecords = records.recordsForTopic(topicName());
final List<SourceRecord> dataRecords = records.recordsForTopic(topicName);
if (records.allRecordsInOrder().isEmpty()) {
noRecords++;
Assertions.assertThat(noRecords).describedAs("Too many no data record results")
@@ -94,7 +104,7 @@ protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int recordCount,
continue;
}
dataRecords.forEach(record -> {
final int id = ((Struct) record.key()).getInt32(pkFieldName());
final int id = idCalculator.apply((Struct) record.key());
final int value = ((Struct) record.value()).getStruct("after").getInt32(valueFieldName());
dbChanges.put(id, value);
});