diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java
index 50ba54bfd1..3849df66e1 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java
@@ -16,6 +16,7 @@
 
 package com.ververica.cdc.connectors.base.source.reader.external;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -202,6 +203,11 @@ public void close() {
         }
     }
 
+    @VisibleForTesting
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
+
     private void assertLowWatermark(SourceRecord lowWatermark) {
         checkState(
                 isLowWatermarkEvent(lowWatermark),
diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java
index edc0924d89..5040cd07b5 100644
--- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java
+++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java
@@ -103,6 +103,7 @@ public void execute(Context context) throws Exception {
                         sourceFetchContext.getDatabaseSchema(),
                         sourceFetchContext.getConnection(),
                         sourceFetchContext.getDispatcher(),
+                        sourceFetchContext.getSnapshotReceiver(),
                         split);
         SnapshotSplitChangeEventSourceContext changeEventSourceContext =
                 new SnapshotSplitChangeEventSourceContext();
@@ -128,11 +129,15 @@ public void execute(Context context) throws Exception {
         }
         // execute redoLog read task
         if (snapshotResult.isCompletedOrSkipped()) {
+            final LogMinerOracleOffsetContextLoader loader =
+                    new LogMinerOracleOffsetContextLoader(sourceFetchContext.getDbzConnectorConfig());
+            final OracleOffsetContext streamOffsetContext =
+                    loader.load(backfillBinlogSplit.getStartingOffset().getOffset());
+
             final RedoLogSplitReadTask backfillBinlogReadTask =
                     createBackfillRedoLogReadTask(backfillBinlogSplit, sourceFetchContext);
             backfillBinlogReadTask.execute(
-                    new SnapshotBinlogSplitChangeEventSourceContext(),
-                    sourceFetchContext.getOffsetContext());
+                    new SnapshotBinlogSplitChangeEventSourceContext(), streamOffsetContext);
         } else {
             taskRunning = false;
             throw new IllegalStateException(
@@ -210,6 +215,7 @@ public static class OracleSnapshotSplitReadTask extends AbstractSnapshotChangeEv
         private final SnapshotSplit snapshotSplit;
         private final OracleOffsetContext offsetContext;
         private final SnapshotProgressListener snapshotProgressListener;
+        private final EventDispatcher.SnapshotReceiver snapshotReceiver;
 
         public OracleSnapshotSplitReadTask(
                 OracleConnectorConfig connectorConfig,
@@ -218,6 +224,7 @@ public OracleSnapshotSplitReadTask(
                 OracleDatabaseSchema databaseSchema,
                 OracleConnection jdbcConnection,
                 JdbcSourceEventDispatcher dispatcher,
+                EventDispatcher.SnapshotReceiver snapshotReceiver,
                 SnapshotSplit snapshotSplit) {
             super(connectorConfig, snapshotProgressListener);
             this.offsetContext = previousOffset;
@@ -228,6 +235,7 @@ public OracleSnapshotSplitReadTask(
             this.clock = Clock.SYSTEM;
             this.snapshotSplit = snapshotSplit;
             this.snapshotProgressListener = snapshotProgressListener;
+            this.snapshotReceiver = snapshotReceiver;
         }
 
         @Override
@@ -280,7 +288,7 @@ protected SnapshotResult doExecute(
                     "Snapshot step 3 - Determining high watermark {} for split {}",
                     highWatermark,
                     snapshotSplit);
-            ((SnapshotSplitChangeEventSourceContext) (context)).setHighWatermark(lowWatermark);
+            ((SnapshotSplitChangeEventSourceContext) (context)).setHighWatermark(highWatermark);
             dispatcher.dispatchWatermarkEvent(
                     offsetContext.getPartition(), snapshotSplit, highWatermark, WatermarkKind.HIGH);
             return SnapshotResult.completed(ctx.offset);
@@ -309,8 +317,6 @@ private void createDataEvents(
                 RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext,
                 TableId tableId)
                 throws Exception {
-            EventDispatcher.SnapshotReceiver snapshotReceiver =
-                    dispatcher.getSnapshotChangeEventReceiver();
             LOG.debug("Snapshotting table {}", tableId);
             createDataEventsForTable(
                     snapshotContext, snapshotReceiver, databaseSchema.tableFor(tableId));
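
Note on the two fixes above: passing lowWatermark to setHighWatermark collapsed the backfill window to an empty range, and reusing the snapshot's offset context pinned the redo-log backfill to the wrong starting position. The sketch below is a minimal, self-contained illustration (hypothetical types, not connector code) of why the window boundaries matter: a chunk is only consistent once the changes captured between the low and high watermarks are merged back into the scanned rows.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of snapshot-with-backfill; all names are illustrative.
public class WatermarkWindowSketch {

    static class Change {
        final long offset;
        final String key;
        final String row;

        Change(long offset, String key, String row) {
            this.offset = offset;
            this.key = key;
            this.row = row;
        }
    }

    /** Merges redo-log changes in (low, high] into the scanned chunk. */
    static Map<String, String> backfill(
            Map<String, String> scannedChunk, List<Change> redoLog, long low, long high) {
        Map<String, String> out = new HashMap<>(scannedChunk);
        for (Change c : redoLog) {
            // With the old bug, high == low, so this window is empty and the
            // concurrent update below is silently dropped.
            if (c.offset > low && c.offset <= high) {
                out.put(c.key, c.row);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> chunk = new HashMap<>();
        chunk.put("103", "Shanghai"); // value read during the scan
        List<Change> redoLog = new ArrayList<>();
        redoLog.add(new Change(5L, "103", "Hangzhou")); // concurrent update
        System.out.println(backfill(chunk, redoLog, 0L, 10L)); // {103=Hangzhou}
    }
}
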
diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
index 0051f3c212..5fa8ae71d1 100644
--- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
+++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
@@ -16,6 +16,7 @@
 
 package com.ververica.cdc.connectors.oracle.source.reader.fetch;
 
+import io.debezium.pipeline.EventDispatcher;
 import org.apache.flink.table.types.logical.RowType;
 
 import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
@@ -77,6 +78,7 @@ public class OracleSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
     private JdbcSourceEventDispatcher dispatcher;
     private ChangeEventQueue<DataChangeEvent> queue;
     private OracleErrorHandler errorHandler;
+    private EventDispatcher.SnapshotReceiver snapshotReceiver;
 
     public OracleSourceFetchTaskContext(
             JdbcSourceConfig sourceConfig,
@@ -133,6 +135,8 @@ public void configure(SourceSplitBase sourceSplitBase) {
                         metadataProvider,
                         schemaNameAdjuster);
 
+        this.snapshotReceiver = dispatcher.getSnapshotChangeEventReceiver();
+
        final OracleChangeEventSourceMetricsFactory changeEventSourceMetricsFactory =
                new OracleChangeEventSourceMetricsFactory(
                        new OracleStreamingChangeEventSourceMetrics(
@@ -194,6 +198,10 @@ public JdbcSourceEventDispatcher getDispatcher() {
         return dispatcher;
     }
 
+    public EventDispatcher.SnapshotReceiver getSnapshotReceiver() {
+        return snapshotReceiver;
+    }
+
     @Override
     public ChangeEventQueue<DataChangeEvent> getQueue() {
         return queue;
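
The receiver is now created once in configure() and exposed via getSnapshotReceiver(), which gives tests a seam: a subclass can return a decorated receiver and the snapshot read task will use it for all snapshot records. A minimal sketch of such a decorator, assuming only the two SnapshotReceiver methods that appear in this patch (the helper name is illustrative, not part of the change set):

import io.debezium.data.Envelope;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.schema.DataCollectionSchema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.ConnectHeaders;

// Illustrative helper: wraps a receiver so an action runs exactly when the
// snapshot phase completes, i.e. inside the low/high watermark window.
public class SnapshotReceiverDecorators {

    public static EventDispatcher.SnapshotReceiver onSnapshotComplete(
            EventDispatcher.SnapshotReceiver delegate, Runnable action) {
        return new EventDispatcher.SnapshotReceiver() {

            @Override
            public void changeRecord(
                    DataCollectionSchema schema,
                    Envelope.Operation operation,
                    Object key,
                    Struct value,
                    OffsetContext offset,
                    ConnectHeaders headers)
                    throws InterruptedException {
                delegate.changeRecord(schema, operation, key, value, offset, headers);
            }

            @Override
            public void completeSnapshot() throws InterruptedException {
                delegate.completeSnapshot();
                action.run(); // e.g. issue DML that the backfill phase must observe
            }
        };
    }
}
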
diff --git a/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/source/read/fetch/OracleScanFetchTaskTest.java b/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/source/read/fetch/OracleScanFetchTaskTest.java
new file mode 100644
index 0000000000..3f6a057ad1
--- /dev/null
+++ b/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/source/read/fetch/OracleScanFetchTaskTest.java
@@ -0,0 +1,248 @@
+/*
+ * Copyright 2022 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.oracle.source.read.fetch;
+
+import com.ververica.cdc.connectors.oracle.source.OracleDialect;
+import com.ververica.cdc.connectors.oracle.source.OracleSourceTestBase;
+import com.ververica.cdc.connectors.oracle.source.config.OracleSourceConfig;
+import com.ververica.cdc.connectors.oracle.source.config.OracleSourceConfigFactory;
+import com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleScanFetchTask;
+import com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleSourceFetchTaskContext;
+import com.ververica.cdc.connectors.oracle.utils.OracleTestUtils;
+import com.ververica.cdc.connectors.oracle.utils.RecordsFormatter;
+import io.debezium.connector.oracle.OracleConnection;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+
+import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
+import com.ververica.cdc.connectors.base.dialect.JdbcDataSourceDialect;
+import com.ververica.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
+import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit;
+import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
+import com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher;
+import io.debezium.data.Envelope;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.pipeline.EventDispatcher;
+import io.debezium.pipeline.spi.OffsetContext;
+import io.debezium.relational.TableId;
+import io.debezium.schema.DataCollectionSchema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionUtils.createOracleConnection;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link OracleScanFetchTask}. */
+public class OracleScanFetchTaskTest extends OracleSourceTestBase {
+
+    @Test
+    public void testChangingDataInSnapshotScan() throws Exception {
+        OracleTestUtils.createAndInitialize(OracleTestUtils.ORACLE_CONTAINER, "customer.sql");
+
+        String tableName = ORACLE_SCHEMA + ".customers";
+
+        OracleSourceConfigFactory sourceConfigFactory =
+                getConfigFactory(new String[] {tableName}, 10);
+        OracleSourceConfig sourceConfig = sourceConfigFactory.create(0);
+        OracleDialect oracleDialect = new OracleDialect(sourceConfigFactory);
+
+        String tableId = ORACLE_DATABASE + "." + tableName;
+        String[] changingDataSql =
+                new String[] {
+                    "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103",
+                    "DELETE FROM " + tableId + " where id = 102",
+                    "INSERT INTO " + tableId + " VALUES(102, 'user_2','Shanghai','123567891234')",
+                    "UPDATE " + tableId + " SET address = 'Shanghai' where id = 103",
+                    "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 110",
+                    "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 111",
+                };
+
+        MakeChangeEventTaskContext makeChangeEventTaskContext =
+                new MakeChangeEventTaskContext(
+                        sourceConfig,
+                        oracleDialect,
+                        createOracleConnection(
+                                sourceConfig.getDbzConnectorConfig().getJdbcConfig()),
+                        () -> executeSql(sourceConfig, changingDataSql));
+
+        final DataType dataType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("id", DataTypes.BIGINT()),
+                        DataTypes.FIELD("name", DataTypes.STRING()),
+                        DataTypes.FIELD("address", DataTypes.STRING()),
+                        DataTypes.FIELD("phone_number", DataTypes.STRING()));
+        List<SnapshotSplit> snapshotSplits = getSnapshotSplits(sourceConfig, oracleDialect);
+
+        String[] expected =
+                new String[] {
+                    "+I[101, user_1, Shanghai, 123567891234]",
+                    "+I[102, user_2, Shanghai, 123567891234]",
+                    "+I[103, user_3, Shanghai, 123567891234]",
+                    "+I[109, user_4, Shanghai, 123567891234]",
+                    "+I[110, user_5, Hangzhou, 123567891234]",
+                    "+I[111, user_6, Hangzhou, 123567891234]",
+                    "+I[118, user_7, Shanghai, 123567891234]",
+                    "+I[121, user_8, Shanghai, 123567891234]",
+                    "+I[123, user_9, Shanghai, 123567891234]",
+                };
+
+        List<String> actual =
+                readTableSnapshotSplits(snapshotSplits, makeChangeEventTaskContext, 1, dataType);
+        assertEqualsInAnyOrder(Arrays.asList(expected), actual);
+    }
+
+    private List<String> readTableSnapshotSplits(
+            List<SnapshotSplit> snapshotSplits,
+            OracleSourceFetchTaskContext taskContext,
+            int scanSplitsNum,
+            DataType dataType)
+            throws Exception {
+        IncrementalSourceScanFetcher sourceScanFetcher =
+                new IncrementalSourceScanFetcher(taskContext, 0);
+
+        List<SourceRecord> result = new ArrayList<>();
+        for (int i = 0; i < scanSplitsNum; i++) {
+            SnapshotSplit sqlSplit = snapshotSplits.get(i);
+            if (sourceScanFetcher.isFinished()) {
+                sourceScanFetcher.submitTask(
+                        taskContext.getDataSourceDialect().createFetchTask(sqlSplit));
+            }
+            Iterator<SourceRecords> res;
+            while ((res = sourceScanFetcher.pollSplitRecords()) != null) {
+                while (res.hasNext()) {
+                    SourceRecords sourceRecords = res.next();
+                    result.addAll(sourceRecords.getSourceRecordList());
+                }
+            }
+        }
+
+        sourceScanFetcher.close();
+
+        assertNotNull(sourceScanFetcher.getExecutorService());
+        assertTrue(sourceScanFetcher.getExecutorService().isTerminated());
+
+        return formatResult(result, dataType);
+    }
+
+    private List<String> formatResult(List<SourceRecord> records, DataType dataType) {
+        final RecordsFormatter formatter = new RecordsFormatter(dataType);
+        return formatter.format(records);
+    }
+
+    private List<SnapshotSplit> getSnapshotSplits(
+            OracleSourceConfig sourceConfig, JdbcDataSourceDialect sourceDialect) {
+        String databaseName = sourceConfig.getDatabaseList().get(0);
+        List<TableId> tableIdList =
+                sourceConfig.getTableList().stream()
+                        .map(tableId -> TableId.parse(databaseName + "." + tableId))
+                        .collect(Collectors.toList());
+        final ChunkSplitter chunkSplitter = sourceDialect.createChunkSplitter(sourceConfig);
+
+        List<SnapshotSplit> snapshotSplitList = new ArrayList<>();
+        for (TableId table : tableIdList) {
+            Collection<SnapshotSplit> snapshotSplits = chunkSplitter.generateSplits(table);
+            snapshotSplitList.addAll(snapshotSplits);
+        }
+        return snapshotSplitList;
+    }
+
+    public static OracleSourceConfigFactory getConfigFactory(
+            String[] captureTables, int splitSize) {
+        Properties debeziumProperties = new Properties();
+        debeziumProperties.setProperty("log.mining.strategy", "online_catalog");
+        debeziumProperties.setProperty("log.mining.continuous.mine", "true");
+
+        return (OracleSourceConfigFactory)
+                new OracleSourceConfigFactory()
+                        .hostname(ORACLE_CONTAINER.getHost())
+                        .port(ORACLE_CONTAINER.getOraclePort())
+                        .username(ORACLE_CONTAINER.getUsername())
+                        .password(ORACLE_CONTAINER.getPassword())
+                        .databaseList(ORACLE_DATABASE)
+                        .tableList(captureTables)
+                        .debeziumProperties(debeziumProperties)
+                        .splitSize(splitSize);
+    }
+
+    private boolean executeSql(OracleSourceConfig sourceConfig, String[] sqlStatements) {
+        JdbcConnection connection =
+                createOracleConnection(sourceConfig.getDbzConnectorConfig().getJdbcConfig());
+        try {
+            connection.setAutoCommit(false);
+            connection.execute(sqlStatements);
+            connection.commit();
+        } catch (SQLException e) {
+            LOG.error("Failed to execute sql statements.", e);
+            return false;
+        }
+        return true;
+    }
+
+    class MakeChangeEventTaskContext extends OracleSourceFetchTaskContext {
+
+        private final Supplier<Boolean> makeChangeEventFunction;
+
+        public MakeChangeEventTaskContext(
+                JdbcSourceConfig jdbcSourceConfig,
+                OracleDialect oracleDialect,
+                OracleConnection connection,
+                Supplier<Boolean> makeChangeEventFunction) {
+            super(jdbcSourceConfig, oracleDialect, connection);
+            this.makeChangeEventFunction = makeChangeEventFunction;
+        }
+
+        @Override
+        public EventDispatcher.SnapshotReceiver getSnapshotReceiver() {
+            EventDispatcher.SnapshotReceiver snapshotReceiver = super.getSnapshotReceiver();
+            return new EventDispatcher.SnapshotReceiver() {
+
+                @Override
+                public void changeRecord(
+                        DataCollectionSchema schema,
+                        Envelope.Operation operation,
+                        Object key,
+                        Struct value,
+                        OffsetContext offset,
+                        ConnectHeaders headers)
+                        throws InterruptedException {
+                    snapshotReceiver.changeRecord(schema, operation, key, value, offset, headers);
+                }
+
+                @Override
+                public void completeSnapshot() throws InterruptedException {
+                    snapshotReceiver.completeSnapshot();
+                    // make change events
+                    makeChangeEventFunction.get();
+                    Thread.sleep(10 * 1000);
+                }
+            };
+        }
+    }
+}
diff --git a/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/utils/RecordsFormatter.java b/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/utils/RecordsFormatter.java
new file mode 100644
index 0000000000..58fffd01a5
--- /dev/null
+++ b/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/utils/RecordsFormatter.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2022 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.oracle.utils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.RowRowConverter;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+
+import com.ververica.cdc.connectors.base.utils.SourceRecordUtils;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Formatter that formats the {@link org.apache.kafka.connect.source.SourceRecord} to String. */
+public class RecordsFormatter {
+
+    private final DataType dataType;
+    private final ZoneId zoneId;
+
+    private TypeInformation<RowData> typeInfo;
+    private DebeziumDeserializationSchema<RowData> deserializationSchema;
+    private SimpleCollector collector;
+    private RowRowConverter rowRowConverter;
+
+    public RecordsFormatter(DataType dataType) {
+        this(dataType, ZoneId.of("UTC"));
+    }
+
+    public RecordsFormatter(DataType dataType, ZoneId zoneId) {
+        this.dataType = dataType;
+        this.zoneId = zoneId;
+        this.typeInfo =
+                (TypeInformation<RowData>) TypeConversions.fromDataTypeToLegacyInfo(dataType);
+        this.deserializationSchema =
+                RowDataDebeziumDeserializeSchema.newBuilder()
+                        .setPhysicalRowType((RowType) dataType.getLogicalType())
+                        .setResultTypeInfo(typeInfo)
+                        .build();
+        this.collector = new SimpleCollector();
+        this.rowRowConverter = RowRowConverter.create(dataType);
+        rowRowConverter.open(Thread.currentThread().getContextClassLoader());
+    }
+
+    public List<String> format(List<SourceRecord> records) {
+        records.stream()
+                // Keep DataChangeEvent only
+                .filter(SourceRecordUtils::isDataChangeRecord)
+                .forEach(
+                        r -> {
+                            try {
+                                deserializationSchema.deserialize(r, collector);
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+        return collector.list.stream()
+                .map(rowRowConverter::toExternal)
+                .map(Row::toString)
+                .collect(Collectors.toList());
+    }
+
+    private static class SimpleCollector implements Collector<RowData> {
+
+        private List<RowData> list = new ArrayList<>();
+
+        @Override
+        public void collect(RowData record) {
+            list.add(record);
+        }
+
+        @Override
+        public void close() {
+            // do nothing
+        }
+    }
+}
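
Hypothetical usage of the new utility from a fetch-task test; the records list here is a placeholder, where a real test would collect it from IncrementalSourceScanFetcher#pollSplitRecords:

import java.util.Collections;
import java.util.List;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

import com.ververica.cdc.connectors.oracle.utils.RecordsFormatter;
import org.apache.kafka.connect.source.SourceRecord;

public class RecordsFormatterUsage {
    public static void main(String[] args) {
        // Schema must match the captured table's physical row type.
        DataType dataType =
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.BIGINT()),
                        DataTypes.FIELD("name", DataTypes.STRING()));
        List<SourceRecord> records = Collections.emptyList(); // placeholder input
        List<String> rows = new RecordsFormatter(dataType).format(records);
        System.out.println(rows); // e.g. ["+I[101, user_1]"] for a real snapshot read
    }
}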