Skip to content

Commit 94b472b

Browse files
authored
[Improve][Connector-V2][Sentry] Unified exception for sentry sink connector (#3513)
1 parent 545595c commit 94b472b

File tree

6 files changed

+63
-40
lines changed

6 files changed

+63
-40
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.seatunnel.connectors.seatunnel.sentry.sink;
18+
package org.apache.seatunnel.connectors.seatunnel.sentry.config;
1919

2020
import org.apache.seatunnel.api.configuration.Option;
2121
import org.apache.seatunnel.api.configuration.Options;
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* https://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.sentry.exception;
19+
20+
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
21+
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
22+
23+
public class SentryConnectorException extends SeaTunnelRuntimeException {
24+
25+
public SentryConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
26+
super(seaTunnelErrorCode, errorMessage);
27+
}
28+
29+
public SentryConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) {
30+
super(seaTunnelErrorCode, errorMessage, cause);
31+
}
32+
33+
public SentryConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) {
34+
super(seaTunnelErrorCode, cause);
35+
}
36+
}

seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,17 @@
1818
package org.apache.seatunnel.connectors.seatunnel.sentry.sink;
1919

2020
import org.apache.seatunnel.api.common.PrepareFailException;
21+
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
2122
import org.apache.seatunnel.api.sink.SeaTunnelSink;
22-
import org.apache.seatunnel.api.sink.SinkWriter.Context;
23+
import org.apache.seatunnel.api.sink.SinkWriter;
2324
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2425
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2526
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2627
import org.apache.seatunnel.common.constants.PluginType;
2728
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
2829
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
30+
import org.apache.seatunnel.connectors.seatunnel.sentry.config.SentryConfig;
31+
import org.apache.seatunnel.connectors.seatunnel.sentry.exception.SentryConnectorException;
2932

3033
import org.apache.seatunnel.shade.com.typesafe.config.Config;
3134

@@ -37,10 +40,11 @@
3740
* @description: SentrySink class
3841
*/
3942
@AutoService(SeaTunnelSink.class)
40-
public class SentrySink extends AbstractSimpleSink<SeaTunnelRow, SentrySinkState> {
43+
public class SentrySink extends AbstractSimpleSink<SeaTunnelRow, Void> {
4144

4245
private SeaTunnelRowType seaTunnelRowType;
4346
private Config pluginConfig;
47+
4448
@Override
4549
public String getPluginName() {
4650
return SentryConfig.SENTRY;
@@ -49,8 +53,13 @@ public String getPluginName() {
4953
@Override
5054
public void prepare(Config pluginConfig) throws PrepareFailException {
5155
if (!pluginConfig.hasPath(SentryConfig.DSN.key())) {
52-
throw new PrepareFailException(getPluginName(), PluginType.SINK,
53-
String.format("Config must include column : %s", SentryConfig.DSN));
56+
throw new SentryConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
57+
String.format("PluginName: %s, PluginType: %s, Message: %s",
58+
getPluginName(), PluginType.SINK,
59+
String.format("Config must include column : %s",
60+
SentryConfig.DSN)
61+
)
62+
);
5463
}
5564

5665
this.pluginConfig = pluginConfig;
@@ -67,7 +76,7 @@ public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
6776
}
6877

6978
@Override
70-
public AbstractSinkWriter<SeaTunnelRow, SentrySinkState> createWriter(Context context) throws IOException {
71-
return new SentrySinkWriter(seaTunnelRowType, context, pluginConfig);
79+
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) throws IOException {
80+
return new SentrySinkWriter(seaTunnelRowType, pluginConfig);
7281
}
7382
}

seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.seatunnel.api.configuration.util.OptionRule;
2121
import org.apache.seatunnel.api.table.factory.Factory;
2222
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
23+
import org.apache.seatunnel.connectors.seatunnel.sentry.config.SentryConfig;
2324

2425
import com.google.auto.service.AutoService;
2526

seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkState.java

Lines changed: 0 additions & 23 deletions
This file was deleted.

seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkWriter.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.sentry.sink;
1919

20-
import org.apache.seatunnel.api.sink.SinkWriter;
2120
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2221
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2322
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
23+
import org.apache.seatunnel.connectors.seatunnel.sentry.config.SentryConfig;
2424

2525
import org.apache.seatunnel.shade.com.typesafe.config.Config;
2626

@@ -33,32 +33,32 @@
3333
* @description: SentrySinkWriter class
3434
*/
3535

36-
public class SentrySinkWriter extends AbstractSinkWriter<SeaTunnelRow, SentrySinkState> {
36+
public class SentrySinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
3737
private SeaTunnelRowType seaTunnelRowType;
38+
3839
public SentrySinkWriter(SeaTunnelRowType seaTunnelRowType,
39-
SinkWriter.Context context,
4040
Config pluginConfig) {
4141
SentryOptions options = new SentryOptions();
4242
options.setDsn(pluginConfig.getString(SentryConfig.DSN.key()));
43-
if (pluginConfig.hasPath(SentryConfig.ENV.key())){
43+
if (pluginConfig.hasPath(SentryConfig.ENV.key())) {
4444
options.setEnvironment(pluginConfig.getString(SentryConfig.ENV.key()));
4545
}
46-
if (pluginConfig.hasPath(SentryConfig.RELEASE.key())){
46+
if (pluginConfig.hasPath(SentryConfig.RELEASE.key())) {
4747
options.setRelease(pluginConfig.getString(SentryConfig.RELEASE.key()));
4848
}
49-
if (pluginConfig.hasPath(SentryConfig.CACHE_DIRPATH.key())){
49+
if (pluginConfig.hasPath(SentryConfig.CACHE_DIRPATH.key())) {
5050
options.setCacheDirPath(pluginConfig.getString(SentryConfig.CACHE_DIRPATH.key()));
5151
}
52-
if (pluginConfig.hasPath(SentryConfig.MAX_CACHEITEMS.key())){
52+
if (pluginConfig.hasPath(SentryConfig.MAX_CACHEITEMS.key())) {
5353
options.setMaxCacheItems(pluginConfig.getInt(SentryConfig.MAX_CACHEITEMS.key()));
5454
}
55-
if (pluginConfig.hasPath(SentryConfig.MAX_QUEUESIZE.key())){
55+
if (pluginConfig.hasPath(SentryConfig.MAX_QUEUESIZE.key())) {
5656
options.setMaxQueueSize(pluginConfig.getInt(SentryConfig.MAX_QUEUESIZE.key()));
5757
}
58-
if (pluginConfig.hasPath(SentryConfig.FLUSH_TIMEOUTMILLIS.key())){
58+
if (pluginConfig.hasPath(SentryConfig.FLUSH_TIMEOUTMILLIS.key())) {
5959
options.setFlushTimeoutMillis(pluginConfig.getLong(SentryConfig.FLUSH_TIMEOUTMILLIS.key()));
6060
}
61-
if (pluginConfig.hasPath(SentryConfig.ENABLE_EXTERNAL_CONFIGURATION.key())){
61+
if (pluginConfig.hasPath(SentryConfig.ENABLE_EXTERNAL_CONFIGURATION.key())) {
6262
options.setEnableExternalConfiguration(pluginConfig.getBoolean(SentryConfig.ENABLE_EXTERNAL_CONFIGURATION.key()));
6363
}
6464
Sentry.init(options);

0 commit comments

Comments
 (0)