Skip to content

Commit 9725d67

Browse files
lightzhaolightzhao
andauthored
[hotfix][pulsar] PulsarSource consumer ack exception. (#4237)
* fix pulsar source ack bug. * update doc. --------- Co-authored-by: lightzhao <zhaolianyong777@gmail.com>
1 parent 599772f commit 9725d67

File tree

4 files changed

+24
-15
lines changed

4 files changed

+24
-15
lines changed

docs/en/connector-v2/Error-Quick-Reference-Manual.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ problems encountered by users.
208208
| PULSAR-04 | Subscribe topic from pulsar failed | When users encounter this error code, it means that Subscribe topic from pulsar failed, please check it |
209209
| PULSAR-05 | Get last cursor of pulsar topic failed | When users encounter this error code, it means that get last cursor of pulsar topic failed, please check it |
210210
| PULSAR-06 | Get partition information of pulsar topic failed | When users encounter this error code, it means that Get partition information of pulsar topic failed, please check it |
211+
| PULSAR-07 | Pulsar consumer acknowledgeCumulative failed | When users encounter this error code, it means that Pulsar consumer acknowledgeCumulative failed |
211212

212213
## StarRocks Connector Error Codes
213214

seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/exception/PulsarConnectorErrorCode.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ public enum PulsarConnectorErrorCode implements SeaTunnelErrorCode {
2525
PULSAR_AUTHENTICATION_FAILED("PULSAR-03", "Pulsar authentication failed"),
2626
SUBSCRIBE_TOPIC_FAILED("PULSAR-04", "Subscribe topic from pulsar failed"),
2727
GET_LAST_CURSOR_FAILED("PULSAR-05", "Get last cursor of pulsar topic failed"),
28-
GET_TOPIC_PARTITION_FAILED("PULSAR-06", "Get partition information of pulsar topic failed");
28+
GET_TOPIC_PARTITION_FAILED("PULSAR-06", "Get partition information of pulsar topic failed"),
29+
ACK_CUMULATE_FAILED("PULSAR-07", "Pulsar consumer acknowledgeCumulative failed");
2930

3031
private final String code;
3132
private final String description;

seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
2626
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
2727
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConsumerConfig;
28+
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
2829
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
2930
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.StartCursor;
3031
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.split.PulsarPartitionSplit;
@@ -225,20 +226,26 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
225226
if (finishedSplits.contains(splitId)) {
226227
return;
227228
}
229+
try {
230+
splitReaders.get(splitId).committingCursor(messageId);
228231

229-
splitReaders.get(splitId).committingCursor(messageId);
230-
231-
if (pendingCursorsToFinish.containsKey(splitId)
232-
&& pendingCursorsToFinish.get(splitId).compareTo(messageId) == 0) {
233-
finishedSplits.add(splitId);
234-
try {
235-
splitReaders.get(splitId).close();
236-
} catch (IOException e) {
237-
throw new PulsarConnectorException(
238-
CommonErrorCode.READER_OPERATION_FAILED,
239-
"Failed to close the split reader thread.",
240-
e);
232+
if (pendingCursorsToFinish.containsKey(splitId)
233+
&& pendingCursorsToFinish.get(splitId).compareTo(messageId) == 0) {
234+
finishedSplits.add(splitId);
235+
try {
236+
splitReaders.get(splitId).close();
237+
} catch (IOException e) {
238+
throw new PulsarConnectorException(
239+
CommonErrorCode.READER_OPERATION_FAILED,
240+
"Failed to close the split reader thread.",
241+
e);
242+
}
241243
}
244+
} catch (PulsarClientException e) {
245+
throw new PulsarConnectorException(
246+
PulsarConnectorErrorCode.ACK_CUMULATE_FAILED,
247+
"pulsar consumer acknowledgeCumulative failed.",
248+
e);
242249
}
243250
});
244251
}

seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSplitReaderThread.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,11 +118,11 @@ public void close() throws IOException {
118118
}
119119
}
120120

121-
public void committingCursor(MessageId offsetsToCommit) {
121+
public void committingCursor(MessageId offsetsToCommit) throws PulsarClientException {
122122
if (consumer == null) {
123123
consumer = createPulsarConsumer(split);
124124
}
125-
consumer.acknowledgeAsync(offsetsToCommit);
125+
consumer.acknowledgeCumulative(offsetsToCommit);
126126
}
127127

128128
/** Create a specified {@link Consumer} by the given split information. */

0 commit comments

Comments
 (0)