Skip to content

Commit

Permalink
DBZ-5423 Adjust tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Naros committed Nov 10, 2022
1 parent 2dbe90c commit 97bfada
Showing 1 changed file with 77 additions and 7 deletions.
Expand Up @@ -2621,7 +2621,73 @@ public void shouldIgnoreNullOffsetsWhenRecoveringHistory() {

@Test
@FixFor("DBZ-5423")
public void verifyRenameTable() throws Exception {
public void shouldStreamToOldTableAfterRename() throws Exception {
connection.execute(
"CREATE TABLE account (id int, name varchar(30), amount integer primary key(id))");
TestHelper.enableTableCdc(connection, "account");

final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.build();

start(SqlServerConnector.class, config);
assertConnectorIsRunning();

// Wait for snapshot completion
consumeRecordsByTopic(1);

final Schema expectedSchema = SchemaBuilder.struct()
.optional()
.name("server1.testDB1.dbo.account.Value")
.field("id", Schema.INT32_SCHEMA)
.field("name", Schema.OPTIONAL_STRING_SCHEMA)
.field("amount", Schema.OPTIONAL_INT32_SCHEMA)
.build();

connection.setAutoCommit(false);

// Insert a record prior to rename
connection.execute("INSERT INTO account VALUES(10, 'some_name', 120)");

// Assert emitted record state
SourceRecords records = consumeRecordsByTopic(1);
List<SourceRecord> recordsForTopic = records.recordsForTopic("server1.testDB1.dbo.account");
assertThat(recordsForTopic).hasSize(1);

SourceRecordAssert.assertThat(recordsForTopic.get(0))
.valueAfterFieldIsEqualTo(
new Struct(expectedSchema)
.put("id", 10)
.put("name", "some_name")
.put("amount", 120))
.valueAfterFieldSchemaIsEqualTo(expectedSchema);

// Rename table and test insertion post-insert
// This is to verify that the emitted events still are emitted as "account" despite the table rename
connection.execute("EXEC sp_rename 'account', 'account_new'");
connection.execute("INSERT INTO account_new VALUES (11, 'some_value', 240)");

records = consumeRecordsByTopic(1);
recordsForTopic = records.recordsForTopic("server1.testDB1.dbo.account");
List<SourceRecord> recordsForNewTableTopic = records.recordsForTopic("server1.testDB1.dbo.account_new");

// Assert state
assertThat(recordsForTopic).hasSize(1);
assertThat(recordsForNewTableTopic).isNull();
assertNoRecordsToConsume();

SourceRecordAssert.assertThat(recordsForTopic.get(0))
.valueAfterFieldIsEqualTo(
new Struct(expectedSchema)
.put("id", 11)
.put("name", "some_value")
.put("amount", 240))
.valueAfterFieldSchemaIsEqualTo(expectedSchema);
}

@Test
@FixFor("DBZ-5423")
public void shouldStreamToNewTableAfterRestart() throws Exception {
connection.execute(
"CREATE TABLE account (id int, name varchar(30), amount integer primary key(id))");
TestHelper.enableTableCdc(connection, "account");
Expand Down Expand Up @@ -2686,10 +2752,15 @@ public void verifyRenameTable() throws Exception {

stopConnector();

// Restart the connector
// This is done in order to reload the database history and to verify that a restart will
// still emit the right information using account rather than account_new
final Schema newExpectedSchema = SchemaBuilder.struct()
.optional()
.name("server1.testDB1.dbo.account_new.Value")
.field("id", Schema.INT32_SCHEMA)
.field("name", Schema.OPTIONAL_STRING_SCHEMA)
.field("amount", Schema.OPTIONAL_INT32_SCHEMA)
.build();

// Restart the connector
start(SqlServerConnector.class, config);
assertConnectorIsRunning();

Expand All @@ -2699,7 +2770,6 @@ public void verifyRenameTable() throws Exception {
connection.execute("INSERT INTO account_new VALUES (12, 'some_value2', 241)");

records = consumeRecordsByTopic(1);
records.allRecordsInOrder().forEach(System.out::println);
recordsForTopic = records.recordsForTopic("server1.testDB1.dbo.account");
recordsForNewTableTopic = records.recordsForTopic("server1.testDB1.dbo.account_new");

Expand All @@ -2718,11 +2788,11 @@ public void verifyRenameTable() throws Exception {

SourceRecordAssert.assertThat(recordsForNewTableTopic.get(0))
.valueAfterFieldIsEqualTo(
new Struct(expectedSchemaAfter)
new Struct(newExpectedSchema)
.put("id", 12)
.put("name", "some_value2")
.put("amount", 241))
.valueAfterFieldSchemaIsEqualTo(expectedSchemaAfter);
.valueAfterFieldSchemaIsEqualTo(newExpectedSchema);

stopConnector();
}
Expand Down

0 comments on commit 97bfada

Please sign in to comment.