From f1b4f6c306ca7e6602a32cd5bbe16e6911adf031 Mon Sep 17 00:00:00 2001 From: liangyaohui <649262814@qq.com> Date: Tue, 14 Mar 2023 11:59:19 +0800 Subject: [PATCH 1/3] =?UTF-8?q?[MongoDB=20CDC]=20Fix=20=E2=80=9CpollAwaitT?= =?UTF-8?q?imeMillis=E2=80=9D=20does=20not=20take=20effect=20#1994?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mongodb/source/reader/fetch/MongoDBStreamFetchTask.java | 1 + 1 file changed, 1 insertion(+) diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java index 8ce43c2c34..500294cc20 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java @@ -117,6 +117,7 @@ public void execute(Context context) throws Exception { SourceRecord changeRecord = null; if (!next.isPresent()) { long untilNext = nextUpdate - time.milliseconds(); + nextUpdate += sourceConfig.getPollAwaitTimeMillis(); if (untilNext > 0) { LOG.debug("Waiting {} ms to poll change records", untilNext); time.sleep(untilNext); From 35d4b4c5b9f7e75dff05e0dd2b76d6311373a2c8 Mon Sep 17 00:00:00 2001 From: liangyaohui <649262814@qq.com> Date: Fri, 17 Mar 2023 16:17:08 +0800 Subject: [PATCH 2/3] =?UTF-8?q?[MongoDB=20CDC]=20Fix=20=E2=80=9CpollAwaitT?= =?UTF-8?q?imeMillis=E2=80=9D=20does=20not=20take=20effect=20#1994?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mongodb/source/reader/fetch/MongoDBStreamFetchTask.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java index 500294cc20..0084780975 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java @@ -117,7 +117,6 @@ public void execute(Context context) throws Exception { SourceRecord changeRecord = null; if (!next.isPresent()) { long untilNext = nextUpdate - time.milliseconds(); - nextUpdate += sourceConfig.getPollAwaitTimeMillis(); if (untilNext > 0) { LOG.debug("Waiting {} ms to poll change records", untilNext); time.sleep(untilNext); @@ -131,6 +130,8 @@ public void execute(Context context) throws Exception { .map(this::normalizeHeartbeatRecord) .orElse(null); } + // update nextUpdateTime + nextUpdate += sourceConfig.getPollAwaitTimeMillis(); } else { BsonDocument changeStreamDocument = next.get(); MongoNamespace namespace = getMongoNamespace(changeStreamDocument); From 0a3058c7e08a717607c9e419f0f738b747ab2e25 Mon Sep 17 00:00:00 2001 From: Jiabao Sun Date: Fri, 24 Mar 2023 19:57:39 +0800 Subject: [PATCH 3/3] update next update time --- .../mongodb/source/reader/fetch/MongoDBStreamFetchTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java index 0084780975..1e20991fbd 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java @@ -131,7 +131,7 @@ public void execute(Context context) throws Exception { .orElse(null); } // update nextUpdateTime - nextUpdate += sourceConfig.getPollAwaitTimeMillis(); + nextUpdate = time.milliseconds() + sourceConfig.getPollAwaitTimeMillis(); } else { BsonDocument changeStreamDocument = next.get(); MongoNamespace namespace = getMongoNamespace(changeStreamDocument);