Skip to content

Commit b73944d

Browse files
FWLambyangbinbinHisoka-X
authored
[Improve][Connector-V2][ElasticSearch] Unified exception for ElasticSearch source & sink connector (#3569)
* unified exception * resolve confilcts * resolve confilcts * resolve confilcts Co-authored-by: yangbinbin <yangbinbin@aspirecn.com> Co-authored-by: Hisoka <fanjiaeminem@qq.com>
1 parent b2fda11 commit b73944d

File tree

13 files changed

+159
-165
lines changed

13 files changed

+159
-165
lines changed

docs/en/connector-v2/Error-Quick-Reference-Manual.md

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Error Quick Reference Manual
22

3-
This document records some common error codes and corresponding solutions of SeaTunnel, aiming to quickly solve the problems encountered by users.
3+
This document records some common error codes and corresponding solutions of SeaTunnel, aiming to quickly solve the
4+
problems encountered by users.
45

56
## SeaTunnel API Error Codes
67

@@ -96,6 +97,15 @@ This document records some common error codes and corresponding solutions of Sea
9697
| HIVE-02 | Initialize hive metastore client failed | When users encounter this error code, it means that connect to hive metastore service failed, please check it whether is work |
9798
| HIVE-03 | Get hive table information from hive metastore service failed | When users encounter this error code, it means that hive metastore service has some problems, please check it whether is work |
9899

100+
## Elasticsearch Connector Error Codes
101+
102+
| code | description | solution |
103+
|-------------------|-----------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------|
104+
| ELASTICSEARCH-01 | Bulk es response error | When the user encounters this error code, it means that the connection was aborted, please check it whether is work |
105+
| ELASTICSEARCH-02 | Get elasticsearch version failed | When the user encounters this error code, it means that the connection was aborted, please check it whether is work |
106+
| ELASTICSEARCH-03 | Fail to scroll request | When the user encounters this error code, it means that the connection was aborted, please check it whether is work |
107+
| ELASTICSEARCH-04 | Get elasticsearch document index count failed | When the user encounters this error code, it means that the es index may not wrong or the connection was aborted, please check |
108+
99109
## Kafka Connector Error Codes
100110

101111
| code | description | solution |

seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,8 @@
2222
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
2323
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.IndexDocsCount;
2424
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
25-
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.BulkElasticsearchException;
26-
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.GetElasticsearchVersionException;
27-
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.GetIndexDocsCountException;
28-
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ScrollRequestException;
25+
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorErrorCode;
26+
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
2927

3028
import org.apache.seatunnel.shade.com.typesafe.config.Config;
3129

@@ -96,9 +94,9 @@ private static RestClientBuilder getRestClientBuilder(List<String> hosts, String
9694
}
9795

9896
RestClientBuilder builder = RestClient.builder(httpHosts)
99-
.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
100-
.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT)
101-
.setSocketTimeout(SOCKET_TIMEOUT));
97+
.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
98+
.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT)
99+
.setSocketTimeout(SOCKET_TIMEOUT));
102100

103101
if (StringUtils.isNotEmpty(username)) {
104102
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
@@ -114,7 +112,7 @@ public BulkResponse bulk(String requestBody) {
114112
try {
115113
Response response = restClient.performRequest(request);
116114
if (response == null) {
117-
throw new BulkElasticsearchException("bulk es Response is null");
115+
throw new ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.BULK_RESPONSE_ERROR, "bulk es Response is null");
118116
}
119117
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
120118
ObjectMapper objectMapper = new ObjectMapper();
@@ -124,10 +122,10 @@ public BulkResponse bulk(String requestBody) {
124122
boolean errors = json.get("errors").asBoolean();
125123
return new BulkResponse(errors, took, entity);
126124
} else {
127-
throw new BulkElasticsearchException(String.format("bulk es response status code=%d,request boy=%s", response.getStatusLine().getStatusCode(), requestBody));
125+
throw new ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.BULK_RESPONSE_ERROR, String.format("bulk es response status code=%d,request boy=%s", response.getStatusLine().getStatusCode(), requestBody));
128126
}
129127
} catch (IOException e) {
130-
throw new BulkElasticsearchException(String.format("bulk es error,request boy=%s", requestBody), e);
128+
throw new ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.BULK_RESPONSE_ERROR, String.format("bulk es error,request boy=%s", requestBody), e);
131129
}
132130
}
133131

