From 4cebf4ea19a42b0c8612d634f5f15314f525c9d8 Mon Sep 17 00:00:00 2001 From: stingpeng Date: Tue, 13 Dec 2022 14:14:35 +0800 Subject: [PATCH 1/3] [INLONG-6842][Sort] Improve mysql-cdc2.0 to support faster snapshot read for tables without primary key --- .../task/MySqlSnapshotSplitReadTask.java | 20 +++++++- .../mysql/source/assigners/ChunkSplitter.java | 51 ++++++++++++++++--- .../source/split/MySqlSnapshotSplit.java | 8 +++ .../table/MySqlTableInlongSourceFactory.java | 1 - 4 files changed, 70 insertions(+), 10 deletions(-) diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java index 28fb660d941..61257d33cd2 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java @@ -149,7 +149,8 @@ protected SnapshotResult doExecute( LOG.info("Snapshot step 2 - Snapshotting data"); createDataEvents(ctx, snapshotSplit.getTableId()); - final BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection); + BinlogOffset highWatermark = determineHighWatermark(lowWatermark); + LOG.info( "Snapshot step 3 - Determining high watermark {} for split {}", highWatermark, @@ -162,6 +163,23 @@ protected SnapshotResult doExecute( return SnapshotResult.completed(ctx.offset); } + /** + * for chunk that equals to the whole table we do not need to normalize + * the snapshot data and the binlog data, just set high watermark to low watermark + * @return highWatermark + */ + private BinlogOffset determineHighWatermark(BinlogOffset lowWatermark) { + BinlogOffset highWatermark; + if (snapshotSplit.isWholeSplit()) { + highWatermark = lowWatermark; + LOG.info("for split {}, set highWatermark to lowWatermark {} since" + + " it reads the whole table ", snapshotSplit, lowWatermark); + } else { + highWatermark = currentBinlogOffset(jdbcConnection); + } + return highWatermark; + } + @Override protected SnapshottingTask getSnapshottingTask(OffsetContext previousOffset) { return new SnapshottingTask(false, true); diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/ChunkSplitter.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/ChunkSplitter.java index e4a6bd3e412..b47617aa353 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/ChunkSplitter.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/ChunkSplitter.java @@ -114,17 +114,13 @@ public Collection generateSplits(TableId tableId) { long start = System.currentTimeMillis(); Table table = mySqlSchema.getTableSchema(jdbc, tableId).getTable(); - Column splitColumn = ChunkUtils.getSplitColumn(table); - final List chunks; - try { - chunks = splitTableIntoChunks(jdbc, tableId, splitColumn); - } catch (SQLException e) { - throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e); - } + + List chunks = getChunks(tableId, jdbc, table); // convert chunks into splits List splits = new ArrayList<>(); - RowType splitType = ChunkUtils.getSplitType(splitColumn); + + RowType splitType = ChunkUtils.getSplitType(getSplitColumn(table)); for (int i = 0; i < chunks.size(); i++) { ChunkRange chunk = chunks.get(i); MySqlSnapshotSplit split = @@ -151,6 +147,45 @@ public Collection generateSplits(TableId tableId) { } } + /** + * get the split column using primary key + * for those don't have primary key, return the first column + * @return chunks + */ + private Column getSplitColumn(Table table) { + Column splitColumn; + if (table.primaryKeyColumns().isEmpty()) { + // since we do not need a split column when there is no primary key + // simply return the first column which won't be used + splitColumn = table.columns().get(0); + } else { + splitColumn = ChunkUtils.getSplitColumn(table); + } + return splitColumn; + } + + /** + * get chunks of the table using primary key + * for those who don't have primary key, return the whole table as a chunk + * @return chunks + */ + private List getChunks(TableId tableId, JdbcConnection jdbc, Table table) { + List chunks = new ArrayList<>(); + if (table.primaryKeyColumns().isEmpty()) { + // take the whole table as chunk range + // when there is no primary key presented + chunks.add(ChunkRange.all()); + } else { + Column splitColumn = ChunkUtils.getSplitColumn(table); + try { + chunks = splitTableIntoChunks(jdbc, tableId, splitColumn); + } catch (SQLException e) { + throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e); + } + } + return chunks; + } + /** * We can use evenly-sized chunks or unevenly-sized chunks when split table into chunks, using * evenly-sized chunks which is much efficient, using unevenly-sized chunks which will request diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSnapshotSplit.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSnapshotSplit.java index 1e31300041d..f756550fb71 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSnapshotSplit.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSnapshotSplit.java @@ -91,6 +91,14 @@ public boolean isSnapshotReadFinished() { return highWatermark != null; } + /** + * read the whole table when split start and split end are null + * @return whether the split reads the whole table + */ + public boolean isWholeSplit() { + return splitStart == null && splitEnd == null; + } + @Override public Map getTableSchemas() { return tableSchemas; diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java index 01200c95818..682857d2574 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java @@ -154,7 +154,6 @@ public DynamicTableSource createDynamicTableSource(Context context) { : config.get(ROW_KINDS_FILTERED); boolean enableParallelRead = config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED); if (enableParallelRead) { - validatePrimaryKeyIfEnableParallel(physicalSchema); validateStartupOptionIfEnableParallel(startupOptions); validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1); validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1); From 7140e9af41e2bec503b14601cf4ba2a645e09b50 Mon Sep 17 00:00:00 2001 From: stingpeng Date: Tue, 13 Dec 2022 14:41:56 +0800 Subject: [PATCH 2/3] [INLONG-6842][Sort] add test for all migrate without primary key --- .../test/java/org/apache/inlong/sort/parser/AllMigrateTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java index fc779f701f7..a527305787d 100644 --- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java +++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java @@ -62,7 +62,7 @@ private MySqlExtractNode buildAllMigrateExtractNode() { return new MySqlExtractNode("1", "mysql_input", fields, null, option, null, tables, "localhost", "root", "inlong", - "test", null, null, false, null, + "test", null, null, true, null, ExtractMode.CDC, null, null); } From 7b50aad2effb3caef255a491b55ee4374bfa6441 Mon Sep 17 00:00:00 2001 From: stingpeng Date: Tue, 13 Dec 2022 15:05:46 +0800 Subject: [PATCH 3/3] [INLONG-6842][Sort] optimize code --- .../debezium/task/MySqlSnapshotSplitReadTask.java | 6 ++---- .../cdc/mysql/source/assigners/ChunkSplitter.java | 12 ++++-------- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java index 61257d33cd2..6a37a0f18eb 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java @@ -169,15 +169,13 @@ protected SnapshotResult doExecute( * @return highWatermark */ private BinlogOffset determineHighWatermark(BinlogOffset lowWatermark) { - BinlogOffset highWatermark; if (snapshotSplit.isWholeSplit()) { - highWatermark = lowWatermark; LOG.info("for split {}, set highWatermark to lowWatermark {} since" + " it reads the whole table ", snapshotSplit, lowWatermark); + return lowWatermark; } else { - highWatermark = currentBinlogOffset(jdbcConnection); + return currentBinlogOffset(jdbcConnection); } - return highWatermark; } @Override diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/ChunkSplitter.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/ChunkSplitter.java index b47617aa353..26494f3cf40 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/ChunkSplitter.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/ChunkSplitter.java @@ -153,15 +153,13 @@ public Collection generateSplits(TableId tableId) { * @return chunks */ private Column getSplitColumn(Table table) { - Column splitColumn; if (table.primaryKeyColumns().isEmpty()) { // since we do not need a split column when there is no primary key // simply return the first column which won't be used - splitColumn = table.columns().get(0); + return table.columns().get(0); } else { - splitColumn = ChunkUtils.getSplitColumn(table); + return ChunkUtils.getSplitColumn(table); } - return splitColumn; } /** @@ -170,20 +168,18 @@ private Column getSplitColumn(Table table) { * @return chunks */ private List getChunks(TableId tableId, JdbcConnection jdbc, Table table) { - List chunks = new ArrayList<>(); if (table.primaryKeyColumns().isEmpty()) { // take the whole table as chunk range // when there is no primary key presented - chunks.add(ChunkRange.all()); + return Collections.singletonList(ChunkRange.all()); } else { Column splitColumn = ChunkUtils.getSplitColumn(table); try { - chunks = splitTableIntoChunks(jdbc, tableId, splitColumn); + return splitTableIntoChunks(jdbc, tableId, splitColumn); } catch (SQLException e) { throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e); } } - return chunks; } /**