Skip to content

Commit d798cd8

Browse files
[Improve][Connector-V2][Http]Unified exception for http source & sink… (#3594)
* [Improve][Connector-V2][Http]Unified exception for http source & sink connector
1 parent e77fdbb commit d798cd8

File tree

3 files changed

+59
-3
lines changed

3 files changed

+59
-3
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.http.exception;
19+
20+
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
21+
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
22+
23+
public class HttpConnectorException extends SeaTunnelRuntimeException {
24+
25+
private boolean reCreateLabel;
26+
27+
public HttpConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
28+
super(seaTunnelErrorCode, errorMessage);
29+
}
30+
31+
public HttpConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, boolean reCreateLabel) {
32+
super(seaTunnelErrorCode, errorMessage);
33+
this.reCreateLabel = reCreateLabel;
34+
}
35+
36+
public HttpConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) {
37+
super(seaTunnelErrorCode, errorMessage, cause);
38+
}
39+
40+
public HttpConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) {
41+
super(seaTunnelErrorCode, cause);
42+
}
43+
44+
public boolean needReCreateLabel() {
45+
return reCreateLabel;
46+
}
47+
}

seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.seatunnel.connectors.seatunnel.http.sink;
1919

2020
import org.apache.seatunnel.api.common.PrepareFailException;
21+
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
2122
import org.apache.seatunnel.api.sink.SeaTunnelSink;
2223
import org.apache.seatunnel.api.sink.SinkWriter;
2324
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -30,6 +31,7 @@
3031
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
3132
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;
3233
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
34+
import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException;
3335

3436
import org.apache.seatunnel.shade.com.typesafe.config.Config;
3537

@@ -55,7 +57,9 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
5557
this.pluginConfig = pluginConfig;
5658
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, HttpConfig.URL.key());
5759
if (!result.isSuccess()) {
58-
throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg());
60+
throw new HttpConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
61+
String.format("PluginName: %s, PluginType: %s, Message: %s",
62+
getPluginName(), PluginType.SINK, result.getMsg()));
5963
}
6064
httpParameter.setUrl(pluginConfig.getString(HttpConfig.URL.key()));
6165
if (pluginConfig.hasPath(HttpConfig.HEADERS.key())) {

seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.seatunnel.api.common.JobContext;
2121
import org.apache.seatunnel.api.common.PrepareFailException;
22+
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
2223
import org.apache.seatunnel.api.serialization.DeserializationSchema;
2324
import org.apache.seatunnel.api.source.Boundedness;
2425
import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -29,12 +30,14 @@
2930
import org.apache.seatunnel.common.config.CheckResult;
3031
import org.apache.seatunnel.common.constants.JobMode;
3132
import org.apache.seatunnel.common.constants.PluginType;
33+
import org.apache.seatunnel.common.exception.CommonErrorCode;
3234
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
3335
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
3436
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
3537
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
3638
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;
3739
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
40+
import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException;
3841
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
3942

4043
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -64,7 +67,9 @@ public Boundedness getBoundedness() {
6467
public void prepare(Config pluginConfig) throws PrepareFailException {
6568
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, HttpConfig.URL.key());
6669
if (!result.isSuccess()) {
67-
throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
70+
throw new HttpConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
71+
String.format("PluginName: %s, PluginType: %s, Message: %s",
72+
getPluginName(), PluginType.SOURCE, result.getMsg()));
6873
}
6974
this.httpParameter.buildWithConfig(pluginConfig);
7075
buildSchemaWithConfig(pluginConfig);
@@ -86,7 +91,7 @@ protected void buildSchemaWithConfig(Config pluginConfig) {
8691
break;
8792
default:
8893
// TODO: use format SPI
89-
throw new UnsupportedOperationException("Unsupported format: " + format);
94+
throw new HttpConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, String.format("Unsupported data format [%s], http connector only support json format now", format));
9095
}
9196
} else {
9297
this.rowType = SeaTunnelSchema.buildSimpleTextSchema();

0 commit comments

Comments
 (0)