Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ protected SnapshotResult doExecute(
LOG.info("Snapshot step 2 - Snapshotting data");
createDataEvents(ctx, snapshotSplit.getTableId());

final BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection);
BinlogOffset highWatermark = determineHighWatermark(lowWatermark);

LOG.info(
"Snapshot step 3 - Determining high watermark {} for split {}",
highWatermark,
Expand All @@ -162,6 +163,21 @@ protected SnapshotResult doExecute(
return SnapshotResult.completed(ctx.offset);
}

/**
 * Picks the high watermark for the current snapshot split.
 *
 * <p>When the split covers the entire table there is no need to normalize the
 * snapshot data against the binlog data, so the high watermark is simply set to
 * the given low watermark. Otherwise the current binlog offset of the connection
 * is used as the high watermark.
 *
 * @param lowWatermark the low watermark recorded before the snapshot data was read
 * @return the high watermark to use for this split
 */
private BinlogOffset determineHighWatermark(BinlogOffset lowWatermark) {
    if (!snapshotSplit.isWholeSplit()) {
        // bounded chunk: the high watermark is the current binlog position
        return currentBinlogOffset(jdbcConnection);
    }
    LOG.info("for split {}, set highWatermark to lowWatermark {} since"
            + " it reads the whole table ", snapshotSplit, lowWatermark);
    return lowWatermark;
}

@Override
protected SnapshottingTask getSnapshottingTask(OffsetContext previousOffset) {
return new SnapshottingTask(false, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,17 +114,13 @@ public Collection<MySqlSnapshotSplit> generateSplits(TableId tableId) {
long start = System.currentTimeMillis();

Table table = mySqlSchema.getTableSchema(jdbc, tableId).getTable();
Column splitColumn = ChunkUtils.getSplitColumn(table);
final List<ChunkRange> chunks;
try {
chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
} catch (SQLException e) {
throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e);
}

List<ChunkRange> chunks = getChunks(tableId, jdbc, table);

// convert chunks into splits
List<MySqlSnapshotSplit> splits = new ArrayList<>();
RowType splitType = ChunkUtils.getSplitType(splitColumn);

RowType splitType = ChunkUtils.getSplitType(getSplitColumn(table));
for (int i = 0; i < chunks.size(); i++) {
ChunkRange chunk = chunks.get(i);
MySqlSnapshotSplit split =
Expand All @@ -151,6 +147,41 @@ public Collection<MySqlSnapshotSplit> generateSplits(TableId tableId) {
}
}

/**
 * Determines the column used to split the table into chunks.
 *
 * <p>For a table with a primary key, the split column is derived from the key via
 * {@link ChunkUtils}. For a table without a primary key, no split column is
 * actually needed (the whole table is read as a single chunk), so the first
 * column is returned purely as a placeholder that will never be used.
 *
 * @param table the table to choose the split column for
 * @return the split column, or a placeholder first column when the table has no
 *     primary key
 */
private Column getSplitColumn(Table table) {
    if (table.primaryKeyColumns().isEmpty()) {
        // since we do not need a split column when there is no primary key
        // simply return the first column which won't be used
        return table.columns().get(0);
    } else {
        return ChunkUtils.getSplitColumn(table);
    }
}

/**
 * Computes the chunk ranges of the given table.
 *
 * <p>A table with a primary key is split into multiple chunks on its split
 * column. A table without a primary key is treated as one single chunk that
 * covers the whole table.
 *
 * @param tableId identifier of the table being split
 * @param jdbc the JDBC connection used to query chunk boundaries
 * @param table the table definition
 * @return the list of chunk ranges for the table
 */
private List<ChunkRange> getChunks(TableId tableId, JdbcConnection jdbc, Table table) {
    if (table.primaryKeyColumns().isEmpty()) {
        // no primary key present: the whole table is a single chunk range
        return Collections.singletonList(ChunkRange.all());
    }
    final Column splitColumn = ChunkUtils.getSplitColumn(table);
    try {
        return splitTableIntoChunks(jdbc, tableId, splitColumn);
    } catch (SQLException e) {
        throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e);
    }
}

/**
* We can use evenly-sized chunks or unevenly-sized chunks when split table into chunks, using
* evenly-sized chunks which is much efficient, using unevenly-sized chunks which will request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ public boolean isSnapshotReadFinished() {
return highWatermark != null;
}

/**
 * Checks whether this split covers the entire table.
 *
 * <p>A split that has neither a start bound nor an end bound reads the whole table.
 *
 * @return {@code true} if this split reads the whole table, {@code false} otherwise
 */
public boolean isWholeSplit() {
    final boolean unbounded = (splitStart == null) && (splitEnd == null);
    return unbounded;
}

@Override
public Map<TableId, TableChange> getTableSchemas() {
return tableSchemas;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ public DynamicTableSource createDynamicTableSource(Context context) {
: config.get(ROW_KINDS_FILTERED);
boolean enableParallelRead = config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
if (enableParallelRead) {
validatePrimaryKeyIfEnableParallel(physicalSchema);
validateStartupOptionIfEnableParallel(startupOptions);
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private MySqlExtractNode buildAllMigrateExtractNode() {
return new MySqlExtractNode("1", "mysql_input", fields,
null, option, null,
tables, "localhost", "root", "inlong",
"test", null, null, false, null,
"test", null, null, true, null,
ExtractMode.CDC, null, null);
}

Expand Down