Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-5423 Add test case to support table rename for SQLServer #3738

Merged
merged 4 commits into from Nov 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -8,6 +8,7 @@
import static io.debezium.connector.sqlserver.util.TestHelper.TYPE_LENGTH_PARAMETER_KEY;
import static io.debezium.connector.sqlserver.util.TestHelper.TYPE_NAME_PARAMETER_KEY;
import static io.debezium.connector.sqlserver.util.TestHelper.TYPE_SCALE_PARAMETER_KEY;
import static io.debezium.connector.sqlserver.util.TestHelper.waitForStreamingStarted;
import static io.debezium.relational.RelationalDatabaseConnectorConfig.SCHEMA_EXCLUDE_LIST;
import static io.debezium.relational.RelationalDatabaseConnectorConfig.SCHEMA_INCLUDE_LIST;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -2618,6 +2619,184 @@ public void shouldIgnoreNullOffsetsWhenRecoveringHistory() {
stopConnector();
}

@Test
@FixFor("DBZ-5423")
public void shouldStreamToOldTableAfterRename() throws Exception {
connection.execute(
"CREATE TABLE account (id int, name varchar(30), amount integer primary key(id))");
TestHelper.enableTableCdc(connection, "account");

final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.build();

start(SqlServerConnector.class, config);
assertConnectorIsRunning();

// Wait for snapshot completion
consumeRecordsByTopic(1);

final Schema expectedSchema = SchemaBuilder.struct()
.optional()
.name("server1.testDB1.dbo.account.Value")
.field("id", Schema.INT32_SCHEMA)
.field("name", Schema.OPTIONAL_STRING_SCHEMA)
.field("amount", Schema.OPTIONAL_INT32_SCHEMA)
.build();

connection.setAutoCommit(false);

// Insert a record prior to rename
connection.execute("INSERT INTO account VALUES(10, 'some_name', 120)");

// Assert emitted record state
SourceRecords records = consumeRecordsByTopic(1);
List<SourceRecord> recordsForTopic = records.recordsForTopic("server1.testDB1.dbo.account");
assertThat(recordsForTopic).hasSize(1);

SourceRecordAssert.assertThat(recordsForTopic.get(0))
.valueAfterFieldIsEqualTo(
new Struct(expectedSchema)
.put("id", 10)
.put("name", "some_name")
.put("amount", 120))
.valueAfterFieldSchemaIsEqualTo(expectedSchema);

// Rename table and test insertion post-insert
// This is to verify that the emitted events still are emitted as "account" despite the table rename
connection.execute("EXEC sp_rename 'account', 'account_new'");
connection.execute("INSERT INTO account_new VALUES (11, 'some_value', 240)");

records = consumeRecordsByTopic(1);
recordsForTopic = records.recordsForTopic("server1.testDB1.dbo.account");
List<SourceRecord> recordsForNewTableTopic = records.recordsForTopic("server1.testDB1.dbo.account_new");

// Assert state
assertThat(recordsForTopic).hasSize(1);
assertThat(recordsForNewTableTopic).isNull();
assertNoRecordsToConsume();

SourceRecordAssert.assertThat(recordsForTopic.get(0))
.valueAfterFieldIsEqualTo(
new Struct(expectedSchema)
.put("id", 11)
.put("name", "some_value")
.put("amount", 240))
.valueAfterFieldSchemaIsEqualTo(expectedSchema);
}

