Skip to content

Commit 349f142

Browse files
authored
[Improve] easysearch options (#8951)
1 parent 710044e commit 349f142

File tree

15 files changed

+228
-287
lines changed

15 files changed

+228
-287
lines changed

seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,13 +183,11 @@ private Set<String> buildWhiteList() {
183183
whiteList.add("PulsarSourceOptions");
184184
whiteList.add("MongodbSinkOptions");
185185
whiteList.add("IoTDBSinkOptions");
186-
whiteList.add("EasysearchSourceOptions");
187186
whiteList.add("IcebergSourceOptions");
188187
whiteList.add("PaimonSourceOptions");
189188
whiteList.add("IoTDBSourceOptions");
190189
whiteList.add("SlsSourceOptions");
191190
whiteList.add("SentrySinkOptions");
192-
whiteList.add("EasysearchSinkOptions");
193191
whiteList.add("QdrantSinkOptions");
194192
whiteList.add("MilvusSourceOptions");
195193
whiteList.add("RocketMqSinkOptions");

seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/catalog/EasysearchCatalog.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
package org.apache.seatunnel.connectors.seatunnel.easysearch.catalog;
1919

2020
import org.apache.seatunnel.shade.com.google.common.collect.Lists;
21-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
2221

22+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2323
import org.apache.seatunnel.api.configuration.util.ConfigUtil;
2424
import org.apache.seatunnel.api.table.catalog.Catalog;
2525
import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -57,12 +57,13 @@ public class EasysearchCatalog implements Catalog {
5757

5858
private final String catalogName;
5959
private final String defaultDatabase;
60-
private final Config pluginConfig;
60+
private final ReadonlyConfig pluginConfig;
6161

6262
private EasysearchClient ezsClient;
6363

6464
// todo: do we need default database?
65-
public EasysearchCatalog(String catalogName, String defaultDatabase, Config easySearchConfig) {
65+
public EasysearchCatalog(
66+
String catalogName, String defaultDatabase, ReadonlyConfig easySearchConfig) {
6667
this.catalogName = checkNotNull(catalogName, "catalogName cannot be null");
6768
this.defaultDatabase = defaultDatabase;
6869
this.pluginConfig = checkNotNull(easySearchConfig, "easySearchConfig cannot be null");

seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/client/EasysearchClient.java

Lines changed: 18 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@
2121
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
2222
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
2323
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.TextNode;
24-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
2524

25+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2626
import org.apache.seatunnel.common.utils.JsonUtils;
27-
import org.apache.seatunnel.connectors.seatunnel.easysearch.config.EzsClusterConnectionConfig;
27+
import org.apache.seatunnel.connectors.seatunnel.easysearch.config.EasysearchSinkCommonOptions;
2828
import org.apache.seatunnel.connectors.seatunnel.easysearch.dto.BulkResponse;
2929
import org.apache.seatunnel.connectors.seatunnel.easysearch.dto.EasysearchClusterInfo;
3030
import org.apache.seatunnel.connectors.seatunnel.easysearch.dto.source.IndexDocsCount;
@@ -78,61 +78,23 @@ private EasysearchClient(RestClient restClient) {
7878
this.restClient = restClient;
7979
}
8080

81-
public static EasysearchClient createInstance(Config pluginConfig) {
82-
List<String> hosts = pluginConfig.getStringList(EzsClusterConnectionConfig.HOSTS.key());
83-
Optional<String> username = Optional.empty();
84-
Optional<String> password = Optional.empty();
85-
if (pluginConfig.hasPath(EzsClusterConnectionConfig.USERNAME.key())) {
86-
username =
87-
Optional.of(pluginConfig.getString(EzsClusterConnectionConfig.USERNAME.key()));
88-
if (pluginConfig.hasPath(EzsClusterConnectionConfig.PASSWORD.key())) {
89-
password =
90-
Optional.of(
91-
pluginConfig.getString(EzsClusterConnectionConfig.PASSWORD.key()));
92-
}
93-
}
94-
Optional<String> keystorePath = Optional.empty();
95-
Optional<String> keystorePassword = Optional.empty();
96-
Optional<String> truststorePath = Optional.empty();
97-
Optional<String> truststorePassword = Optional.empty();
81+
public static EasysearchClient createInstance(ReadonlyConfig pluginConfig) {
82+
List<String> hosts = pluginConfig.get(EasysearchSinkCommonOptions.HOSTS);
83+
Optional<String> username = pluginConfig.getOptional(EasysearchSinkCommonOptions.USERNAME);
84+
Optional<String> password = pluginConfig.getOptional(EasysearchSinkCommonOptions.PASSWORD);
85+
Optional<String> keystorePath =
86+
pluginConfig.getOptional(EasysearchSinkCommonOptions.TLS_KEY_STORE_PATH);
87+
Optional<String> keystorePassword =
88+
pluginConfig.getOptional(EasysearchSinkCommonOptions.TLS_KEY_STORE_PASSWORD);
89+
Optional<String> truststorePath =
90+
pluginConfig.getOptional(EasysearchSinkCommonOptions.TLS_TRUST_STORE_PATH);
91+
Optional<String> truststorePassword =
92+
pluginConfig.getOptional(EasysearchSinkCommonOptions.TLS_TRUST_STORE_PASSWORD);
9893
boolean tlsVerifyCertificate =
99-
EzsClusterConnectionConfig.TLS_VERIFY_CERTIFICATE.defaultValue();
100-
if (pluginConfig.hasPath(EzsClusterConnectionConfig.TLS_VERIFY_CERTIFICATE.key())) {
101-
tlsVerifyCertificate =
102-
pluginConfig.getBoolean(
103-
EzsClusterConnectionConfig.TLS_VERIFY_CERTIFICATE.key());
104-
}
105-
if (tlsVerifyCertificate) {
106-
if (pluginConfig.hasPath(EzsClusterConnectionConfig.TLS_KEY_STORE_PATH.key())) {
107-
keystorePath =
108-
Optional.of(
109-
pluginConfig.getString(
110-
EzsClusterConnectionConfig.TLS_KEY_STORE_PATH.key()));
111-
}
112-
if (pluginConfig.hasPath(EzsClusterConnectionConfig.TLS_KEY_STORE_PASSWORD.key())) {
113-
keystorePassword =
114-
Optional.of(
115-
pluginConfig.getString(
116-
EzsClusterConnectionConfig.TLS_KEY_STORE_PASSWORD.key()));
117-
}
118-
if (pluginConfig.hasPath(EzsClusterConnectionConfig.TLS_TRUST_STORE_PATH.key())) {
119-
truststorePath =
120-
Optional.of(
121-
pluginConfig.getString(
122-
EzsClusterConnectionConfig.TLS_TRUST_STORE_PATH.key()));
123-
}
124-
if (pluginConfig.hasPath(EzsClusterConnectionConfig.TLS_TRUST_STORE_PASSWORD.key())) {
125-
truststorePassword =
126-
Optional.of(
127-
pluginConfig.getString(
128-
EzsClusterConnectionConfig.TLS_TRUST_STORE_PASSWORD.key()));
129-
}
130-
}
131-
boolean tlsVerifyHostnames = EzsClusterConnectionConfig.TLS_VERIFY_HOSTNAME.defaultValue();
132-
if (pluginConfig.hasPath(EzsClusterConnectionConfig.TLS_VERIFY_HOSTNAME.key())) {
133-
tlsVerifyHostnames =
134-
pluginConfig.getBoolean(EzsClusterConnectionConfig.TLS_VERIFY_HOSTNAME.key());
135-
}
94+
pluginConfig.get(EasysearchSinkCommonOptions.TLS_VERIFY_CERTIFICATE);
95+
96+
boolean tlsVerifyHostnames =
97+
pluginConfig.get(EasysearchSinkCommonOptions.TLS_VERIFY_HOSTNAME);
13698
return createInstance(
13799
hosts,
138100
username,
Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
import java.util.List;
2424

25-
public class EzsClusterConnectionConfig {
25+
public class EasysearchSinkCommonOptions {
2626

2727
public static final Option<List<String>> HOSTS =
2828
Options.key("hosts")
@@ -31,6 +31,12 @@ public class EzsClusterConnectionConfig {
3131
.withDescription(
3232
"Easysearch cluster http address, the format is host:port, allowing multiple hosts to be specified. Such as [\"host1:9200\", \"host2:9200\"]");
3333

34+
public static final Option<String> INDEX =
35+
Options.key("index")
36+
.stringType()
37+
.noDefaultValue()
38+
.withDescription("Easysearch index name, support * fuzzy matching");
39+
3440
public static final Option<String> USERNAME =
3541
Options.key("username")
3642
.stringType()
Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,8 @@
2222

2323
import java.util.List;
2424

25-
public class SinkConfig {
25+
public class EasysearchSinkOptions extends EasysearchSinkCommonOptions {
2626

27-
public static final Option<String> INDEX =
28-
Options.key("index")
29-
.stringType()
30-
.noDefaultValue()
31-
.withDescription(
32-
"Easysearch index name.Index support contains variables of field name,such as seatunnel_${age},and the field must appear at seatunnel row. If not, we will treat it as a normal index");
3327
public static final Option<List<String>> PRIMARY_KEYS =
3428
Options.key("primary_keys")
3529
.listType(String.class)
Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,20 +25,7 @@
2525
import java.util.List;
2626
import java.util.Map;
2727

28-
public class SourceConfig {
29-
30-
public static final Option<String> INDEX =
31-
Options.key("index")
32-
.stringType()
33-
.noDefaultValue()
34-
.withDescription("Easysearch index name, support * fuzzy matching");
35-
36-
public static final Option<List<String>> SOURCE =
37-
Options.key("source")
38-
.listType()
39-
.noDefaultValue()
40-
.withDescription(
41-
"The fields of index. You can get the document id by specifying the field _id.If sink _id to other index,you need specify an alias for _id due to the Easysearch limit");
28+
public class EasysearchSourceOptions extends EasysearchSinkCommonOptions {
4229

4330
public static final Option<String> SCROLL_TIME =
4431
Options.key("scroll_time")
@@ -61,4 +48,11 @@ public class SourceConfig {
6148
Collections.singletonMap("match_all", new HashMap<String, String>()))
6249
.withDescription(
6350
"Easysearch query language. You can control the range of data read");
51+
52+
public static final Option<List<String>> SOURCE =
53+
Options.key("source")
54+
.listType()
55+
.noDefaultValue()
56+
.withDescription(
57+
"The fields of index. You can get the document id by specifying the field _id.If sink _id to other index,you need specify an alias for _id due to the Easysearch limit");
6458
}

seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/dto/IndexInfo.java

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.easysearch.dto;
1919

20-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21-
22-
import org.apache.seatunnel.connectors.seatunnel.easysearch.config.SinkConfig;
20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
21+
import org.apache.seatunnel.connectors.seatunnel.easysearch.config.EasysearchSinkCommonOptions;
22+
import org.apache.seatunnel.connectors.seatunnel.easysearch.config.EasysearchSinkOptions;
2323

2424
import lombok.Data;
2525

@@ -31,17 +31,12 @@ public class IndexInfo {
3131
private String[] primaryKeys;
3232
private String keyDelimiter;
3333

34-
public IndexInfo(Config pluginConfig) {
35-
index = pluginConfig.getString(SinkConfig.INDEX.key());
36-
if (pluginConfig.hasPath(SinkConfig.PRIMARY_KEYS.key())) {
34+
public IndexInfo(ReadonlyConfig pluginConfig) {
35+
index = pluginConfig.get(EasysearchSinkCommonOptions.INDEX);
36+
if (pluginConfig.getOptional(EasysearchSinkOptions.PRIMARY_KEYS).isPresent()) {
3737
primaryKeys =
38-
pluginConfig
39-
.getStringList(SinkConfig.PRIMARY_KEYS.key())
40-
.toArray(new String[0]);
41-
}
42-
keyDelimiter = SinkConfig.KEY_DELIMITER.defaultValue();
43-
if (pluginConfig.hasPath(SinkConfig.KEY_DELIMITER.key())) {
44-
keyDelimiter = pluginConfig.getString(SinkConfig.KEY_DELIMITER.key());
38+
pluginConfig.get(EasysearchSinkOptions.PRIMARY_KEYS).toArray(new String[0]);
4539
}
40+
keyDelimiter = pluginConfig.get(EasysearchSinkOptions.KEY_DELIMITER);
4641
}
4742
}

seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/sink/EasysearchSink.java

Lines changed: 8 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,66 +17,41 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.easysearch.sink;
1919

20-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21-
22-
import org.apache.seatunnel.api.common.PrepareFailException;
20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2321
import org.apache.seatunnel.api.sink.SeaTunnelSink;
2422
import org.apache.seatunnel.api.sink.SinkWriter;
2523
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2624
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
27-
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2825
import org.apache.seatunnel.connectors.seatunnel.easysearch.state.EasysearchAggregatedCommitInfo;
2926
import org.apache.seatunnel.connectors.seatunnel.easysearch.state.EasysearchCommitInfo;
3027
import org.apache.seatunnel.connectors.seatunnel.easysearch.state.EasysearchSinkState;
3128

32-
import com.google.auto.service.AutoService;
33-
3429
import java.util.Optional;
3530

36-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.SinkConfig.MAX_BATCH_SIZE;
37-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.SinkConfig.MAX_RETRY_COUNT;
38-
39-
@AutoService(SeaTunnelSink.class)
4031
public class EasysearchSink
4132
implements SeaTunnelSink<
4233
SeaTunnelRow,
4334
EasysearchSinkState,
4435
EasysearchCommitInfo,
4536
EasysearchAggregatedCommitInfo> {
4637

47-
private Config pluginConfig;
48-
private SeaTunnelRowType seaTunnelRowType;
49-
50-
private int maxBatchSize = MAX_BATCH_SIZE.defaultValue();
38+
private final ReadonlyConfig pluginConfig;
39+
private final CatalogTable catalogTable;
5140

52-
private int maxRetryCount = MAX_RETRY_COUNT.defaultValue();
53-
54-
@Override
55-
public String getPluginName() {
56-
return "Easysearch";
57-
}
58-
59-
@Override
60-
public void prepare(Config pluginConfig) throws PrepareFailException {
41+
public EasysearchSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
42+
this.catalogTable = catalogTable;
6143
this.pluginConfig = pluginConfig;
62-
if (pluginConfig.hasPath(MAX_BATCH_SIZE.key())) {
63-
maxBatchSize = pluginConfig.getInt(MAX_BATCH_SIZE.key());
64-
}
65-
if (pluginConfig.hasPath(MAX_RETRY_COUNT.key())) {
66-
maxRetryCount = pluginConfig.getInt(MAX_RETRY_COUNT.key());
67-
}
6844
}
6945

7046
@Override
71-
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
72-
this.seaTunnelRowType = seaTunnelRowType;
47+
public String getPluginName() {
48+
return "Easysearch";
7349
}
7450

7551
@Override
7652
public SinkWriter<SeaTunnelRow, EasysearchCommitInfo, EasysearchSinkState> createWriter(
7753
SinkWriter.Context context) {
78-
return new EasysearchSinkWriter(
79-
context, seaTunnelRowType, pluginConfig, maxBatchSize, maxRetryCount);
54+
return new EasysearchSinkWriter(context, catalogTable.getSeaTunnelRowType(), pluginConfig);
8055
}
8156

8257
@Override

seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/sink/EasysearchSinkFactory.java

Lines changed: 21 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -18,26 +18,14 @@
1818
package org.apache.seatunnel.connectors.seatunnel.easysearch.sink;
1919

2020
import org.apache.seatunnel.api.configuration.util.OptionRule;
21+
import org.apache.seatunnel.api.table.connector.TableSink;
2122
import org.apache.seatunnel.api.table.factory.Factory;
2223
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
24+
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
25+
import org.apache.seatunnel.connectors.seatunnel.easysearch.config.EasysearchSinkOptions;
2326

2427
import com.google.auto.service.AutoService;
2528

26-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.EzsClusterConnectionConfig.HOSTS;
27-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.EzsClusterConnectionConfig.PASSWORD;
28-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.EzsClusterConnectionConfig.TLS_KEY_STORE_PASSWORD;
29-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.EzsClusterConnectionConfig.TLS_KEY_STORE_PATH;
30-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.EzsClusterConnectionConfig.TLS_TRUST_STORE_PASSWORD;
31-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.EzsClusterConnectionConfig.TLS_TRUST_STORE_PATH;
32-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.EzsClusterConnectionConfig.TLS_VERIFY_CERTIFICATE;
33-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.EzsClusterConnectionConfig.TLS_VERIFY_HOSTNAME;
34-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.EzsClusterConnectionConfig.USERNAME;
35-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.SinkConfig.KEY_DELIMITER;
36-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.SinkConfig.MAX_BATCH_SIZE;
37-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.SinkConfig.MAX_RETRY_COUNT;
38-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.SinkConfig.PRIMARY_KEYS;
39-
import static org.apache.seatunnel.connectors.seatunnel.easysearch.config.SourceConfig.INDEX;
40-
4129
@AutoService(Factory.class)
4230
public class EasysearchSinkFactory implements TableSinkFactory {
4331
@Override
@@ -48,20 +36,25 @@ public String factoryIdentifier() {
4836
@Override
4937
public OptionRule optionRule() {
5038
return OptionRule.builder()
51-
.required(HOSTS, INDEX)
39+
.required(EasysearchSinkOptions.HOSTS, EasysearchSinkOptions.INDEX)
5240
.optional(
53-
PRIMARY_KEYS,
54-
KEY_DELIMITER,
55-
USERNAME,
56-
PASSWORD,
57-
MAX_RETRY_COUNT,
58-
MAX_BATCH_SIZE,
59-
TLS_VERIFY_CERTIFICATE,
60-
TLS_VERIFY_HOSTNAME,
61-
TLS_KEY_STORE_PATH,
62-
TLS_KEY_STORE_PASSWORD,
63-
TLS_TRUST_STORE_PATH,
64-
TLS_TRUST_STORE_PASSWORD)
41+
EasysearchSinkOptions.USERNAME,
42+
EasysearchSinkOptions.PASSWORD,
43+
EasysearchSinkOptions.PRIMARY_KEYS,
44+
EasysearchSinkOptions.KEY_DELIMITER,
45+
EasysearchSinkOptions.MAX_RETRY_COUNT,
46+
EasysearchSinkOptions.MAX_BATCH_SIZE,
47+
EasysearchSinkOptions.TLS_VERIFY_CERTIFICATE,
48+
EasysearchSinkOptions.TLS_VERIFY_HOSTNAME,
49+
EasysearchSinkOptions.TLS_KEY_STORE_PATH,
50+
EasysearchSinkOptions.TLS_KEY_STORE_PASSWORD,
51+
EasysearchSinkOptions.TLS_TRUST_STORE_PATH,
52+
EasysearchSinkOptions.TLS_TRUST_STORE_PASSWORD)
6553
.build();
6654
}
55+
56+
@Override
57+
public TableSink createSink(TableSinkFactoryContext context) {
58+
return () -> new EasysearchSink(context.getOptions(), context.getCatalogTable());
59+
}
6760
}

0 commit comments

Comments
 (0)