Skip to content

Commit

Permalink
source-mssql: shorten capture job polling interval in tests (#33510)
Browse files Browse the repository at this point in the history
  • Loading branch information
postamar committed Jan 8, 2024
1 parent f8f6438 commit c284588
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 15 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 3.5.0
dockerImageTag: 3.5.1
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.debezium.connector.sqlserver.Lsn;
import java.io.IOException;
import java.sql.SQLException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -23,6 +24,9 @@
public class MssqlCdcTargetPosition implements CdcTargetPosition<Lsn> {

private static final Logger LOGGER = LoggerFactory.getLogger(MssqlCdcTargetPosition.class);

public static final Duration MAX_LSN_QUERY_DELAY = Duration.ZERO;
public static final Duration MAX_LSN_QUERY_DELAY_TEST = Duration.ofSeconds(1);
public final Lsn targetLsn;

public MssqlCdcTargetPosition(final Lsn targetLsn) {
Expand Down Expand Up @@ -77,17 +81,30 @@ public int hashCode() {

public static MssqlCdcTargetPosition getTargetPosition(final JdbcDatabase database, final String dbName) {
try {
final List<JsonNode> jsonNodes = database
.bufferedResultSetQuery(connection -> connection.createStatement().executeQuery(
"USE [" + dbName + "]; SELECT sys.fn_cdc_get_max_lsn() AS max_lsn;"), JdbcUtils.getDefaultSourceOperations()::rowToJson);
// We might have to wait a bit before querying the max_lsn to give the CDC capture job
// a chance to catch up. This is important in tests, where reads might occur in quick succession,
// leaving the CDC change tables (the "CT" tables that Debezium consumes) in a stale state.
final JsonNode sourceConfig = database.getSourceConfig();
final Duration delay = (sourceConfig != null && sourceConfig.has("is_test") && sourceConfig.get("is_test").asBoolean())
? MAX_LSN_QUERY_DELAY_TEST
: MAX_LSN_QUERY_DELAY;
final String maxLsnQuery = """
USE [%s];
WAITFOR DELAY '%02d:%02d:%02d';
SELECT sys.fn_cdc_get_max_lsn() AS max_lsn;
""".formatted(dbName, delay.toHours(), delay.toMinutesPart(), delay.toSecondsPart());
// Query the high-water mark.
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(
connection -> connection.createStatement().executeQuery(maxLsnQuery),
JdbcUtils.getDefaultSourceOperations()::rowToJson);
Preconditions.checkState(jsonNodes.size() == 1);
if (jsonNodes.get(0).get("max_lsn") != null) {
final Lsn maxLsn = Lsn.valueOf(jsonNodes.get(0).get("max_lsn").binaryValue());
LOGGER.info("identified target lsn: " + maxLsn);
return new MssqlCdcTargetPosition(maxLsn);
} else {
throw new RuntimeException("SQL returned max LSN as null, this might be because the SQL Server Agent is not running. " +
"Please enable the Agent and try again (https://docs.microsoft.com/en-us/sql/ssms/agent/start-stop-or-pause-the-sql-server-agent-service?view=sql-server-ver15)");
"Please enable the Agent and try again (https://docs.microsoft.com/en-us/sql/ssms/agent/start-stop-or-pause-the-sql-server-agent-service)");
}
} catch (final SQLException | IOException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) {
// enable cdc on tables for designated role
.with(enableCdcSqlFmt, SCHEMA_NAME, STREAM_NAME, CDC_ROLE_NAME)
.with(enableCdcSqlFmt, SCHEMA_NAME, STREAM_NAME2, CDC_ROLE_NAME)
.withShortenedCapturePollingInterval()
.withWaitUntilMaxLsnAvailable()
// revoke user permissions
.with("REVOKE ALL FROM %s CASCADE;", testdb.getUserName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ protected MsSQLTestDatabase createTestDatabase() {
.withConnectionProperty("databaseName", testdb.getDatabaseName())
.initialized()
.withSnapshotIsolation()
.withCdc()
.withWaitUntilAgentRunning();
.withWaitUntilAgentRunning()
.withCdc();
}

@Override
Expand Down Expand Up @@ -134,7 +134,8 @@ protected void setup() {
\t@supports_net_changes = 0""";
testdb
.with(enableCdcSqlFmt, modelsSchema(), MODELS_STREAM_NAME, CDC_ROLE_NAME)
.with(enableCdcSqlFmt, randomSchema(), RANDOM_TABLE_NAME, CDC_ROLE_NAME);
.with(enableCdcSqlFmt, randomSchema(), RANDOM_TABLE_NAME, CDC_ROLE_NAME)
.withShortenedCapturePollingInterval();

// Create a test user to be used by the source, with proper permissions.
testdb
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public void setup() {
testdb
.with("CREATE TABLE %s.test_table_%d (id INT IDENTITY(1,1) PRIMARY KEY);", TEST_SCHEMA, i)
.with(enableCdcSqlFmt, TEST_SCHEMA, i, CDC_ROLE_NAME, i, 1)
.withShortenedCapturePollingInterval()
.with("INSERT INTO %s.test_table_%d DEFAULT VALUES", TEST_SCHEMA, i);
}

Expand Down Expand Up @@ -122,7 +123,8 @@ public void setup() {
testdb
.with(sb.toString())
.with(enableCdcSqlFmt, TEST_SCHEMA, i, CDC_ROLE_NAME, i, 2)
.with(disableCdcSqlFmt, TEST_SCHEMA, i, i, 1);
.with(disableCdcSqlFmt, TEST_SCHEMA, i, i, 1)
.withShortenedCapturePollingInterval();
}
}

Expand Down Expand Up @@ -156,8 +158,16 @@ private JsonNode config() {
.with(JdbcUtils.USERNAME_KEY, testUserName())
.with(JdbcUtils.PASSWORD_KEY, testdb.getPassword())
.withSchemas(TEST_SCHEMA)
.withCdcReplication()
.withoutSsl()
// Configure for CDC replication but with a higher timeout than usual.
// This is because Debezium requires more time than usual to build the initial snapshot.
.with("is_test", true)
.with("replication_method", Map.of(
"method", "CDC",
"data_to_sync", "Existing and New",
"initial_waiting_seconds", 60,
"snapshot_isolation", "Snapshot"))

.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ public MsSQLTestDatabase withWaitUntilAgentStopped() {
return self();
}

/**
 * Issues {@code sys.sp_cdc_change_job} against the test database to set the polling interval of
 * the CDC capture job.
 *
 * <p>The interval, in whole seconds, is taken from
 * {@code MssqlCdcTargetPosition.MAX_LSN_QUERY_DELAY_TEST}, so the capture job's polling interval
 * and the delay the connector applies before querying the max LSN in tests share one value.
 *
 * <p>NOTE(review): SQL Server's documentation says changes made through
 * {@code sp_cdc_change_job} only take effect once the job has been stopped and restarted —
 * confirm whether a restart is needed for this setting to apply.
 *
 * @return whatever {@code with(...)} returns, so the call can be chained with other builder steps
 */
public MsSQLTestDatabase withShortenedCapturePollingInterval() {
  // Resolve the interval first so the SQL template and its single argument read separately.
  final long pollingIntervalSeconds = MssqlCdcTargetPosition.MAX_LSN_QUERY_DELAY_TEST.toSeconds();
  final String changeCaptureJobSqlFmt =
      "EXEC sys.sp_cdc_change_job @job_type = 'capture', @pollinginterval = %d;";
  return with(changeCaptureJobSqlFmt, pollingIntervalSeconds);
}

private void waitForAgentState(final boolean running) {
final String expectedValue = running ? "Running." : "Stopped.";
LOGGER.debug("Waiting for SQLServerAgent state to change to '{}'.", expectedValue);
Expand Down Expand Up @@ -258,11 +263,12 @@ protected MsSQLConfigBuilder(MsSQLTestDatabase testDatabase) {
}

/**
 * Configures the source config being built for CDC replication.
 *
 * <p>Sets the {@code "replication_method"} entry to a map with method {@code "CDC"}, data to sync
 * {@code "Existing and New"}, an initial waiting time of
 * {@code DEFAULT_CDC_REPLICATION_INITIAL_WAIT} expressed in seconds, and snapshot isolation
 * {@code "Snapshot"}.
 *
 * <p>NOTE(review): this span appears to show both sides of the diff hunk. The first
 * {@code return} looks like the pre-change body and the second like the post-change body, which
 * also sets {@code "is_test"} to {@code true} before the replication method — confirm against
 * the repository.
 *
 * @return the result of the chained {@code with(...)} call(s)
 */
public MsSQLConfigBuilder withCdcReplication() {
// Presumably the pre-change statement: only the replication method is set.
return with("replication_method", Map.of(
"method", "CDC",
"data_to_sync", "Existing and New",
"initial_waiting_seconds", DEFAULT_CDC_REPLICATION_INITIAL_WAIT.getSeconds(),
"snapshot_isolation", "Snapshot"));
// Presumably the post-change statement: the config is first flagged with "is_test" = true,
// then given the same replication method map as above.
return with("is_test", true)
.with("replication_method", Map.of(
"method", "CDC",
"data_to_sync", "Existing and New",
"initial_waiting_seconds", DEFAULT_CDC_REPLICATION_INITIAL_WAIT.getSeconds(),
"snapshot_isolation", "Snapshot"));
}

public MsSQLConfigBuilder withSchemas(String... schemas) {
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mssql.md
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.5.1 | 2024-01-05 | [33510](https://github.com/airbytehq/airbyte/pull/33510) | Test-only changes. |
| 3.5.0 | 2023-12-19 | [33071](https://github.com/airbytehq/airbyte/pull/33071) | Fix SSL configuration parameters |
| 3.4.1 | 2024-01-02 | [33755](https://github.com/airbytehq/airbyte/pull/33755) | Encode binary to base64 format |
| 3.4.0 | 2023-12-19 | [33481](https://github.com/airbytehq/airbyte/pull/33481) | Remove LEGACY state flag |
Expand Down

0 comments on commit c284588

Please sign in to comment.