@Test
@FixFor("DBZ-5423")
public void shouldStreamToNewTableAfterRestart() throws Exception {
connection.execute(
"CREATE TABLE account (id int, name varchar(30), amount integer primary key(id))");
TestHelper.enableTableCdc(connection, "account");

final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.build();

start(SqlServerConnector.class, config);
assertConnectorIsRunning();

// Wait for snapshot completion
consumeRecordsByTopic(1);

final Schema expectedSchema = SchemaBuilder.struct()
.optional()
.name("server1.testDB1.dbo.account.Value")
.field("id", Schema.INT32_SCHEMA)
.field("name", Schema.OPTIONAL_STRING_SCHEMA)
.field("amount", Schema.OPTIONAL_INT32_SCHEMA)
.build();

connection.setAutoCommit(false);

// Insert a record prior to rename
connection.execute("INSERT INTO account VALUES(10, 'some_name', 120)");

// Assert emitted record state
SourceRecords records = consumeRecordsByTopic(1);
List<SourceRecord> recordsForTopic = records.recordsForTopic("server1.testDB1.dbo.account");
assertThat(recordsForTopic).hasSize(1);

SourceRecordAssert.assertThat(recordsForTopic.get(0))
.valueAfterFieldIsEqualTo(
new Struct(expectedSchema)
.put("id", 10)
.put("name", "some_name")
.put("amount", 120))
.valueAfterFieldSchemaIsEqualTo(expectedSchema);

// Rename table and test insertion post-insert
// This is to verify that the emitted events still are emitted as "account" despite the table rename
connection.execute("EXEC sp_rename 'account', 'account_new'");
connection.execute("INSERT INTO account_new VALUES (11, 'some_value', 240)");

records = consumeRecordsByTopic(1);
recordsForTopic = records.recordsForTopic("server1.testDB1.dbo.account");
List<SourceRecord> recordsForNewTableTopic = records.recordsForTopic("server1.testDB1.dbo.account_new");

// Assert state
assertThat(recordsForTopic).hasSize(1);
assertThat(recordsForNewTableTopic).isNull();
assertNoRecordsToConsume();

SourceRecordAssert.assertThat(recordsForTopic.get(0))
.valueAfterFieldIsEqualTo(
new Struct(expectedSchema)
.put("id", 11)
.put("name", "some_value")
.put("amount", 240))
.valueAfterFieldSchemaIsEqualTo(expectedSchema);

stopConnector();

final Schema newExpectedSchema = SchemaBuilder.struct()
.optional()
.name("server1.testDB1.dbo.account_new.Value")
.field("id", Schema.INT32_SCHEMA)
.field("name", Schema.OPTIONAL_STRING_SCHEMA)
.field("amount", Schema.OPTIONAL_INT32_SCHEMA)
.build();

// Restart the connector
start(SqlServerConnector.class, config);
assertConnectorIsRunning();

waitForStreamingStarted();
assertNoRecordsToConsume();

connection.execute("INSERT INTO account_new VALUES (12, 'some_value2', 241)");

records = consumeRecordsByTopic(1);
recordsForTopic = records.recordsForTopic("server1.testDB1.dbo.account");
recordsForNewTableTopic = records.recordsForTopic("server1.testDB1.dbo.account_new");

final Schema expectedSchemaAfter = SchemaBuilder.struct()
.optional()
.name("server1.testDB1.dbo.account_new.Value")
.field("id", Schema.INT32_SCHEMA)
.field("name", Schema.OPTIONAL_STRING_SCHEMA)
.field("amount", Schema.OPTIONAL_INT32_SCHEMA)
.build();

// Assert state
assertThat(recordsForTopic).isNull();
assertThat(recordsForNewTableTopic).hasSize(1);
assertNoRecordsToConsume();

SourceRecordAssert.assertThat(recordsForNewTableTopic.get(0))
.valueAfterFieldIsEqualTo(
new Struct(newExpectedSchema)
.put("id", 12)
.put("name", "some_value2")
.put("amount", 241))
.valueAfterFieldSchemaIsEqualTo(newExpectedSchema);

stopConnector();
}

private void assertRecord(Struct record, List<SchemaAndValueField> expected) {
expected.forEach(schemaAndValueField -> schemaAndValueField.assertFor(record));
}
Expand Down
6 changes: 6 additions & 0 deletions documentation/modules/ROOT/pages/connectors/sqlserver.adoc
Expand Up @@ -2700,6 +2700,12 @@ That is, columns that were previously defined as optional (or `NULL`) continue t
Similarly, columns that had been defined as required (`NOT NULL`), retain that designation, although they are now defined as `NULL`.
====

[NOTE]
====
After you rename a table using `sp_rename` function, it will continue to emit changes under the old source table name until the connector is restarted.
Upon restart of the connector, it will emit changes under the new source table name.
====

// Type: procedure
// ModuleID: debezium-sql-server-connector-running-an-offline-update-after-a-schema-change
// Title: Running an offline update after a schema change
Expand Down