@@ -144,11 +142,11 @@ public String getClusterVersion() {
144142
JsonNode versionNode = jsonNode.get("version");
145143
return versionNode.get("number").asText();
146144
} catch (IOException e) {
147-
throw new GetElasticsearchVersionException("fail to get elasticsearch version.", e);
145+
throw new ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.GET_ES_VERSION_FAILED, "fail to get elasticsearch version.", e);
148146
}
149147
}
150148

151-
public void close() {
149+
public void close() {
152150
try {
153151
restClient.close();
154152
} catch (IOException e) {
@@ -199,7 +197,7 @@ private ScrollResult getDocsFromScrollRequest(String endpoint, String requestBod
199197
try {
200198
Response response = restClient.performRequest(request);
201199
if (response == null) {
202-
throw new ScrollRequestException("POST " + endpoint + " response null");
200+
throw new ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.SCROLL_REQUEST_ERROR, "POST " + endpoint + " response null");
203201
}
204202
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
205203
String entity = EntityUtils.toString(response.getEntity());
@@ -213,10 +211,12 @@ private ScrollResult getDocsFromScrollRequest(String endpoint, String requestBod
213211
ScrollResult scrollResult = getDocsFromScrollResponse(responseJson);
214212
return scrollResult;
215213
} else {
216-
throw new ScrollRequestException(String.format("POST %s response status code=%d,request boy=%s", endpoint, response.getStatusLine().getStatusCode(), requestBody));
214+
throw new ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.SCROLL_REQUEST_ERROR,
215+
String.format("POST %s response status code=%d,request boy=%s", endpoint, response.getStatusLine().getStatusCode(), requestBody));
217216
}
218217
} catch (IOException e) {
219-
throw new ScrollRequestException(String.format("POST %s error,request boy=%s", endpoint, requestBody), e);
218+
throw new ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.SCROLL_REQUEST_ERROR,
219+
String.format("POST %s error,request boy=%s", endpoint, requestBody), e);
220220

221221
}
222222
}
@@ -240,7 +240,7 @@ private ScrollResult getDocsFromScrollResponse(ObjectNode responseJson) {
240240
for (Iterator<Map.Entry<String, JsonNode>> iterator = source.fields(); iterator.hasNext(); ) {
241241
Map.Entry<String, JsonNode> entry = iterator.next();
242242
String fieldName = entry.getKey();
243-
if (entry.getValue() instanceof TextNode){
243+
if (entry.getValue() instanceof TextNode) {
244244
doc.put(fieldName, entry.getValue().textValue());
245245
} else {
246246
doc.put(fieldName, entry.getValue());
@@ -257,17 +257,19 @@ public List<IndexDocsCount> getIndexDocsCount(String index) {
257257
try {
258258
Response response = restClient.performRequest(request);
259259
if (response == null) {
260-
throw new GetIndexDocsCountException("GET " + endpoint + " response null");
260+
throw new ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.GET_INDEX_DOCS_COUNT_FAILED,
261+
"GET " + endpoint + " response null");
261262
}
262263
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
263264
String entity = EntityUtils.toString(response.getEntity());
264265
List<IndexDocsCount> indexDocsCounts = JsonUtils.toList(entity, IndexDocsCount.class);
265266
return indexDocsCounts;
266267
} else {
267-
throw new GetIndexDocsCountException(String.format("GET %s response status code=%d", endpoint, response.getStatusLine().getStatusCode()));
268+
throw new ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.GET_INDEX_DOCS_COUNT_FAILED,
269+
String.format("GET %s response status code=%d", endpoint, response.getStatusLine().getStatusCode()));
268270
}
269271
} catch (IOException ex) {
270-
throw new GetIndexDocsCountException(ex);
272+
throw new ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.GET_INDEX_DOCS_COUNT_FAILED, ex);
271273
}
272274
}
273275

