Skip to content

Commit 4fe9323

Browse files
authored
[Improve][Connector-V2][Pulsar] Unified exception for Pulsar source &… (#3590)
* [Improve][Connector-V2][Pulsar] Unified exception for Pulsar source and sink
1 parent 4ae251e commit 4fe9323

File tree

12 files changed

+138
-26
lines changed

12 files changed

+138
-26
lines changed

docs/en/connector-v2/Error-Quick-Reference-Manual.md

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,4 +168,16 @@ problems encountered by users.
168168
| CLICKHOUSE-03 | Can’t delete directory | When users encounter this error code, it means that the directory does not exist or does not have permission, please check |
169169
| CLICKHOUSE-04 | Ssh operation failed, such as (login,connect,authentication,close) etc... | When users encounter this error code, it means that the ssh request failed, please check your network environment |
170170
| CLICKHOUSE-05 | Get cluster list from clickhouse failed | When users encounter this error code, it means that the clickhouse cluster is not configured correctly, please check |
171-
| CLICKHOUSE-06 | Shard key not found in table | When users encounter this error code, it means that the shard key of the distributed table is not configured, please check |
171+
| CLICKHOUSE-06 | Shard key not found in table | When users encounter this error code, it means that the shard key of the distributed table is not configured, please check |
172+
173+
## Pulsar Connector Error Codes
174+
175+
| code | description | solution |
176+
|--------------|--------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
177+
| PULSAR-01 | Open pulsar admin failed | When users encounter this error code, it means that open pulsar admin failed, please check it |
178+
| PULSAR-02 | Open pulsar client failed | When users encounter this error code, it means that open pulsar client failed, please check it |
179+
| PULSAR-03 | Pulsar authentication failed | When users encounter this error code, it means that Pulsar Authentication failed, please check it |
180+
| PULSAR-04 | Subscribe topic from pulsar failed | When users encounter this error code, it means that Subscribe topic from pulsar failed, please check it |
181+
| PULSAR-05 | Get last cursor of pulsar topic failed | When users encounter this error code, it means that get last cursor of pulsar topic failed, please check it |
182+
| PULSAR-06 | Get partition information of pulsar topic failed | When users encounter this error code, it means that Get partition information of pulsar topic failed, please check it |
183+

seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarConfigUtil.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.pulsar.config;
1919

20+
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
21+
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
22+
2023
import org.apache.pulsar.client.admin.PulsarAdmin;
2124
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
2225
import org.apache.pulsar.client.api.Authentication;
@@ -43,7 +46,7 @@ public static PulsarAdmin createAdmin(PulsarAdminConfig config) {
4346
try {
4447
return builder.build();
4548
} catch (PulsarClientException e) {
46-
throw new RuntimeException(e);
49+
throw new PulsarConnectorException(PulsarConnectorErrorCode.OPEN_PULSAR_ADMIN_FAILED, e);
4750
}
4851
}
4952

@@ -54,7 +57,7 @@ public static PulsarClient createClient(PulsarClientConfig config) {
5457
try {
5558
return builder.build();
5659
} catch (PulsarClientException e) {
57-
throw new RuntimeException(e);
60+
throw new PulsarConnectorException(PulsarConnectorErrorCode.OPEN_PULSAR_CLIENT_FAILED, e);
5861
}
5962
}
6063

@@ -74,10 +77,10 @@ private static Authentication createAuthentication(BasePulsarConfig config) {
7477
try {
7578
return AuthenticationFactory.create(config.getAuthPluginClassName(), config.getAuthParams());
7679
} catch (PulsarClientException.UnsupportedAuthenticationException e) {
77-
throw new RuntimeException("Failed to create the authentication plug-in.", e);
80+
throw new PulsarConnectorException(PulsarConnectorErrorCode.PULSAR_AUTHENTICATION_FAILED, e);
7881
}
7982
} else {
80-
throw new IllegalArgumentException("Authentication parameters are required when using authentication plug-in.");
83+
throw new PulsarConnectorException(PulsarConnectorErrorCode.PULSAR_AUTHENTICATION_FAILED, "Authentication parameters are required when using authentication plug-in.");
8184
}
8285
}
8386
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.pulsar.exception;
19+
20+
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
21+
22+
public enum PulsarConnectorErrorCode implements SeaTunnelErrorCode {
23+
24+
OPEN_PULSAR_ADMIN_FAILED("PULSAR-01", "Open pulsar admin failed"),
25+
OPEN_PULSAR_CLIENT_FAILED("PULSAR-02", "Open pulsar client failed"),
26+
PULSAR_AUTHENTICATION_FAILED("PULSAR-03", "Pulsar authentication failed"),
27+
SUBSCRIBE_TOPIC_FAILED("PULSAR-04", "Subscribe topic from pulsar failed"),
28+
GET_LAST_CURSOR_FAILED("PULSAR-05", "Get last cursor of pulsar topic failed"),
29+
GET_TOPIC_PARTITION_FAILED("PULSAR-06", "Get partition information of pulsar topic failed");
30+
31+
private final String code;
32+
private final String description;
33+
34+
PulsarConnectorErrorCode(String code, String description) {
35+
this.code = code;
36+
this.description = description;
37+
}
38+
39+
@Override
40+
public String getCode() {
41+
return code;
42+
}
43+
44+
@Override
45+
public String getDescription() {
46+
return description;
47+
}
48+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.pulsar.exception;
19+
20+
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
21+
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
22+
23+
public class PulsarConnectorException extends SeaTunnelRuntimeException {
24+
public PulsarConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
25+
super(seaTunnelErrorCode, errorMessage);
26+
}
27+
28+
public PulsarConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) {
29+
super(seaTunnelErrorCode, errorMessage, cause);
30+
}
31+
32+
public PulsarConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) {
33+
super(seaTunnelErrorCode, cause);
34+
}
35+
}

seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,12 @@
3333
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_TIMEOUT;
3434
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.SUBSCRIPTION_NAME;
3535
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.StartMode;
36-
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.StartMode.LATEST;
37-
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.StopMode.NEVER;
3836
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC;
3937
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC_DISCOVERY_INTERVAL;
4038
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC_PATTERN;
4139

4240
import org.apache.seatunnel.api.common.PrepareFailException;
41+
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
4342
import org.apache.seatunnel.api.serialization.DeserializationSchema;
4443
import org.apache.seatunnel.api.source.Boundedness;
4544
import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -56,6 +55,7 @@
5655
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
5756
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConsumerConfig;
5857
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties;
58+
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
5959
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.PulsarSplitEnumerator;
6060
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.PulsarSplitEnumeratorState;
6161
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.StartCursor;
@@ -103,7 +103,7 @@ public String getPluginName() {
103103
public void prepare(Config config) throws PrepareFailException {
104104
CheckResult result = CheckConfigUtil.checkAllExists(config, SUBSCRIPTION_NAME.key(), CLIENT_SERVICE_URL.key(), ADMIN_SERVICE_URL.key());
105105
if (!result.isSuccess()) {
106-
throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
106+
throw new PulsarConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, String.format("PluginName: %s, PluginType: %s, Message: %s", getPluginName(), PluginType.SOURCE, result.getMsg()));
107107
}
108108

109109
// admin config
@@ -155,7 +155,7 @@ public void prepare(Config config) throws PrepareFailException {
155155
if (partitionDiscoverer instanceof TopicPatternDiscoverer
156156
&& partitionDiscoveryIntervalMs > 0
157157
&& Boundedness.BOUNDED == stopCursor.getBoundedness()) {
158-
throw new IllegalArgumentException("Bounded streams do not support dynamic partition discovery.");
158+
throw new PulsarConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, "Bounded streams do not support dynamic partition discovery.");
159159
}
160160
}
161161

@@ -177,12 +177,12 @@ private void setStartCursor(Config config) {
177177
break;
178178
case TIMESTAMP:
179179
if (StringUtils.isBlank(config.getString(CURSOR_STARTUP_TIMESTAMP.key()))) {
180-
throw new IllegalArgumentException(String.format("The '%s' property is required when the '%s' is 'timestamp'.", CURSOR_STARTUP_TIMESTAMP.key(), CURSOR_STARTUP_MODE.key()));
180+
throw new PulsarConnectorException(SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, String.format("The '%s' property is required when the '%s' is 'timestamp'.", CURSOR_STARTUP_TIMESTAMP.key(), CURSOR_STARTUP_MODE.key()));
181181
}
182182
setOption(config, CURSOR_STARTUP_TIMESTAMP.key(), config::getLong, timestamp -> this.startCursor = StartCursor.timestamp(timestamp));
183183
break;
184184
default:
185-
throw new IllegalArgumentException(String.format("The %s mode is not supported.", startMode));
185+
throw new PulsarConnectorException(SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, String.format("The %s mode is not supported.", startMode));
186186
}
187187
}
188188

@@ -197,12 +197,12 @@ private void setStopCursor(Config config) {
197197
break;
198198
case TIMESTAMP:
199199
if (StringUtils.isBlank(config.getString(CURSOR_STOP_TIMESTAMP.key()))) {
200-
throw new IllegalArgumentException(String.format("The '%s' property is required when the '%s' is 'timestamp'.", CURSOR_STOP_TIMESTAMP.key(), CURSOR_STOP_MODE.key()));
200+
throw new PulsarConnectorException(SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, String.format("The '%s' property is required when the '%s' is 'timestamp'.", CURSOR_STOP_TIMESTAMP.key(), CURSOR_STOP_MODE.key()));
201201
}
202202
setOption(config, CURSOR_STARTUP_TIMESTAMP.key(), config::getLong, timestamp -> this.stopCursor = StopCursor.timestamp(timestamp));
203203
break;
204204
default:
205-
throw new IllegalArgumentException(String.format("The %s mode is not supported.", stopMode));
205+
throw new PulsarConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, String.format("The %s mode is not supported.", stopMode));
206206
}
207207
}
208208

