From 3b6f2ca8be4ce7c81c4441cb8899bed0be864408 Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Thu, 14 Dec 2023 23:55:18 +0800 Subject: [PATCH 1/4] fix: batch mode offer event --- .../payload/evolvable/builder/PipeTransferBatchReqBuilder.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java index 9367ec2c18460..57d05a521ab64 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java @@ -114,8 +114,7 @@ public boolean onEvent(TabletInsertionEvent event) throws IOException, WALPipeEx final TPipeTransferReq req = buildTabletInsertionReq(event); final long requestCommitId = ((EnrichedEvent) event).getCommitId(); - if (requestCommitIds.isEmpty() - || !requestCommitIds.get(requestCommitIds.size() - 1).equals(requestCommitId)) { + if ((events.isEmpty() || !events.get(events.size() - 1).equals(event))) { reqs.add(req); events.add(event); requestCommitIds.add(requestCommitId); From 256e2f597b4fbe4851cc051b156f786f122f5845 Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Fri, 15 Dec 2023 21:38:15 +0800 Subject: [PATCH 2/4] fix: remove batch mode offer event duplication check --- .../builder/PipeTransferBatchReqBuilder.java | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java index 57d05a521ab64..e555e85ab2572 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java @@ -101,7 +101,7 @@ protected PipeTransferBatchReqBuilder(PipeParameters parameters) { } /** - * Try offer event into cache if the given event is not duplicated. + * Try offer event into cache. * * @param event the given event * @return true if the batch can be transferred @@ -114,20 +114,18 @@ public boolean onEvent(TabletInsertionEvent event) throws IOException, WALPipeEx final TPipeTransferReq req = buildTabletInsertionReq(event); final long requestCommitId = ((EnrichedEvent) event).getCommitId(); - if ((events.isEmpty() || !events.get(events.size() - 1).equals(event))) { - reqs.add(req); - events.add(event); - requestCommitIds.add(requestCommitId); + reqs.add(req); + events.add(event); + requestCommitIds.add(requestCommitId); - ((EnrichedEvent) event).increaseReferenceCount(PipeTransferBatchReqBuilder.class.getName()); + ((EnrichedEvent) event).increaseReferenceCount(PipeTransferBatchReqBuilder.class.getName()); - if (firstEventProcessingTime == Long.MIN_VALUE) { - firstEventProcessingTime = System.currentTimeMillis(); - } - - bufferSize += req.getBody().length; + if (firstEventProcessingTime == Long.MIN_VALUE) { + firstEventProcessingTime = System.currentTimeMillis(); } + bufferSize += req.getBody().length; + return bufferSize >= getMaxBatchSizeInBytes() || System.currentTimeMillis() - firstEventProcessingTime >= maxDelayInMs; } From 2c8622aef36dc3f5278fd6cf74fabf8cf6ba0056 Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Mon, 18 Dec 2023 23:35:09 +0800 Subject: [PATCH 3/4] Revert "fix: remove batch mode offer event duplication check" This reverts commit 256e2f597b4fbe4851cc051b156f786f122f5845. --- .../builder/PipeTransferBatchReqBuilder.java | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java index e555e85ab2572..57d05a521ab64 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java @@ -101,7 +101,7 @@ protected PipeTransferBatchReqBuilder(PipeParameters parameters) { } /** - * Try offer event into cache. + * Try offer event into cache if the given event is not duplicated. * * @param event the given event * @return true if the batch can be transferred @@ -114,17 +114,19 @@ public boolean onEvent(TabletInsertionEvent event) throws IOException, WALPipeEx final TPipeTransferReq req = buildTabletInsertionReq(event); final long requestCommitId = ((EnrichedEvent) event).getCommitId(); - reqs.add(req); - events.add(event); - requestCommitIds.add(requestCommitId); + if ((events.isEmpty() || !events.get(events.size() - 1).equals(event))) { + reqs.add(req); + events.add(event); + requestCommitIds.add(requestCommitId); - ((EnrichedEvent) event).increaseReferenceCount(PipeTransferBatchReqBuilder.class.getName()); + ((EnrichedEvent) event).increaseReferenceCount(PipeTransferBatchReqBuilder.class.getName()); - if (firstEventProcessingTime == Long.MIN_VALUE) { - firstEventProcessingTime = System.currentTimeMillis(); - } + if (firstEventProcessingTime == Long.MIN_VALUE) { + firstEventProcessingTime = System.currentTimeMillis(); + } - bufferSize += req.getBody().length; + bufferSize += req.getBody().length; + } return bufferSize >= getMaxBatchSizeInBytes() || System.currentTimeMillis() - firstEventProcessingTime >= maxDelayInMs; From 5e58979696779de99afc193ee153d7f53805d5b3 Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Mon, 18 Dec 2023 23:38:12 +0800 Subject: [PATCH 4/4] chore: add comments --- .../payload/evolvable/builder/PipeTransferBatchReqBuilder.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java index 57d05a521ab64..a4e43f84516ed 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java @@ -114,6 +114,8 @@ public boolean onEvent(TabletInsertionEvent event) throws IOException, WALPipeEx final TPipeTransferReq req = buildTabletInsertionReq(event); final long requestCommitId = ((EnrichedEvent) event).getCommitId(); + // The deduplication logic here is to avoid the accumulation of the same event in a batch when + // retrying. if ((events.isEmpty() || !events.get(events.size() - 1).equals(event))) { reqs.add(req); events.add(event);