@@ -284,10 +286,12 @@ public Map<String, String> getFieldTypeMapping(String index, List<String> source
284286
try {
285287
Response response = restClient.performRequest(request);
286288
if (response == null) {
287-
throw new GetIndexDocsCountException("GET " + endpoint + " response null");
289+
throw new ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.GET_INDEX_DOCS_COUNT_FAILED,
290+
"GET " + endpoint + " response null");
288291
}
289292
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
290-
throw new GetIndexDocsCountException(String.format("GET %s response status code=%d", endpoint, response.getStatusLine().getStatusCode()));
293+
throw new ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.GET_INDEX_DOCS_COUNT_FAILED,
294+
String.format("GET %s response status code=%d", endpoint, response.getStatusLine().getStatusCode()));
291295
}
292296
String entity = EntityUtils.toString(response.getEntity());
293297
log.info(String.format("GET %s respnse=%s", endpoint, entity));
@@ -308,7 +312,7 @@ public Map<String, String> getFieldTypeMapping(String index, List<String> source
308312

309313
}
310314
} catch (IOException ex) {
311-
throw new GetIndexDocsCountException(ex);
315+
throw new ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.GET_INDEX_DOCS_COUNT_FAILED, ex);
312316
}
313317
return mapping;
314318
}

seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/ElasticsearchVersion.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant;
1919

20+
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorErrorCode;
21+
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
22+
2023
public enum ElasticsearchVersion {
2124
ES2(2), ES5(5), ES6(6), ES7(7), ES8(8);
2225

@@ -40,7 +43,8 @@ public static ElasticsearchVersion get(int version) {
4043
return elasticsearchVersion;
4144
}
4245
}
43-
throw new IllegalArgumentException(String.format("version=%d,fail fo find ElasticsearchVersion.", version));
46+
throw new ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.GET_ES_VERSION_FAILED,
47+
String.format("version=%d,fail fo find ElasticsearchVersion.", version));
4448
}
4549

4650
public static ElasticsearchVersion get(String clusterVersion) {

seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/BulkElasticsearchException.java

Lines changed: 0 additions & 30 deletions
This file was deleted.
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception;
19+
20+
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
21+
22+
public enum ElasticsearchConnectorErrorCode implements SeaTunnelErrorCode {
23+
24+
BULK_RESPONSE_ERROR("ELASTICSEARCH-01", "Bulk es response error"),
25+
GET_ES_VERSION_FAILED("ELASTICSEARCH-02", "Get elasticsearch version failed"),
26+
SCROLL_REQUEST_ERROR("ELASTICSEARCH-03", "Fail to scroll request"),
27+
GET_INDEX_DOCS_COUNT_FAILED("ELASTICSEARCH-04", "Get elasticsearch document index count failed");
28+
29+
private final String code;
30+
private final String description;
31+
32+
ElasticsearchConnectorErrorCode(String code, String description) {
33+
this.code = code;
34+
this.description = description;
35+
}
36+
37+
@Override
38+
public String getCode() {
39+
return code;
40+
}
41+
42+
@Override
43+
public String getDescription() {
44+
return description;
45+
}
46+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception;
19+
20+
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
21+
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
22+
23+
public class ElasticsearchConnectorException extends SeaTunnelRuntimeException {
24+
public ElasticsearchConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
25+
super(seaTunnelErrorCode, errorMessage);
26+
}
27+
28+
public ElasticsearchConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) {
29+
super(seaTunnelErrorCode, errorMessage, cause);
30+
}
31+
32+
public ElasticsearchConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) {
33+
super(seaTunnelErrorCode, cause);
34+
}
35+
}

seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/GetElasticsearchVersionException.java

Lines changed: 0 additions & 25 deletions
This file was deleted.

seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/GetIndexDocsCountException.java

Lines changed: 0 additions & 30 deletions
This file was deleted.

0 commit comments

Comments
 (0)