@@ -214,12 +214,12 @@ private void setPartitionDiscoverer(Config config) {
214214
String topicPattern = config.getString(TOPIC_PATTERN.key());
215215
if (StringUtils.isNotBlank(topicPattern)) {
216216
if (this.partitionDiscoverer != null) {
217-
throw new IllegalArgumentException(String.format("The properties '%s' and '%s' is exclusive.", TOPIC.key(), TOPIC_PATTERN.key()));
217+
throw new PulsarConnectorException(SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, String.format("The properties '%s' and '%s' is exclusive.", TOPIC.key(), TOPIC_PATTERN.key()));
218218
}
219219
this.partitionDiscoverer = new TopicPatternDiscoverer(Pattern.compile(topicPattern));
220220
}
221221
if (this.partitionDiscoverer == null) {
222-
throw new IllegalArgumentException(String.format("The properties '%s' or '%s' is required.", TOPIC.key(), TOPIC_PATTERN.key()));
222+
throw new PulsarConnectorException(SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, String.format("The properties '%s' or '%s' is required.", TOPIC.key(), TOPIC_PATTERN.key()));
223223
}
224224
}
225225

seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919

2020
import org.apache.seatunnel.api.source.Boundedness;
2121
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
22+
import org.apache.seatunnel.common.exception.CommonErrorCode;
2223
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarAdminConfig;
2324
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
25+
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
2426
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.StartCursor;
2527
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.SubscriptionStartCursor;
2628
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop.LatestMessageStopCursor;
@@ -107,10 +109,10 @@ public PulsarSplitEnumerator(SourceSplitEnumerator.Context<PulsarPartitionSplit>
107109
StopCursor stopCursor,
108110
String subscriptionName,
109111
Set<TopicPartition> assignedPartitions) {
110-
if ((partitionDiscoverer instanceof TopicPatternDiscoverer)
112+
if (partitionDiscoverer instanceof TopicPatternDiscoverer
111113
&& partitionDiscoveryIntervalMs > 0
112114
&& Boundedness.BOUNDED == stopCursor.getBoundedness()) {
113-
throw new IllegalArgumentException("Bounded streams do not support dynamic partition discovery.");
115+
throw new PulsarConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION, "Bounded streams do not support dynamic partition discovery.");
114116
}
115117
this.context = context;
116118
this.adminConfig = adminConfig;

seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/SubscriptionStartCursor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start;
2020

21+
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
22+
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
2123
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.topic.TopicPartition;
2224

2325
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -48,7 +50,7 @@ public void ensureSubscription(String subscription, TopicPartition partition, Pu
4850
}
4951
pulsarAdmin.topics().createSubscription(partition.getFullTopicName(), subscription, CursorResetStrategy.EARLIEST == cursorResetStrategy ? MessageId.earliest : MessageId.latest);
5052
} catch (PulsarAdminException e) {
51-
throw new RuntimeException(e);
53+
throw new PulsarConnectorException(PulsarConnectorErrorCode.OPEN_PULSAR_ADMIN_FAILED, e);
5254
}
5355
}
5456

seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop;
2020

21+
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
22+
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
2123
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.topic.TopicPartition;
2224

2325
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -41,7 +43,7 @@ public void prepare(PulsarAdmin admin, TopicPartition partition) {
4143
try {
4244
messageId = admin.topics().getLastMessageId(topic);
4345
} catch (PulsarAdminException e) {
44-
throw new RuntimeException("Failed to get the last cursor", e);
46+
throw new PulsarConnectorException(PulsarConnectorErrorCode.GET_LAST_CURSOR_FAILED, "Failed to get the last cursor", e);
4547
}
4648
}
4749
}

seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/discoverer/TopicListDiscoverer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer;
2020

21+
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
22+
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
2123
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.topic.TopicPartition;
2224

2325
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -53,7 +55,7 @@ public Set<TopicPartition> getSubscribedTopicPartitions(PulsarAdmin pulsarAdmin)
5355
return PulsarDiscoverer.toTopicPartitions(topicName, metadata.partitions);
5456
} catch (PulsarAdminException e) {
5557
// This method would cause the failure for subscriber.
56-
throw new IllegalStateException(e);
58+
throw new PulsarConnectorException(PulsarConnectorErrorCode.SUBSCRIBE_TOPIC_FAILED, e);
5759
}
5860
})
5961
.filter(Objects::nonNull)

0 commit comments

Comments
 (0)