Skip to content

Commit 4686f35

Browse files
authored
[Improve][Connector-V2][Influxdb] Unified exception for influxdb source & sink connector (#3558)
* [Improve][Connector-V2][Influxdb] Unified exception for influxdb source & sink connector * [Improve][Connector-V2][Influxdb] Unified exception for influxdb source & sink connector
1 parent e1fc3d1 commit 4686f35

File tree

12 files changed

+247
-119
lines changed

12 files changed

+247
-119
lines changed

docs/en/connector-v2/Error-Quick-Reference-Manual.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,17 @@ This document records some common error codes and corresponding solutions of Sea
7676
| SOCKET-02 | Failed to send message to socket server | When the user encounters this error code, it means that there is a problem sending data and retry is not enabled, please check |
7777
| SOCKET-03 | Unable to write; interrupted while doing another attempt | When the user encounters this error code, it means that the data writing is interrupted abnormally, please check |
7878

79-
8079
## Hive Connector Error Codes
8180

8281
| code | description | solution |
8382
|---------|---------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------|
8483
| HIVE-01 | Get name node host from table location failed | When users encounter this error code, it means that the metastore inforamtion has some problems, please check it |
8584
| HIVE-02 | Initialize hive metastore client failed | When users encounter this error code, it means that connect to hive metastore service failed, please check it whether is work |
8685
| HIVE-03 | Get hive table information from hive metastore service failed | When users encounter this error code, it means that hive metastore service has some problems, please check it whether is work |
86+
87+
## InfluxDB Connector Error Codes
88+
89+
| code | description | solution |
90+
|-------------|------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------|
91+
| INFLUXDB-01 | Connect influxdb failed, due to influxdb version info is unknown | When the user encounters this error code, it indicates that the connection to influxdb failed. Please check |
92+
| INFLUXDB-02 | Get column index of query result exception | When the user encounters this error code, it indicates that obtaining the column index failed. Please check |

seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java

Lines changed: 40 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
2121
import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig;
22+
import org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorErrorCode;
23+
import org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException;
2224

2325
import lombok.extern.slf4j.Slf4j;
2426
import okhttp3.HttpUrl;
@@ -38,56 +40,56 @@
3840
public class InfluxDBClient {
3941
public static InfluxDB getInfluxDB(InfluxDBConfig config) throws ConnectException {
4042
OkHttpClient.Builder clientBuilder =
41-
new OkHttpClient.Builder()
42-
.connectTimeout(config.getConnectTimeOut(), TimeUnit.MILLISECONDS)
43-
.readTimeout(config.getQueryTimeOut(), TimeUnit.SECONDS);
43+
new OkHttpClient.Builder()
44+
.connectTimeout(config.getConnectTimeOut(), TimeUnit.MILLISECONDS)
45+
.readTimeout(config.getQueryTimeOut(), TimeUnit.SECONDS);
4446
InfluxDB.ResponseFormat format = InfluxDB.ResponseFormat.valueOf(config.getFormat());
4547
clientBuilder.addInterceptor(
46-
new Interceptor() {
47-
@Override
48-
public Response intercept(Chain chain) throws IOException {
49-
Request request = chain.request();
50-
HttpUrl httpUrl =
51-
request.url()
52-
.newBuilder()
53-
//set epoch
54-
.addQueryParameter("epoch", config.getEpoch())
55-
.build();
56-
Request build = request.newBuilder().url(httpUrl).build();
57-
Response response = chain.proceed(build);
58-
return response;
59-
}
60-
});
61-
InfluxDB influxDB =
62-
new InfluxDBImpl(
63-
config.getUrl(),
64-
StringUtils.isEmpty(config.getUsername()) ? StringUtils.EMPTY : config.getUsername(),
65-
StringUtils.isEmpty(config.getPassword()) ? StringUtils.EMPTY : config.getPassword(),
66-
clientBuilder,
67-
format);
68-
String version = influxDB.version();
69-
if (!influxDB.ping().isGood()) {
70-
String errorMessage =
71-
String.format(
72-
"connect influxdb failed, the url is: {%s}",
73-
config.getUrl());
74-
throw new ConnectException(errorMessage);
48+
new Interceptor() {
49+
@Override
50+
public Response intercept(Chain chain) throws IOException {
51+
Request request = chain.request();
52+
HttpUrl httpUrl =
53+
request.url()
54+
.newBuilder()
55+
//set epoch
56+
.addQueryParameter("epoch", config.getEpoch())
57+
.build();
58+
Request build = request.newBuilder().url(httpUrl).build();
59+
return chain.proceed(build);
60+
}
61+
});
62+
InfluxDB influxdb =
63+
new InfluxDBImpl(
64+
config.getUrl(),
65+
StringUtils.isEmpty(config.getUsername()) ? StringUtils.EMPTY : config.getUsername(),
66+
StringUtils.isEmpty(config.getPassword()) ? StringUtils.EMPTY : config.getPassword(),
67+
clientBuilder,
68+
format);
69+
String version = influxdb.version();
70+
if (!influxdb.ping().isGood()) {
71+
throw new InfluxdbConnectorException(InfluxdbConnectorErrorCode.CONNECT_FAILED,
72+
String.format(
73+
"Connect influxdb failed, the url is: {%s}",
74+
config.getUrl()
75+
)
76+
);
7577
}
7678
log.info("connect influxdb successful. sever version :{}.", version);
77-
return influxDB;
79+
return influxdb;
7880
}
7981

80-
public static void setWriteProperty(InfluxDB influxDB, SinkConfig sinkConfig) {
82+
public static void setWriteProperty(InfluxDB influxdb, SinkConfig sinkConfig) {
8183
String rp = sinkConfig.getRp();
8284
if (!StringUtils.isEmpty(rp)) {
83-
influxDB.setRetentionPolicy(rp);
85+
influxdb.setRetentionPolicy(rp);
8486
}
8587
}
8688

8789
public static InfluxDB getWriteClient(SinkConfig sinkConfig) throws ConnectException {
88-
InfluxDB influxDB = getInfluxDB(sinkConfig);
89-
influxDB.setDatabase(sinkConfig.getDatabase());
90+
InfluxDB influxdb = getInfluxDB(sinkConfig);
91+
influxdb.setDatabase(sinkConfig.getDatabase());
9092
setWriteProperty(getInfluxDB(sinkConfig), sinkConfig);
91-
return influxDB;
93+
return influxdb;
9294
}
9395
}

seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/converter/InfluxDBRowConverter.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2222
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2323
import org.apache.seatunnel.api.table.type.SqlType;
24+
import org.apache.seatunnel.common.exception.CommonErrorCode;
25+
import org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException;
2426

2527
import java.util.ArrayList;
2628
import java.util.List;
@@ -39,8 +41,7 @@ public static SeaTunnelRow convert(List<Object> values, SeaTunnelRowType typeInf
3941
SqlType fieldSqlType = seaTunnelDataType.getSqlType();
4042
if (null == values.get(columnIndex)) {
4143
seaTunnelField = null;
42-
}
43-
else if (SqlType.BOOLEAN.equals(fieldSqlType)) {
44+
} else if (SqlType.BOOLEAN.equals(fieldSqlType)) {
4445
seaTunnelField = Boolean.parseBoolean(values.get(columnIndex).toString());
4546
} else if (SqlType.SMALLINT.equals(fieldSqlType)) {
4647
seaTunnelField = Short.valueOf(values.get(columnIndex).toString());
@@ -55,7 +56,8 @@ else if (SqlType.BOOLEAN.equals(fieldSqlType)) {
5556
} else if (SqlType.STRING.equals(fieldSqlType)) {
5657
seaTunnelField = values.get(columnIndex);
5758
} else {
58-
throw new IllegalStateException("Unexpected value: " + seaTunnelDataType);
59+
throw new InfluxdbConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
60+
"Unsupported data type: " + seaTunnelDataType);
5961
}
6062

6163
fields.add(seaTunnelField);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* https://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.influxdb.exception;
19+
20+
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
21+
22+
public enum InfluxdbConnectorErrorCode implements SeaTunnelErrorCode {
23+
24+
CONNECT_FAILED("INFLUXDB-01", "Connect influxdb failed, due to influxdb version info is unknown"),
25+
GET_COLUMN_INDEX_FAILED("INFLUXDB-02", "Get column index of query result exception");
26+
27+
private final String code;
28+
private final String description;
29+
30+
InfluxdbConnectorErrorCode(String code, String description) {
31+
this.code = code;
32+
this.description = description;
33+
}
34+
35+
@Override
36+
public String getCode() {
37+
return this.code;
38+
}
39+
40+
@Override
41+
public String getDescription() {
42+
return this.description;
43+
}
44+
45+
@Override
46+
public String getErrorMessage() {
47+
return SeaTunnelErrorCode.super.getErrorMessage();
48+
}
49+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* https://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.influxdb.exception;
19+
20+
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
21+
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
22+
23+
public class InfluxdbConnectorException extends SeaTunnelRuntimeException {
24+
25+
public InfluxdbConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
26+
super(seaTunnelErrorCode, errorMessage);
27+
}
28+
29+
public InfluxdbConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) {
30+
super(seaTunnelErrorCode, errorMessage, cause);
31+
}
32+
33+
public InfluxdbConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) {
34+
super(seaTunnelErrorCode, cause);
35+
}
36+
}

seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2121
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2222
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
23+
import org.apache.seatunnel.common.exception.CommonErrorCode;
24+
import org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException;
2325

2426
import com.google.common.base.Strings;
2527
import org.apache.commons.collections4.CollectionUtils;
@@ -35,14 +37,14 @@
3537
import java.util.stream.Stream;
3638

3739
public class DefaultSerializer implements Serializer {
38-
private SeaTunnelRowType seaTunnelRowType;
40+
private final SeaTunnelRowType seaTunnelRowType;
3941

4042
private final BiConsumer<SeaTunnelRow, Point.Builder> timestampExtractor;
4143
private final BiConsumer<SeaTunnelRow, Point.Builder> fieldExtractor;
4244
private final BiConsumer<SeaTunnelRow, Point.Builder> tagExtractor;
43-
private String measurement;
45+
private final String measurement;
4446

45-
private TimeUnit precision;
47+
private final TimeUnit precision;
4648

4749
public DefaultSerializer(SeaTunnelRowType seaTunnelRowType, TimeUnit precision, List<String> tagKeys,
4850
String timestampKey,
@@ -67,8 +69,7 @@ public Point serialize(SeaTunnelRow seaTunnelRow) {
6769

6870
private BiConsumer<SeaTunnelRow, Point.Builder> createFieldExtractor(SeaTunnelRowType seaTunnelRowType, List<String> fieldKeys) {
6971
return (row, builder) -> {
70-
for (int i = 0; i < fieldKeys.size(); i++) {
71-
String field = fieldKeys.get(i);
72+
for (String field : fieldKeys) {
7273
int indexOfSeaTunnelRow = seaTunnelRowType.indexOf(field);
7374
SeaTunnelDataType dataType = seaTunnelRowType.getFieldType(indexOfSeaTunnelRow);
7475
Object val = row.getField(indexOfSeaTunnelRow);
@@ -96,14 +97,15 @@ private BiConsumer<SeaTunnelRow, Point.Builder> createFieldExtractor(SeaTunnelRo
9697
builder.addField(field, val.toString());
9798
break;
9899
default:
99-
throw new UnsupportedOperationException("Unsupported dataType: " + dataType);
100+
throw new InfluxdbConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
101+
"Unsupported data type: " + dataType);
100102
}
101103
}
102104
};
103105
}
104106

105107
private BiConsumer<SeaTunnelRow, Point.Builder> createTimestampExtractor(SeaTunnelRowType seaTunnelRowType,
106-
String timeKey) {
108+
String timeKey) {
107109
//not config timeKey, use processing time
108110
if (Strings.isNullOrEmpty(timeKey)) {
109111
return (row, builder) -> builder.time(System.currentTimeMillis(), precision);
@@ -121,7 +123,7 @@ private BiConsumer<SeaTunnelRow, Point.Builder> createTimestampExtractor(SeaTunn
121123
builder.time(Long.parseLong((String) time), precision);
122124
break;
123125
case TIMESTAMP:
124-
builder.time(LocalDateTime.class.cast(time)
126+
builder.time(((LocalDateTime) time)
125127
.atZone(ZoneOffset.UTC)
126128
.toInstant()
127129
.toEpochMilli(), precision);
@@ -136,27 +138,27 @@ private BiConsumer<SeaTunnelRow, Point.Builder> createTimestampExtractor(SeaTunn
136138
}
137139

138140
private BiConsumer<SeaTunnelRow, Point.Builder> createTagExtractor(SeaTunnelRowType seaTunnelRowType,
139-
List<String> tagKeys) {
141+
List<String> tagKeys) {
140142
//not config tagKeys
141143
if (CollectionUtils.isEmpty(tagKeys)) {
142-
return (row, builder) -> {};
144+
return (row, builder) -> {
145+
};
143146
}
144147

145148
return (row, builder) -> {
146-
for (int i = 0; i < tagKeys.size(); i++) {
147-
String tagKey = tagKeys.get(i);
149+
for (String tagKey : tagKeys) {
148150
int indexOfSeaTunnelRow = seaTunnelRowType.indexOf(tagKey);
149151
builder.tag(tagKey, row.getField(indexOfSeaTunnelRow).toString());
150152
}
151153
};
152154
}
153155

154156
private List<String> getFieldKeys(SeaTunnelRowType seaTunnelRowType,
155-
String timestampKey,
156-
List<String> tagKeys) {
157+
String timestampKey,
158+
List<String> tagKeys) {
157159
return Stream.of(seaTunnelRowType.getFieldNames())
158-
.filter(name -> CollectionUtils.isEmpty(tagKeys) || !tagKeys.contains(name))
159-
.filter(name -> StringUtils.isEmpty(timestampKey) || !name.equals(timestampKey))
160-
.collect(Collectors.toList());
160+
.filter(name -> CollectionUtils.isEmpty(tagKeys) || !tagKeys.contains(name))
161+
.filter(name -> StringUtils.isEmpty(timestampKey) || !name.equals(timestampKey))
162+
.collect(Collectors.toList());
161163
}
162164
}

seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.KEY_MEASUREMENT;
2222

2323
import org.apache.seatunnel.api.common.PrepareFailException;
24+
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
2425
import org.apache.seatunnel.api.sink.SeaTunnelSink;
2526
import org.apache.seatunnel.api.sink.SinkWriter;
2627
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -31,6 +32,7 @@
3132
import org.apache.seatunnel.common.constants.PluginType;
3233
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
3334
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
35+
import org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException;
3436

3537
import org.apache.seatunnel.shade.com.typesafe.config.Config;
3638

@@ -53,7 +55,12 @@ public String getPluginName() {
5355
public void prepare(Config config) throws PrepareFailException {
5456
CheckResult result = CheckConfigUtil.checkAllExists(config, URL.key(), KEY_MEASUREMENT.key());
5557
if (!result.isSuccess()) {
56-
throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
58+
throw new InfluxdbConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
59+
String.format("PluginName: %s, PluginType: %s, Message: %s",
60+
getPluginName(), PluginType.SINK,
61+
result.getMsg()
62+
)
63+
);
5764
}
5865
this.pluginConfig = config;
5966
}
@@ -64,12 +71,12 @@ public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
6471
}
6572

6673
@Override
67-
public SeaTunnelDataType getConsumedType() {
74+
public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
6875
return seaTunnelRowType;
6976
}
7077

7178
@Override
72-
public AbstractSinkWriter createWriter(SinkWriter.Context context) throws IOException {
79+
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) throws IOException {
7380
return new InfluxDBSinkWriter(pluginConfig, seaTunnelRowType);
7481
}
7582
}

0 commit comments

Comments
 (0)