diff --git a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/offset/LsnFactory.java b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/offset/LsnFactory.java index 3d6993d68f..d711fae097 100644 --- a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/offset/LsnFactory.java +++ b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/offset/LsnFactory.java @@ -27,7 +27,9 @@ public class LsnFactory extends OffsetFactory { @Override public Offset newOffset(Map offset) { - return new LsnOffset(Lsn.valueOf(offset.get(SourceInfo.CHANGE_LSN_KEY))); + Lsn changeLsn = Lsn.valueOf(offset.get(SourceInfo.CHANGE_LSN_KEY)); + Lsn commitLsn = Lsn.valueOf(offset.get(SourceInfo.COMMIT_LSN_KEY)); + return new LsnOffset(changeLsn, commitLsn, null); } @Override diff --git a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/reader/fetch/SqlServerStreamFetchTask.java b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/reader/fetch/SqlServerStreamFetchTask.java index d53c7f94ac..1a901bbd83 100644 --- a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/reader/fetch/SqlServerStreamFetchTask.java +++ b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/reader/fetch/SqlServerStreamFetchTask.java @@ -52,6 +52,7 @@ public SqlServerStreamFetchTask(StreamSplit split) { public void execute(Context context) throws Exception { SqlServerSourceFetchTaskContext sourceFetchContext = (SqlServerSourceFetchTaskContext) context; + sourceFetchContext.getOffsetContext().preSnapshotCompletion(); taskRunning = true; redoLogSplitReadTask = new LsnSplitReadTask( diff --git a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java index 97b59ff239..eaaac08e36 100644 --- a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java +++ b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java @@ -192,14 +192,16 @@ public static LsnOffset getLsnPosition(Map offset) { offsetStrMap.put( entry.getKey(), entry.getValue() == null ? null : entry.getValue().toString()); } - return new LsnOffset(Lsn.valueOf(offsetStrMap.get(SourceInfo.CHANGE_LSN_KEY))); + Lsn changeLsn = Lsn.valueOf(offsetStrMap.get(SourceInfo.CHANGE_LSN_KEY)); + Lsn commitLsn = Lsn.valueOf(offsetStrMap.get(SourceInfo.COMMIT_LSN_KEY)); + return new LsnOffset(changeLsn, commitLsn, null); } /** Fetch current largest log sequence number (LSN) of the database. */ public static LsnOffset currentLsn(SqlServerConnection connection) { try { Lsn maxLsn = connection.getMaxLsn(); - return new LsnOffset(maxLsn); + return new LsnOffset(maxLsn, maxLsn, null); } catch (SQLException e) { throw new FlinkRuntimeException(e.getMessage(), e); } diff --git a/flink-connector-sqlserver-cdc/src/test/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerConnectorITCase.java b/flink-connector-sqlserver-cdc/src/test/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerConnectorITCase.java index 6943941f55..3680066abc 100644 --- a/flink-connector-sqlserver-cdc/src/test/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerConnectorITCase.java +++ b/flink-connector-sqlserver-cdc/src/test/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerConnectorITCase.java @@ -27,6 +27,8 @@ import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.sql.Connection; import java.sql.SQLException; @@ -36,12 +38,14 @@ import java.util.List; import java.util.concurrent.ExecutionException; +import static org.apache.flink.api.common.JobStatus.RUNNING; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.testcontainers.containers.MSSQLServerContainer.MS_SQL_SERVER_PORT; /** Integration tests for SqlServer Table source. */ +@RunWith(Parameterized.class) public class SqlServerConnectorITCase extends SqlServerTestBase { private final StreamExecutionEnvironment env = @@ -52,10 +56,28 @@ public class SqlServerConnectorITCase extends SqlServerTestBase { @ClassRule public static LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE; + // enable the parallelismSnapshot (i.e: The new source OracleParallelSource) + private final boolean parallelismSnapshot; + + public SqlServerConnectorITCase(boolean parallelismSnapshot) { + this.parallelismSnapshot = parallelismSnapshot; + } + + @Parameterized.Parameters(name = "parallelismSnapshot: {0}") + public static Object[] parameters() { + return new Object[][] {new Object[] {false}, new Object[] {true}}; + } + @Before public void before() { TestValuesTableFactory.clearAllData(); - env.setParallelism(1); + + if (parallelismSnapshot) { + env.setParallelism(4); + env.enableCheckpointing(200); + } else { + env.setParallelism(1); + } } @Test @@ -75,6 +97,7 @@ public void testConsumingAllEvents() + " 'port' = '%s'," + " 'username' = '%s'," + " 'password' = '%s'," + + " 'scan.incremental.snapshot.enabled' = '%s'," + " 'database-name' = '%s'," + " 'table-name' = '%s'" + ")", @@ -82,6 +105,7 @@ public void testConsumingAllEvents() MSSQL_SERVER_CONTAINER.getMappedPort(MS_SQL_SERVER_PORT), MSSQL_SERVER_CONTAINER.getUsername(), MSSQL_SERVER_CONTAINER.getPassword(), + parallelismSnapshot, "inventory", "dbo.products"); String sinkDDL = @@ -160,6 +184,82 @@ public void testConsumingAllEvents() result.getJobClient().get().cancel().get(); } + @Test + public void testStartupFromLatestOffset() throws Exception { + initializeSqlServerTable("inventory"); + + Connection connection = getJdbcConnection(); + Statement statement = connection.createStatement(); + + // The following two change records will be discarded in the 'latest-offset' mode + statement.execute( + "INSERT INTO inventory.dbo.products (name,description,weight) VALUES ('jacket','water resistent white wind breaker',0.2);"); // 110 + statement.execute( + "INSERT INTO inventory.dbo.products (name,description,weight) VALUES ('scooter','Big 2-wheel scooter ',5.18);"); + Thread.sleep(5000L); + + String sourceDDL = + String.format( + "CREATE TABLE debezium_source (" + + " id INT NOT NULL," + + " name STRING," + + " description STRING," + + " weight DECIMAL(10,3)" + + ") WITH (" + + " 'connector' = 'sqlserver-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'scan.incremental.snapshot.enabled' = '%s'," + + " 'database-name' = '%s'," + + " 'table-name' = '%s'," + + " 'scan.startup.mode' = 'latest-offset'" + + ")", + MSSQL_SERVER_CONTAINER.getHost(), + MSSQL_SERVER_CONTAINER.getMappedPort(MS_SQL_SERVER_PORT), + MSSQL_SERVER_CONTAINER.getUsername(), + MSSQL_SERVER_CONTAINER.getPassword(), + parallelismSnapshot, + "inventory", + "dbo.products"); + String sinkDDL = + "CREATE TABLE sink " + + " WITH (" + + " 'connector' = 'values'," + + " 'sink-insert-only' = 'false'" + + ") LIKE debezium_source (EXCLUDING OPTIONS)"; + tEnv.executeSql(sourceDDL); + tEnv.executeSql(sinkDDL); + + // async submit job + TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source"); + + // wait for the source startup, we don't have a better way to wait it, use sleep for now + do { + Thread.sleep(5000L); + } while (result.getJobClient().get().getJobStatus().get() != RUNNING); + Thread.sleep(30000L); + + statement.execute( + "INSERT INTO inventory.dbo.products (name,description,weight) VALUES ('hammer','18oz carpenters hammer',1.2);"); + statement.execute( + "INSERT INTO inventory.dbo.products (name,description,weight) VALUES ('scooter','Big 3-wheel scooter',5.20);"); + + waitForSinkSize("sink", 2); + + String[] expected = + new String[] { + "112,hammer,18oz carpenters hammer,1.200", + "113,scooter,Big 3-wheel scooter,5.200" + }; + + List actual = TestValuesTableFactory.getResults("sink"); + assertThat(actual, containsInAnyOrder(expected)); + + result.getJobClient().get().cancel().get(); + } + @Test public void testAllTypes() throws Throwable { initializeSqlServerTable("column_type_test"); @@ -199,6 +299,7 @@ public void testAllTypes() throws Throwable { + " 'port' = '%s'," + " 'username' = '%s'," + " 'password' = '%s'," + + " 'scan.incremental.snapshot.enabled' = '%s'," + " 'database-name' = '%s'," + " 'table-name' = '%s'" + ")", @@ -206,6 +307,7 @@ public void testAllTypes() throws Throwable { MSSQL_SERVER_CONTAINER.getMappedPort(MS_SQL_SERVER_PORT), MSSQL_SERVER_CONTAINER.getUsername(), MSSQL_SERVER_CONTAINER.getPassword(), + parallelismSnapshot, "column_type_test", "dbo.full_types"); String sinkDDL = @@ -288,6 +390,7 @@ public void testMetadataColumns() throws Throwable { + " 'port' = '%s'," + " 'username' = '%s'," + " 'password' = '%s'," + + " 'scan.incremental.snapshot.enabled' = '%s'," + " 'database-name' = '%s'," + " 'table-name' = '%s'" + ")", @@ -295,6 +398,7 @@ public void testMetadataColumns() throws Throwable { MSSQL_SERVER_CONTAINER.getMappedPort(MS_SQL_SERVER_PORT), MSSQL_SERVER_CONTAINER.getUsername(), MSSQL_SERVER_CONTAINER.getPassword(), + parallelismSnapshot, "inventory", "dbo.products");