[Bug] [Mongo-CDC] NPE when enable heartbeat interval #10470

@HuG0012

Search before asking

  • I have searched the issues and found no similar issues.

What happened

In MongoDB-CDC, I set heartbeat.interval.ms to a non-zero value to enable heartbeats, and the job failed with a NullPointerException. The NPE is thrown in isRecordBetween:

    @Override
    public boolean isRecordBetween(
            SourceRecord record, @Nonnull Object[] splitStart, @Nonnull Object[] splitEnd) {
        BsonDocument documentKey = getDocumentKey(record);  // getDocumentKey returns null
        BsonDocument splitKeys = (BsonDocument) splitStart[0];
        String firstKey = splitKeys.getFirstKey();
        BsonValue keyValue = documentKey.get(firstKey);     // NPE: documentKey is null here
        BsonValue lowerBound = ((BsonDocument) splitStart[1]).get(firstKey);
        BsonValue upperBound = ((BsonDocument) splitEnd[1]).get(firstKey);

        if (isFullRange(lowerBound, upperBound)) {
            return true;
        }

        return isValueInRange(lowerBound, keyValue, upperBound);
    }
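
getDocumentKey returns null because extractBsonDocument finds no documentKey field in the record's schema:
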
    public static BsonDocument extractBsonDocument(
            Struct value, @Nonnull Schema valueSchema, String fieldName) {
        if (valueSchema.field(fieldName) != null) {      // fieldName is "documentKey", but the record's schema has no such field
            String docString = value.getString(fieldName);
            if (docString != null) {
                return BsonDocument.parse(docString);
            }
        }
        return null;
    }
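
One possible way to avoid the NPE is to check for a missing document key before comparing it against the chunk range. The sketch below is not the connector's code: Record, HeartbeatGuardSketch, and the integer bounds are hypothetical stand-ins for SourceRecord, MongodbFetchTaskContext, and the BSON bounds, and returning false for a record without a document key is an assumed policy. The actual fix might instead filter heartbeat records out before isRecordBetween is called.

```java
import java.util.Collections;
import java.util.Map;

public class HeartbeatGuardSketch {

    // Hypothetical stand-in for SourceRecord: documentKey is null when the
    // record's schema has no documentKey field (the case reported here).
    static final class Record {
        final Map<String, Integer> documentKey;

        Record(Map<String, Integer> documentKey) {
            this.documentKey = documentKey;
        }
    }

    // Simplified isRecordBetween: bounds are plain integers and the upper
    // bound is treated as exclusive (an assumption made for this sketch).
    static boolean isRecordBetween(
            Record record, String firstKey, int lowerBound, int upperBound) {
        Map<String, Integer> documentKey = record.documentKey;
        if (documentKey == null) {
            // No document key: the record cannot be matched to a chunk range.
            return false;
        }
        Integer keyValue = documentKey.get(firstKey);
        if (keyValue == null) {
            // The split key is absent from the document key.
            return false;
        }
        return keyValue >= lowerBound && keyValue < upperBound;
    }

    public static void main(String[] args) {
        Record change = new Record(Collections.singletonMap("_id", 5));
        Record noKey = new Record(null);
        System.out.println(isRecordBetween(change, "_id", 0, 10));
        System.out.println(isRecordBetween(noKey, "_id", 0, 10));
    }
}
```

With the guard in place, the record without a document key is skipped instead of throwing, while ordinary change records are still compared against the bounds.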

SeaTunnel Version

2.3.9

SeaTunnel Config

env {
  # You can set engine configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  MongoDB-CDC {
    hosts = "your_host:port"
    database = ["log_super"]
    collection = ["log_super.USER_TRACE_LOG"]
    username =
    password =
    heartbeat.interval.ms = 30000
    schema = {
      table = "log_super.USER_TRACE_LOG"
      fields {
        "_id" : string,
        "stepMessage" : string,
        "stepResult" : string
      }
    }
  }
}

# Print the data read from MongoDB to the console
sink {
  Console {}
}

Running Command

none

Error Exception

[1073123856090136577] 2026-02-09 14:19:22,053 ERROR org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherManager - Received uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[classes/:?]
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:81) [classes/:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_462]
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [?:1.8.0_462]
	at java.util.concurrent.FutureTask.run(FutureTask.java) [?:1.8.0_462]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_462]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_462]
	at java.lang.Thread.run(Thread.java:750) [?:1.8.0_462]
Caused by: java.lang.NullPointerException
	at org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.fetch.MongodbFetchTaskContext.isRecordBetween(MongodbFetchTaskContext.java:161) ~[classes/:?]
	at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceScanFetcher.isChangeRecordInChunkRange(IncrementalSourceScanFetcher.java:258) ~[classes/:?]
	at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceScanFetcher.pollSplitRecordsIfExactlyOnce(IncrementalSourceScanFetcher.java:185) ~[classes/:?]
	at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceScanFetcher.pollSplitRecords(IncrementalSourceScanFetcher.java:121) ~[classes/:?]
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:75) ~[classes/:?]
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.FetchTask.run(FetchTask.java:54) ~[classes/:?]
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162) ~[classes/:?]
	... 7 more

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
