Skip to content

Commit cd44b58

Browse files
authored
[Fix] [Connector] Rocketmq source startOffset greater than endOffset error (#6287)
1 parent 064dc29 commit cd44b58

File tree

1 file changed

+4
-1
lines changed

1 file changed

+4
-1
lines changed

seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,10 @@ public void addSplitsBack(List<RocketMqSourceSplit> splits, int subtaskId) {
149149
ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
150150
splits.forEach(
151151
split -> {
152-
split.setStartOffset(split.getEndOffset() + 1);
152+
split.setStartOffset(
153+
Math.min(
154+
split.getEndOffset() + 1,
155+
listOffsets.get(split.getMessageQueue())));
153156
split.setEndOffset(listOffsets.get(split.getMessageQueue()));
154157
});
155158
return splits.stream()

0 commit comments

Comments
 (0)