Skip to content

Commit c422210

Browse files
authored
[Feature][Connector V2] expose configurable options in Kudu (#3365)
1 parent 26b0d3a commit c422210

File tree

5 files changed

+130
-15
lines changed

5 files changed

+130
-15
lines changed

seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSinkConfig.java

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.kudu.config;
1919

20+
import org.apache.seatunnel.api.configuration.Option;
21+
import org.apache.seatunnel.api.configuration.Options;
22+
2023
import org.apache.seatunnel.shade.com.typesafe.config.Config;
2124

2225
import lombok.Data;
@@ -26,11 +29,25 @@
2629
@Data
2730
public class KuduSinkConfig {
2831

29-
private static final String KUDU_SAVE_MODE = "save_mode";
30-
private static final String KUDU_MASTER = "kudu_master";
31-
private static final String KUDU_TABLE_NAME = "kudu_table";
32+
public static final Option<String> KUDU_MASTER =
33+
Options.key("kudu_master")
34+
.stringType()
35+
.noDefaultValue()
36+
.withDescription("kudu master address");
37+
38+
public static final Option<SaveMode> KUDU_SAVE_MODE =
39+
Options.key("save_mode")
40+
.enumType(SaveMode.class)
41+
.noDefaultValue()
42+
.withDescription("Storage mode,append is now supported");
43+
44+
public static final Option<String> KUDU_TABLE_NAME =
45+
Options.key("kudu_table")
46+
.stringType()
47+
.noDefaultValue()
48+
.withDescription("kudu table name");
3249

33-
private SaveMode saveMode = SaveMode.APPEND;
50+
private SaveMode saveMode;
3451

3552
private String kuduMaster;
3653

@@ -53,10 +70,10 @@ public static SaveMode fromStr(String str) {
5370
}
5471

5572
public KuduSinkConfig(@NonNull Config pluginConfig) {
56-
if (pluginConfig.hasPath(KUDU_SAVE_MODE) && pluginConfig.hasPath(KUDU_MASTER) && pluginConfig.hasPath(KUDU_TABLE_NAME)) {
57-
this.saveMode = StringUtils.isBlank(pluginConfig.getString(KUDU_SAVE_MODE)) ? SaveMode.APPEND : SaveMode.fromStr(pluginConfig.getString(KUDU_SAVE_MODE));
58-
this.kuduMaster = pluginConfig.getString(KUDU_MASTER);
59-
this.kuduTableName = pluginConfig.getString(KUDU_TABLE_NAME);
73+
if (pluginConfig.hasPath(KUDU_SAVE_MODE.key()) && pluginConfig.hasPath(KUDU_MASTER.key()) && pluginConfig.hasPath(KUDU_TABLE_NAME.key())) {
74+
this.saveMode = StringUtils.isBlank(pluginConfig.getString(KUDU_SAVE_MODE.key())) ? SaveMode.APPEND : SaveMode.fromStr(pluginConfig.getString(KUDU_SAVE_MODE.key()));
75+
this.kuduMaster = pluginConfig.getString(KUDU_MASTER.key());
76+
this.kuduTableName = pluginConfig.getString(KUDU_TABLE_NAME.key());
6077
} else {
6178
throw new RuntimeException("Missing Sink configuration parameters");
6279
}

seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceConfig.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,29 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.kudu.config;
1919

20+
import org.apache.seatunnel.api.configuration.Option;
21+
import org.apache.seatunnel.api.configuration.Options;
22+
2023
import java.io.Serializable;
2124

2225
public class KuduSourceConfig implements Serializable {
2326

24-
public static final String KUDUMASTER = "kudu_master";
25-
public static final String TABLENAME = "kudu_table";
26-
public static final String COLUMNSLIST = "columnsList";
27+
public static final Option<String> KUDU_MASTER =
28+
Options.key("kudu_master")
29+
.stringType()
30+
.noDefaultValue()
31+
.withDescription("Kudu master address");
32+
33+
public static final Option<String> TABLE_NAME =
34+
Options.key("kudu_table")
35+
.stringType()
36+
.noDefaultValue()
37+
.withDescription("Kudu table name");
2738

39+
public static final Option<String> COLUMNS_LIST =
40+
Options.key("columnsList")
41+
.stringType()
42+
.noDefaultValue()
43+
.withDescription("Specifies the column names of the table");
2844

2945
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.kudu.sink;
19+
20+
import org.apache.seatunnel.api.configuration.util.OptionRule;
21+
import org.apache.seatunnel.api.table.factory.Factory;
22+
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
23+
import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig;
24+
25+
import com.google.auto.service.AutoService;
26+
27+
@AutoService(Factory.class)
28+
public class KuduSinkFactory implements TableSinkFactory {
29+
@Override
30+
public String factoryIdentifier() {
31+
return "Kudu";
32+
}
33+
34+
@Override
35+
public OptionRule optionRule() {
36+
return OptionRule.builder()
37+
.required(KuduSinkConfig.KUDU_MASTER, KuduSinkConfig.KUDU_SAVE_MODE, KuduSinkConfig.KUDU_TABLE_NAME)
38+
.build();
39+
}
40+
}

seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,10 +103,10 @@ public void prepare(Config config) {
103103
String kudumaster = "";
104104
String tableName = "";
105105
String columnslist = "";
106-
if (config.hasPath(KuduSourceConfig.KUDUMASTER) && config.hasPath(KuduSourceConfig.KUDUMASTER) && config.hasPath(KuduSourceConfig.KUDUMASTER)) {
107-
kudumaster = config.getString(KuduSourceConfig.KUDUMASTER);
108-
tableName = config.getString(KuduSourceConfig.TABLENAME);
109-
columnslist = config.getString(KuduSourceConfig.COLUMNSLIST);
106+
if (config.hasPath(KuduSourceConfig.KUDU_MASTER.key()) && config.hasPath(KuduSourceConfig.TABLE_NAME.key()) && config.hasPath(KuduSourceConfig.COLUMNS_LIST.key())) {
107+
kudumaster = config.getString(KuduSourceConfig.KUDU_MASTER.key());
108+
tableName = config.getString(KuduSourceConfig.TABLE_NAME.key());
109+
columnslist = config.getString(KuduSourceConfig.COLUMNS_LIST.key());
110110
kuduInputFormat = new KuduInputFormat(kudumaster, tableName, columnslist);
111111
} else {
112112
throw new RuntimeException("Missing Source configuration parameters");
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.kudu.source;
19+
20+
import static org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig.COLUMNS_LIST;
21+
import static org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig.KUDU_MASTER;
22+
import static org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig.TABLE_NAME;
23+
24+
import org.apache.seatunnel.api.configuration.util.OptionRule;
25+
import org.apache.seatunnel.api.table.factory.Factory;
26+
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
27+
28+
import com.google.auto.service.AutoService;
29+
30+
@AutoService(Factory.class)
31+
public class KuduSourceFactory implements TableSourceFactory {
32+
33+
@Override
34+
public String factoryIdentifier() {
35+
return "Kudu";
36+
}
37+
38+
@Override
39+
public OptionRule optionRule() {
40+
return OptionRule.builder().required(KUDU_MASTER, TABLE_NAME, COLUMNS_LIST).build();
41+
}
42+
}

0 commit comments

Comments
 (0)