Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,14 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

Expand All @@ -95,11 +96,11 @@ public abstract class JdbcIncrementalSourceReader extends AbstractCdcSourceReade
Fetcher<SourceRecords, SourceSplitBase>,
SnapshotSplitState>>
snapshotReaderContexts;
private Set<String> completedSplitIds = new HashSet<>();
private Set<String> completedSplitIds = ConcurrentHashMap.newKeySet();

// Parallel polling support
private ExecutorService pollExecutor;
private List<CompletableFuture<PollResult>> activePollFutures;
private volatile List<CompletableFuture<PollResult>> activePollFutures;

// Stream/binlog reader (single reader for stream split)
private Fetcher<SourceRecords, SourceSplitBase> streamReader;
Expand All @@ -109,7 +110,7 @@ public abstract class JdbcIncrementalSourceReader extends AbstractCdcSourceReade

public JdbcIncrementalSourceReader() {
this.serializer = new DebeziumJsonDeserializer();
this.snapshotReaderContexts = new ArrayList<>();
this.snapshotReaderContexts = new CopyOnWriteArrayList<>();
}

@Override
Expand Down Expand Up @@ -285,7 +286,7 @@ public SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq) throw
}

/** Prepare snapshot splits (unified handling for single or multiple splits) */
private SplitReadResult prepareSnapshotSplits(
private synchronized SplitReadResult prepareSnapshotSplits(
List<org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit> splits,
JobBaseRecordRequest baseReq)
throws Exception {
Expand Down Expand Up @@ -387,7 +388,7 @@ private SplitReadResult prepareSnapshotSplits(
}

/** Prepare stream split */
private SplitReadResult prepareStreamSplit(
private synchronized SplitReadResult prepareStreamSplit(
Map<String, Object> offsetMeta, JobBaseRecordRequest baseReq) throws Exception {
// Load tableSchemas from FE if available (avoids re-discover on restart)
tryLoadTableSchemasFromRequest(baseReq);
Expand Down Expand Up @@ -505,7 +506,7 @@ private void startParallelPolling() {
LOG.info(
"Starting parallel polling for {} snapshot readers", snapshotReaderContexts.size());

activePollFutures = new ArrayList<>();
activePollFutures = new CopyOnWriteArrayList<>();

for (int i = 0; i < snapshotReaderContexts.size(); i++) {
final int index = i;
Expand Down Expand Up @@ -555,33 +556,30 @@ private void startParallelPolling() {
* data
*/
private PollResult waitForAnyCompletion() throws Exception {
while (!activePollFutures.isEmpty()) {
// Wait for any future to complete
List<CompletableFuture<PollResult>> snapshot = activePollFutures;
while (snapshot != null && !snapshot.isEmpty()) {
CompletableFuture<Object> anyOf =
CompletableFuture.anyOf(activePollFutures.toArray(new CompletableFuture[0]));
CompletableFuture.anyOf(snapshot.toArray(new CompletableFuture[0]));

anyOf.join(); // Wait for at least one to complete

// Find and process completed futures
Iterator<CompletableFuture<PollResult>> iterator = activePollFutures.iterator();
while (iterator.hasNext()) {
CompletableFuture<PollResult> future = iterator.next();

for (CompletableFuture<PollResult> future : snapshot) {
if (future.isDone()) {
Comment thread
JNSimba marked this conversation as resolved.
iterator.remove(); // Remove from active list
snapshot.remove(future);
PollResult result = future.get();
if (result != null) {
// Found a reader with data, return immediately
LOG.info(
"Got result from reader {}, {} futures remaining",
result.context.getSplit().splitId(),
activePollFutures.size());
snapshot.size());
completedSplitIds.add(result.context.getSplit().splitId());
return result;
}
// If result is null (no data), continue checking other futures
}
}
snapshot = activePollFutures;
}
// All futures completed but none had data
return null;
Expand Down Expand Up @@ -614,24 +612,30 @@ private static class PollResult {

/** Poll records from stream reader */
private Iterator<SourceRecord> pollRecordsFromStreamReader() throws InterruptedException {
Fetcher<SourceRecords, SourceSplitBase> reader = streamReader;
StreamSplit split = streamSplit;
StreamSplitState state = streamSplitState;
if (reader == null || split == null || state == null) {
LOG.info("Stream reader is null at poll start, returning empty");
return Collections.emptyIterator();
}

Preconditions.checkState(streamReader != null, "streamReader is null");
Preconditions.checkNotNull(streamSplitState, "streamSplitState is null");

Iterator<SourceRecords> dataIt = streamReader.pollSplitRecords();
Iterator<SourceRecords> dataIt = reader.pollSplitRecords();
if (dataIt == null || !dataIt.hasNext()) {
if (streamReader == null) {
LOG.info("Stream reader is null after poll, returning empty");
}
return Collections.emptyIterator();
}

SourceRecords sourceRecords = dataIt.next();
SplitRecords splitRecords =
new SplitRecords(streamSplit.splitId(), sourceRecords.iterator());
SplitRecords splitRecords = new SplitRecords(split.splitId(), sourceRecords.iterator());

if (!sourceRecords.getSourceRecordList().isEmpty()) {
LOG.info("{} Records received from stream", sourceRecords.getSourceRecordList().size());
}

return new FilteredRecordIterator(splitRecords, streamSplitState);
return new FilteredRecordIterator(splitRecords, state);
}

protected abstract DataType fromDbzColumn(Column splitColumn);
Expand Down Expand Up @@ -867,7 +871,7 @@ public boolean isSnapshotSplit(SourceSplit split) {
}

@Override
public void finishSplitRecords() {
public synchronized void finishSplitRecords() {
// Cancel any active poll operations
if (activePollFutures != null) {
activePollFutures.forEach(f -> f.cancel(true));
Expand Down Expand Up @@ -920,19 +924,9 @@ protected abstract Map<TableId, TableChanges.TableChange> discoverTableSchemas(
JobBaseConfig config);

@Override
public void close(JobBaseConfig jobConfig) {
public synchronized void close(JobBaseConfig jobConfig) {
LOG.info("Close source reader for job {}", jobConfig.getJobId());
Comment thread
JNSimba marked this conversation as resolved.

// Cancel any active poll operations
if (activePollFutures != null) {
activePollFutures.forEach(f -> f.cancel(true));
activePollFutures.clear();
activePollFutures = null;
}

// Clean up all readers
finishSplitRecords();

if (tableSchemas != null) {
tableSchemas.clear();
tableSchemas = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -88,6 +87,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

Expand Down Expand Up @@ -123,11 +123,11 @@ public class MySqlSourceReader extends AbstractCdcSourceReader {
SnapshotReaderContext<
MySqlSnapshotSplit, SnapshotSplitReader, MySqlSnapshotSplitState>>
snapshotReaderContexts;
private Set<String> completedSplitIds = new HashSet<>();
private Set<String> completedSplitIds = ConcurrentHashMap.newKeySet();

// Parallel polling support
private ExecutorService pollExecutor;
private List<CompletableFuture<PollResult>> activePollFutures;
private volatile List<CompletableFuture<PollResult>> activePollFutures;

// Binlog reader (single reader for binlog split)
private BinlogSplitReader binlogReader;
Expand All @@ -136,7 +136,7 @@ public class MySqlSourceReader extends AbstractCdcSourceReader {

public MySqlSourceReader() {
this.serializer = new MySqlDebeziumJsonDeserializer();
this.snapshotReaderContexts = new ArrayList<>();
this.snapshotReaderContexts = new CopyOnWriteArrayList<>();
}

@Override
Expand Down Expand Up @@ -341,7 +341,7 @@ private List<MySqlSnapshotSplit> extractSnapshotSplits(
}

/** Prepare snapshot splits (unified handling for single or multiple splits) */
private SplitReadResult prepareSnapshotSplits(
private synchronized SplitReadResult prepareSnapshotSplits(
List<MySqlSnapshotSplit> splits, JobBaseRecordRequest baseReq) throws Exception {

LOG.info("Preparing {} snapshot split(s) for reading", splits.size());
Expand Down Expand Up @@ -429,7 +429,7 @@ private SplitReadResult prepareSnapshotSplits(
}

/** Prepare binlog split */
private SplitReadResult prepareBinlogSplit(
private synchronized SplitReadResult prepareBinlogSplit(
Map<String, Object> offsetMeta, JobBaseRecordRequest baseReq) throws Exception {
// Load tableSchemas from FE if available (avoids re-discover on restart)
tryLoadTableSchemasFromRequest(baseReq);
Comment thread
JNSimba marked this conversation as resolved.
Expand Down Expand Up @@ -527,7 +527,7 @@ private void startParallelPolling() {
LOG.info(
"Starting parallel polling for {} snapshot readers", snapshotReaderContexts.size());

activePollFutures = new ArrayList<>();
activePollFutures = new CopyOnWriteArrayList<>();

for (int i = 0; i < snapshotReaderContexts.size(); i++) {
final int index = i;
Expand Down Expand Up @@ -574,33 +574,30 @@ private void startParallelPolling() {
* data
*/
private PollResult waitForAnyCompletion() throws Exception {
while (!activePollFutures.isEmpty()) {
// Wait for any future to complete
List<CompletableFuture<PollResult>> snapshot = activePollFutures;
while (snapshot != null && !snapshot.isEmpty()) {
CompletableFuture<Object> anyOf =
CompletableFuture.anyOf(activePollFutures.toArray(new CompletableFuture[0]));
CompletableFuture.anyOf(snapshot.toArray(new CompletableFuture[0]));

anyOf.join(); // Wait for at least one to complete

// Find and process completed futures
Iterator<CompletableFuture<PollResult>> iterator = activePollFutures.iterator();
while (iterator.hasNext()) {
CompletableFuture<PollResult> future = iterator.next();

for (CompletableFuture<PollResult> future : snapshot) {
if (future.isDone()) {
Comment thread
JNSimba marked this conversation as resolved.
iterator.remove(); // Remove from active list
snapshot.remove(future);
PollResult result = future.get();
if (result != null) {
// Found a reader with data, return immediately
LOG.info(
"Got result from reader {}, {} futures remaining",
result.context.getSplit().splitId(),
activePollFutures.size());
snapshot.size());
completedSplitIds.add(result.context.getSplit().splitId());
return result;
}
// If result is null (no data), continue checking other futures
}
}
snapshot = activePollFutures;
}
// All futures completed but none had data
return null;
Expand Down Expand Up @@ -628,24 +625,30 @@ private static class PollResult {

/** Poll records from binlog reader */
private Iterator<SourceRecord> pollRecordsFromBinlogReader() throws InterruptedException {
BinlogSplitReader reader = binlogReader;
MySqlBinlogSplit split = binlogSplit;
MySqlBinlogSplitState state = binlogSplitState;
if (reader == null || split == null || state == null) {
LOG.info("Binlog reader is null at poll start, returning empty");
return Collections.emptyIterator();
}

Preconditions.checkState(binlogReader != null, "binlogReader is null");
Preconditions.checkNotNull(binlogSplitState, "binlogSplitState is null");

Iterator<SourceRecords> dataIt = binlogReader.pollSplitRecords();
Iterator<SourceRecords> dataIt = reader.pollSplitRecords();
if (dataIt == null || !dataIt.hasNext()) {
if (binlogReader == null) {
LOG.info("Binlog reader is null after poll, returning empty");
}
return Collections.emptyIterator();
}

SourceRecords sourceRecords = dataIt.next();
SplitRecords splitRecords =
new SplitRecords(binlogSplit.splitId(), sourceRecords.iterator());
SplitRecords splitRecords = new SplitRecords(split.splitId(), sourceRecords.iterator());

if (!sourceRecords.getSourceRecordList().isEmpty()) {
LOG.info("{} Records received from binlog", sourceRecords.getSourceRecordList().size());
}

return new FilteredRecordIterator(splitRecords, binlogSplitState);
return new FilteredRecordIterator(splitRecords, state);
}

/**
Expand Down Expand Up @@ -1024,7 +1027,7 @@ public boolean isSnapshotSplit(SourceSplit split) {
}

@Override
public void finishSplitRecords() {
public synchronized void finishSplitRecords() {

// Cancel any active poll operations
if (activePollFutures != null) {
Expand Down Expand Up @@ -1127,9 +1130,8 @@ private Map<TableId, TableChanges.TableChange> discoverTableSchemas(JobBaseConfi
}

@Override
public void close(JobBaseConfig jobConfig) {
public synchronized void close(JobBaseConfig jobConfig) {
LOG.info("Close source reader for job {}", jobConfig.getJobId());
Comment thread
JNSimba marked this conversation as resolved.

finishSplitRecords();
if (tableSchemas != null) {
tableSchemas.clear();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_after_earliest_replay --
1 alice
2 bob

-- !select_after_earliest_incr --
1 alice_upd
3 charlie

Loading
Loading