Skip to content

Commit

Permalink
DBZ-5423 Update test case
Browse files Browse the repository at this point in the history
  • Loading branch information
Naros committed Jul 26, 2022
1 parent d8d1f3c commit 9148d1c
Showing 1 changed file with 73 additions and 23 deletions.
Expand Up @@ -8,6 +8,7 @@
import static io.debezium.connector.sqlserver.util.TestHelper.TYPE_LENGTH_PARAMETER_KEY;
import static io.debezium.connector.sqlserver.util.TestHelper.TYPE_NAME_PARAMETER_KEY;
import static io.debezium.connector.sqlserver.util.TestHelper.TYPE_SCALE_PARAMETER_KEY;
import static io.debezium.connector.sqlserver.util.TestHelper.waitForStreamingStarted;
import static io.debezium.relational.RelationalDatabaseConnectorConfig.SCHEMA_EXCLUDE_LIST;
import static io.debezium.relational.RelationalDatabaseConnectorConfig.SCHEMA_INCLUDE_LIST;
import static org.fest.assertions.Assertions.assertThat;
Expand Down Expand Up @@ -2627,37 +2628,86 @@ public void verifyRenameTable() throws Exception {
// Wait for snapshot completion
consumeRecordsByTopic(1);

final Schema expectedSchema = SchemaBuilder.struct()
.optional()
.name("server1.testDB1.dbo.account.Value")
.field("id", Schema.INT32_SCHEMA)
.field("name", Schema.OPTIONAL_STRING_SCHEMA)
.field("amount", Schema.OPTIONAL_INT32_SCHEMA)
.build();

connection.setAutoCommit(false);

// Insert a record prior to rename
connection.execute("INSERT INTO account VALUES(10, 'some_name', 120)");

// Assert emitted record state
SourceRecords records = consumeRecordsByTopic(1);
List<SourceRecord> recordsForTopic = records.recordsForTopic("server1.testDB1.dbo.account");
assertThat(recordsForTopic).hasSize(1);

SourceRecordAssert.assertThat(recordsForTopic.get(0))
.valueAfterFieldIsEqualTo(
new Struct(expectedSchema)
.put("id", 10)
.put("name", "some_name")
.put("amount", 120))
.valueAfterFieldSchemaIsEqualTo(expectedSchema);

// Rename table and test insertion post-insert
// This is to verify that the emitted events still are emitted as "account" despite the table rename
connection.execute("EXEC sp_rename 'account', 'account_new'");
connection.execute("INSERT INTO account_new VALUES (11, 'some_value', 240)");

final SourceRecords records = consumeRecordsByTopic(2);
final List<SourceRecord> tableA = records.recordsForTopic("server1.testDB1.dbo.account_new");
records = consumeRecordsByTopic(1);
recordsForTopic = records.recordsForTopic("server1.testDB1.dbo.account");
List<SourceRecord> recordsForNewTableTopic = records.recordsForTopic("server1.testDB1.dbo.account_new");

Assertions.assertThat(tableA).isNotEmpty();
Schema expectedSchemaA = SchemaBuilder.struct()
.optional()
.name("server1.testDB1.dbo.account_new.Value")
.field("id", Schema.INT32_SCHEMA)
.field("name", Schema.OPTIONAL_STRING_SCHEMA)
.field("amount", Schema.OPTIONAL_INT32_SCHEMA)
.build();
Struct expectedValueA = new Struct(expectedSchemaA)
.put("id", 10)
.put("name", "some_name")
.put("amount", 120);
// Assert state
assertThat(recordsForTopic).hasSize(1);
assertThat(recordsForNewTableTopic).isNull();
assertNoRecordsToConsume();

SourceRecordAssert.assertThat(tableA.get(0))
.valueAfterFieldIsEqualTo(expectedValueA)
.valueAfterFieldSchemaIsEqualTo(expectedSchemaA);
SourceRecordAssert.assertThat(recordsForTopic.get(0))
.valueAfterFieldIsEqualTo(
new Struct(expectedSchema)
.put("id", 11)
.put("name", "some_value")
.put("amount", 240))
.valueAfterFieldSchemaIsEqualTo(expectedSchema);

stopConnector();

// Restart the connector
// This is done in order to reload the database history and to verify that a restart will
// still emit the right information using account rather than account_new

start(SqlServerConnector.class, config);
assertConnectorIsRunning();

waitForStreamingStarted();
assertNoRecordsToConsume();

connection.execute("INSERT INTO account_new VALUES (12, 'some_value2', 241)");

records = consumeRecordsByTopic(1);
records.allRecordsInOrder().forEach(System.out::println);
recordsForTopic = records.recordsForTopic("server1.testDB1.dbo.account");
recordsForNewTableTopic = records.recordsForTopic("server1.testDB1.dbo.account_new");

// Assert state
assertThat(recordsForTopic).hasSize(1);
assertThat(recordsForNewTableTopic).isNull();
assertNoRecordsToConsume();

SourceRecordAssert.assertThat(recordsForTopic.get(0))
.valueAfterFieldIsEqualTo(
new Struct(expectedSchema)
.put("id", 12)
.put("name", "some_value2")
.put("amount", 241))
.valueAfterFieldSchemaIsEqualTo(expectedSchema);

Struct value = (Struct) tableA.get(0).value();
if (value.getStruct("after") != null) {
assertThat(value.getStruct("after").get("id")).isEqualTo(10);
assertThat(value.getStruct("after").get("name")).isEqualTo("some_name");
assertThat(value.getStruct("after").get("amount")).isEqualTo(120);
}
stopConnector();
}

Expand Down

0 comments on commit 9148d1c

Please sign in to comment.