Skip to content

Commit 205f782

Browse files
[Improve][Connector-V2][Redis] Unified exception for redis source & sink exception (#3517)
* [Improve][Connector-V2][Redis] Unified exception for redis source & sink exception * [Improve][Connector-V2][Redis] Fix code style
1 parent 4e60418 commit 205f782

File tree

4 files changed

+57
-6
lines changed

4 files changed

+57
-6
lines changed

seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.redis.config;
1919

20+
import org.apache.seatunnel.common.exception.CommonErrorCode;
21+
import org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
22+
2023
import org.apache.seatunnel.shade.com.typesafe.config.Config;
2124

2225
import lombok.Data;
@@ -88,7 +91,8 @@ public void buildWithConfig(Config config) {
8891
String dataType = config.getString(RedisConfig.DATA_TYPE.key());
8992
this.redisDataType = RedisDataType.valueOf(dataType.toUpperCase());
9093
} catch (IllegalArgumentException e) {
91-
throw new RuntimeException("Redis source connector only support these data types [key, hash, list, set, zset]", e);
94+
throw new RedisConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
95+
"Redis source connector only support these data types [key, hash, list, set, zset]", e);
9296
}
9397
}
9498

@@ -111,7 +115,8 @@ public Jedis buildJedis() {
111115
for (String redisNode : redisNodes) {
112116
String[] splits = redisNode.split(":");
113117
if (splits.length != 2) {
114-
throw new IllegalArgumentException("Invalid redis node information," +
118+
throw new RedisConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
119+
"Invalid redis node information," +
115120
"redis node information must like as the following: [host:port]");
116121
}
117122
HostAndPort hostAndPort = new HostAndPort(splits[0], Integer.parseInt(splits[1]));
@@ -130,7 +135,8 @@ public Jedis buildJedis() {
130135
return new JedisWrapper(jedisCluster);
131136
default:
132137
// do nothing
133-
throw new IllegalArgumentException("Not support this redis mode");
138+
throw new RedisConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
139+
"Not support this redis mode");
134140
}
135141
}
136142
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.redis.exception;
19+
20+
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
21+
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
22+
23+
public class RedisConnectorException extends SeaTunnelRuntimeException {
24+
public RedisConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
25+
super(seaTunnelErrorCode, errorMessage);
26+
}
27+
28+
public RedisConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) {
29+
super(seaTunnelErrorCode, errorMessage, cause);
30+
}
31+
32+
public RedisConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) {
33+
super(seaTunnelErrorCode, cause);
34+
}
35+
}

seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.seatunnel.connectors.seatunnel.redis.sink;
1919

2020
import org.apache.seatunnel.api.common.PrepareFailException;
21+
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
2122
import org.apache.seatunnel.api.sink.SeaTunnelSink;
2223
import org.apache.seatunnel.api.sink.SinkWriter;
2324
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -30,6 +31,7 @@
3031
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
3132
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
3233
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
34+
import org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
3335

3436
import org.apache.seatunnel.shade.com.typesafe.config.Config;
3537

@@ -53,7 +55,9 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
5355
this.pluginConfig = pluginConfig;
5456
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, RedisConfig.HOST.key(), RedisConfig.PORT.key(), RedisConfig.KEY.key(), RedisConfig.DATA_TYPE.key());
5557
if (!result.isSuccess()) {
56-
throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg());
58+
throw new RedisConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
59+
String.format("PluginName: %s, PluginType: %s, Message: %s",
60+
getPluginName(), PluginType.SINK, result.getMsg()));
5761
}
5862
this.redisParameters.buildWithConfig(pluginConfig);
5963
}

seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.seatunnel.connectors.seatunnel.redis.source;
1919

2020
import org.apache.seatunnel.api.common.PrepareFailException;
21+
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
2122
import org.apache.seatunnel.api.serialization.DeserializationSchema;
2223
import org.apache.seatunnel.api.source.Boundedness;
2324
import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -33,6 +34,7 @@
3334
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
3435
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
3536
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
37+
import org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
3638
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
3739

3840
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -54,14 +56,18 @@ public String getPluginName() {
5456
public void prepare(Config pluginConfig) throws PrepareFailException {
5557
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, RedisConfig.HOST.key(), RedisConfig.PORT.key(), RedisConfig.KEY_PATTERN.key(), RedisConfig.DATA_TYPE.key());
5658
if (!result.isSuccess()) {
57-
throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
59+
throw new RedisConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
60+
String.format("PluginName: %s, PluginType: %s, Message: %s",
61+
getPluginName(), PluginType.SOURCE, result.getMsg()));
5862
}
5963
this.redisParameters.buildWithConfig(pluginConfig);
6064
// TODO: use format SPI
6165
// default use json format
6266
if (pluginConfig.hasPath(RedisConfig.FORMAT.key())) {
6367
if (!pluginConfig.hasPath(SeaTunnelSchema.SCHEMA.key())) {
64-
throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Must config schema when format parameter been config");
68+
throw new RedisConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
69+
String.format("PluginName: %s, PluginType: %s, Message: %s",
70+
getPluginName(), PluginType.SOURCE, "Must config schema when format parameter been config"));
6571
}
6672
Config schema = pluginConfig.getConfig(SeaTunnelSchema.SCHEMA.key());
6773

0 commit comments

Comments
 (0)