Skip to content

Commit

Permalink
DBZ-2793 Add test for SQL server schema filters
Browse files Browse the repository at this point in the history
  • Loading branch information
vjuranek committed Mar 24, 2022
1 parent c30485a commit 61164de
Showing 1 changed file with 94 additions and 0 deletions.
Expand Up @@ -8,6 +8,8 @@
import static io.debezium.connector.sqlserver.util.TestHelper.TYPE_LENGTH_PARAMETER_KEY;
import static io.debezium.connector.sqlserver.util.TestHelper.TYPE_NAME_PARAMETER_KEY;
import static io.debezium.connector.sqlserver.util.TestHelper.TYPE_SCALE_PARAMETER_KEY;
import static io.debezium.relational.RelationalDatabaseConnectorConfig.SCHEMA_EXCLUDE_LIST;
import static io.debezium.relational.RelationalDatabaseConnectorConfig.SCHEMA_INCLUDE_LIST;
import static org.fest.assertions.Assertions.assertThat;
import static org.fest.assertions.MapAssert.entry;
import static org.junit.Assert.assertNull;
Expand Down Expand Up @@ -2569,6 +2571,98 @@ public void shouldIncludeDatabaseNameIntoTopicAndSchemaNamesInMultiPartitionMode
assertThat(record.valueSchema().name()).isEqualTo("server1.testDB.dbo.tablea.Envelope");
}

@Test
@FixFor("DBZ-2793")
public void shouldApplySchemaFilters() throws Exception {
connection.execute("CREATE SCHEMA s1");
connection.execute("CREATE SCHEMA s2");
connection.setAutoCommit(false);
String statements = "DROP TABLE IF EXISTS s1.tablea;" +
"DROP TABLE IF EXISTS s1.tableb;" +
"DROP TABLE IF EXISTS s2.tablea;" +
"DROP TABLE IF EXISTS s2.tableb;" +
"CREATE TABLE s1.tablea (id int PRIMARY KEY, vala integer);" +
"CREATE TABLE s1.tableb (id int PRIMARY KEY, valb integer);" +
"CREATE TABLE s2.tablea (id int PRIMARY KEY, vala integer);" +
"CREATE TABLE s2.tableb (id int PRIMARY KEY, valb integer);";
connection.execute(statements);
connection.setAutoCommit(true);
TestHelper.enableSchemaTableCdc(connection, "s1", "tablea");
TestHelper.enableSchemaTableCdc(connection, "s1", "tableb");
TestHelper.enableSchemaTableCdc(connection, "s2", "tablea");
TestHelper.enableSchemaTableCdc(connection, "s2", "tableb");

// Test exclude filter, s2 schema and default dbo schema should be included.
Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(SCHEMA_EXCLUDE_LIST, "s1")
.build();

start(SqlServerConnector.class, config);
assertConnectorIsRunning();

// Wait for snapshot completion
TestHelper.waitForSnapshotToBeCompleted();
consumeRecordsByTopic(1);

connection.execute("INSERT INTO s1.tablea VALUES(1, 1)");
connection.execute("INSERT INTO s1.tableb VALUES(1, 2)");
connection.execute("INSERT INTO s2.tablea VALUES(1, 3)");
connection.execute("INSERT INTO s2.tableb VALUES(1, 4)");
connection.execute("INSERT INTO tablea VALUES(1001, 'a')");
connection.execute("INSERT INTO tableb VALUES(1001, 'b')");

SourceRecords records = consumeRecordsByTopic(4);
List<SourceRecord> tableS1A = records.recordsForTopic("server1.s1.tablea");
List<SourceRecord> tableS1B = records.recordsForTopic("server1.s1.tableb");
List<SourceRecord> tableS2A = records.recordsForTopic("server1.s2.tablea");
List<SourceRecord> tableS2B = records.recordsForTopic("server1.s2.tableb");
List<SourceRecord> tableDboA = records.recordsForTopic("server1.dbo.tablea");
List<SourceRecord> tableDboB = records.recordsForTopic("server1.dbo.tableb");

assertNull(tableS1A);
assertNull(tableS1B);
Assertions.assertThat(tableS2A).hasSize(1);
Assertions.assertThat(tableS2B).hasSize(1);
Assertions.assertThat(tableDboA).hasSize(1);
Assertions.assertThat(tableDboB).hasSize(1);

stopConnector();

// Test include filter, only s1 schema should be included.
config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(SCHEMA_INCLUDE_LIST, "s1")
.build();

start(SqlServerConnector.class, config);
assertConnectorIsRunning();

connection.execute("INSERT INTO s1.tablea VALUES(2, 1)");
connection.execute("INSERT INTO s1.tableb VALUES(2, 2)");
connection.execute("INSERT INTO s2.tablea VALUES(2, 3)");
connection.execute("INSERT INTO s2.tableb VALUES(2, 4)");
connection.execute("INSERT INTO tablea VALUES(1002, 'a')");
connection.execute("INSERT INTO tableb VALUES(1002, 'a')");

records = consumeRecordsByTopic(2);
tableS2A = records.recordsForTopic("server1.s2.tablea");
tableS2B = records.recordsForTopic("server1.s2.tableb");
tableDboA = records.recordsForTopic("server1.dbo.tablea");
tableDboB = records.recordsForTopic("server1.dbo.tableb");
tableS1A = records.recordsForTopic("server1.s1.tablea");
tableS1B = records.recordsForTopic("server1.s1.tableb");

Assertions.assertThat(tableS1A).hasSize(1);
Assertions.assertThat(tableS1B).hasSize(1);
assertNull(tableS2A);
assertNull(tableS2B);
assertNull(tableDboA);
assertNull(tableDboB);

stopConnector();
}

private void assertRecord(Struct record, List<SchemaAndValueField> expected) {
expected.forEach(schemaAndValueField -> schemaAndValueField.assertFor(record));
}
Expand Down

0 comments on commit 61164de

Please sign in to comment.