Skip to content

Commit 6e07393

Browse files
authored
[improve] iotdb options (#8965)
1 parent 7821e82 commit 6e07393

File tree

13 files changed

+276
-322
lines changed

13 files changed

+276
-322
lines changed

seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,9 +181,7 @@ private Set<String> buildWhiteList() {
181181
whiteList.add("TDengineSourceOptions");
182182
whiteList.add("PulsarSourceOptions");
183183
whiteList.add("MongodbSinkOptions");
184-
whiteList.add("IoTDBSinkOptions");
185184
whiteList.add("PaimonSourceOptions");
186-
whiteList.add("IoTDBSourceOptions");
187185
whiteList.add("SlsSourceOptions");
188186
whiteList.add("SentrySinkOptions");
189187
whiteList.add("QdrantSinkOptions");

seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/CommonConfig.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.iotdb.config;
1919

20-
import org.apache.seatunnel.api.configuration.Option;
21-
import org.apache.seatunnel.api.configuration.Options;
22-
2320
import lombok.AllArgsConstructor;
2421
import lombok.Getter;
2522
import lombok.ToString;
@@ -31,13 +28,6 @@
3128
@AllArgsConstructor
3229
public class CommonConfig {
3330

34-
public static final Option<String> NODE_URLS =
35-
Options.key("node_urls").stringType().noDefaultValue().withDescription("node urls");
36-
public static final Option<String> USERNAME =
37-
Options.key("username").stringType().noDefaultValue().withDescription("username");
38-
public static final Option<String> PASSWORD =
39-
Options.key("password").stringType().noDefaultValue().withDescription("password");
40-
4131
private final List<String> nodeUrls;
4232
private final String username;
4333
private final String password;
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.iotdb.config;
19+
20+
import org.apache.seatunnel.api.configuration.Option;
21+
import org.apache.seatunnel.api.configuration.Options;
22+
23+
public class IoTDBCommonOptions {
24+
25+
public static final Option<String> NODE_URLS =
26+
Options.key("node_urls").stringType().noDefaultValue().withDescription("node urls");
27+
public static final Option<String> USERNAME =
28+
Options.key("username").stringType().noDefaultValue().withDescription("username");
29+
public static final Option<String> PASSWORD =
30+
Options.key("password").stringType().noDefaultValue().withDescription("password");
31+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.iotdb.config;
19+
20+
import org.apache.seatunnel.api.configuration.Option;
21+
import org.apache.seatunnel.api.configuration.Options;
22+
23+
import java.util.List;
24+
25+
public class IoTDBSinkOptions extends IoTDBCommonOptions {
26+
27+
private static final int DEFAULT_BATCH_SIZE = 1024;
28+
29+
public static final Option<String> KEY_TIMESTAMP =
30+
Options.key("key_timestamp")
31+
.stringType()
32+
.noDefaultValue()
33+
.withDescription("key timestamp");
34+
public static final Option<String> KEY_DEVICE =
35+
Options.key("key_device").stringType().noDefaultValue().withDescription("key device");
36+
public static final Option<List<String>> KEY_MEASUREMENT_FIELDS =
37+
Options.key("key_measurement_fields")
38+
.listType()
39+
.noDefaultValue()
40+
.withDescription("key measurement fields");
41+
public static final Option<String> STORAGE_GROUP =
42+
Options.key("storage_group")
43+
.stringType()
44+
.noDefaultValue()
45+
.withDescription("store group");
46+
public static final Option<Integer> BATCH_SIZE =
47+
Options.key("batch_size")
48+
.intType()
49+
.defaultValue(DEFAULT_BATCH_SIZE)
50+
.withDescription("batch size");
51+
public static final Option<Integer> MAX_RETRIES =
52+
Options.key("max_retries").intType().noDefaultValue().withDescription("max retries");
53+
public static final Option<Integer> RETRY_BACKOFF_MULTIPLIER_MS =
54+
Options.key("retry_backoff_multiplier_ms")
55+
.intType()
56+
.noDefaultValue()
57+
.withDescription("retry backoff multiplier ms ");
58+
public static final Option<Integer> MAX_RETRY_BACKOFF_MS =
59+
Options.key("max_retry_backoff_ms")
60+
.intType()
61+
.noDefaultValue()
62+
.withDescription("max retry backoff ms ");
63+
public static final Option<Integer> DEFAULT_THRIFT_BUFFER_SIZE =
64+
Options.key("default_thrift_buffer_size")
65+
.intType()
66+
.noDefaultValue()
67+
.withDescription("default thrift buffer size");
68+
public static final Option<Integer> MAX_THRIFT_FRAME_SIZE =
69+
Options.key("max_thrift_frame_size")
70+
.intType()
71+
.noDefaultValue()
72+
.withDescription("max thrift frame size");
73+
public static final Option<String> ZONE_ID =
74+
Options.key("zone_id").stringType().noDefaultValue().withDescription("zone id");
75+
public static final Option<Boolean> ENABLE_RPC_COMPRESSION =
76+
Options.key("enable_rpc_compression")
77+
.booleanType()
78+
.noDefaultValue()
79+
.withDescription("enable rpc comm");
80+
public static final Option<Integer> CONNECTION_TIMEOUT_IN_MS =
81+
Options.key("connection_timeout_in_ms")
82+
.intType()
83+
.noDefaultValue()
84+
.withDescription("connection timeout ms");
85+
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
* <p>please see the following link for more details:
2727
* https://iotdb.apache.org/UserGuide/Master/API/Programming-Java-Native-API.html
2828
*/
29-
public class SourceConfig {
29+
public class IoTDBSourceOptions extends IoTDBCommonOptions {
3030

3131
public static final Option<String> SQL =
3232
Options.key("sql").stringType().noDefaultValue().withDescription("sql");

seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SinkConfig.java

Lines changed: 34 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,7 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.iotdb.config;
1919

20-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21-
22-
import org.apache.seatunnel.api.configuration.Option;
23-
import org.apache.seatunnel.api.configuration.Options;
20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2421

2522
import lombok.Getter;
2623
import lombok.NonNull;
@@ -30,78 +27,18 @@
3027
import java.time.ZoneId;
3128
import java.util.List;
3229

33-
import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
3430
import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
3531

3632
@Setter
3733
@Getter
3834
@ToString
3935
public class SinkConfig extends CommonConfig {
4036

41-
private static final int DEFAULT_BATCH_SIZE = 1024;
42-
43-
public static final Option<String> KEY_TIMESTAMP =
44-
Options.key("key_timestamp")
45-
.stringType()
46-
.noDefaultValue()
47-
.withDescription("key timestamp");
48-
public static final Option<String> KEY_DEVICE =
49-
Options.key("key_device").stringType().noDefaultValue().withDescription("key device");
50-
public static final Option<List<String>> KEY_MEASUREMENT_FIELDS =
51-
Options.key("key_measurement_fields")
52-
.listType()
53-
.noDefaultValue()
54-
.withDescription("key measurement fields");
55-
public static final Option<String> STORAGE_GROUP =
56-
Options.key("storage_group")
57-
.stringType()
58-
.noDefaultValue()
59-
.withDescription("store group");
60-
public static final Option<Integer> BATCH_SIZE =
61-
Options.key("batch_size")
62-
.intType()
63-
.defaultValue(DEFAULT_BATCH_SIZE)
64-
.withDescription("batch size");
65-
public static final Option<Integer> MAX_RETRIES =
66-
Options.key("max_retries").intType().noDefaultValue().withDescription("max retries");
67-
public static final Option<Integer> RETRY_BACKOFF_MULTIPLIER_MS =
68-
Options.key("retry_backoff_multiplier_ms")
69-
.intType()
70-
.noDefaultValue()
71-
.withDescription("retry backoff multiplier ms ");
72-
public static final Option<Integer> MAX_RETRY_BACKOFF_MS =
73-
Options.key("max_retry_backoff_ms")
74-
.intType()
75-
.noDefaultValue()
76-
.withDescription("max retry backoff ms ");
77-
public static final Option<Integer> DEFAULT_THRIFT_BUFFER_SIZE =
78-
Options.key("default_thrift_buffer_size")
79-
.intType()
80-
.noDefaultValue()
81-
.withDescription("default thrift buffer size");
82-
public static final Option<Integer> MAX_THRIFT_FRAME_SIZE =
83-
Options.key("max_thrift_frame_size")
84-
.intType()
85-
.noDefaultValue()
86-
.withDescription("max thrift frame size");
87-
public static final Option<String> ZONE_ID =
88-
Options.key("zone_id").stringType().noDefaultValue().withDescription("zone id");
89-
public static final Option<Boolean> ENABLE_RPC_COMPRESSION =
90-
Options.key("enable_rpc_compression")
91-
.booleanType()
92-
.noDefaultValue()
93-
.withDescription("enable rpc comm");
94-
public static final Option<Integer> CONNECTION_TIMEOUT_IN_MS =
95-
Options.key("connection_timeout_in_ms")
96-
.intType()
97-
.noDefaultValue()
98-
.withDescription("connection timeout ms");
99-
10037
private String keyTimestamp;
10138
private String keyDevice;
10239
private List<String> keyMeasurementFields;
10340
private String storageGroup;
104-
private int batchSize = BATCH_SIZE.defaultValue();
41+
private int batchSize;
10542
private int maxRetries;
10643
private int retryBackoffMultiplierMs;
10744
private int maxRetryBackoffMs;
@@ -116,70 +53,50 @@ public SinkConfig(
11653
super(nodeUrls, username, password);
11754
}
11855

119-
public static SinkConfig loadConfig(Config pluginConfig) {
56+
public static SinkConfig loadConfig(ReadonlyConfig pluginConfig) {
12057
SinkConfig sinkConfig =
12158
new SinkConfig(
122-
pluginConfig.getStringList(NODE_URLS.key()),
123-
pluginConfig.getString(USERNAME.key()),
124-
pluginConfig.getString(PASSWORD.key()));
59+
pluginConfig.toConfig().getStringList(IoTDBSinkOptions.NODE_URLS.key()),
60+
pluginConfig.get(IoTDBSinkOptions.USERNAME),
61+
pluginConfig.get(IoTDBSinkOptions.PASSWORD));
12562

126-
sinkConfig.setKeyDevice(pluginConfig.getString(KEY_DEVICE.key()));
127-
if (pluginConfig.hasPath(KEY_TIMESTAMP.key())) {
128-
sinkConfig.setKeyTimestamp(pluginConfig.getString(KEY_TIMESTAMP.key()));
129-
}
130-
if (pluginConfig.hasPath(KEY_MEASUREMENT_FIELDS.key())) {
131-
sinkConfig.setKeyMeasurementFields(
132-
pluginConfig.getStringList(KEY_MEASUREMENT_FIELDS.key()));
133-
}
134-
if (pluginConfig.hasPath(STORAGE_GROUP.key())) {
135-
sinkConfig.setStorageGroup(pluginConfig.getString(STORAGE_GROUP.key()));
63+
sinkConfig.setKeyDevice(pluginConfig.get(IoTDBSinkOptions.KEY_DEVICE));
64+
sinkConfig.setKeyTimestamp(pluginConfig.get(IoTDBSinkOptions.KEY_TIMESTAMP));
65+
sinkConfig.setKeyMeasurementFields(
66+
pluginConfig.get(IoTDBSinkOptions.KEY_MEASUREMENT_FIELDS));
67+
sinkConfig.setStorageGroup(pluginConfig.get(IoTDBSinkOptions.STORAGE_GROUP));
68+
if (pluginConfig.getOptional(IoTDBSinkOptions.BATCH_SIZE).isPresent()) {
69+
sinkConfig.setBatchSize(pluginConfig.get(IoTDBSinkOptions.BATCH_SIZE));
13670
}
137-
if (pluginConfig.hasPath(BATCH_SIZE.key())) {
138-
int batchSize = checkIntArgument(pluginConfig.getInt(BATCH_SIZE.key()));
139-
sinkConfig.setBatchSize(batchSize);
71+
if (pluginConfig.getOptional(IoTDBSinkOptions.MAX_RETRIES).isPresent()) {
72+
sinkConfig.setMaxRetries(pluginConfig.get(IoTDBSinkOptions.MAX_RETRIES));
14073
}
141-
if (pluginConfig.hasPath(MAX_RETRIES.key())) {
142-
int maxRetries = checkIntArgument(pluginConfig.getInt(MAX_RETRIES.key()));
143-
sinkConfig.setMaxRetries(maxRetries);
74+
if (pluginConfig.getOptional(IoTDBSinkOptions.RETRY_BACKOFF_MULTIPLIER_MS).isPresent()) {
75+
sinkConfig.setRetryBackoffMultiplierMs(
76+
pluginConfig.get(IoTDBSinkOptions.RETRY_BACKOFF_MULTIPLIER_MS));
14477
}
145-
if (pluginConfig.hasPath(RETRY_BACKOFF_MULTIPLIER_MS.key())) {
146-
int retryBackoffMultiplierMs =
147-
checkIntArgument(pluginConfig.getInt(RETRY_BACKOFF_MULTIPLIER_MS.key()));
148-
sinkConfig.setRetryBackoffMultiplierMs(retryBackoffMultiplierMs);
78+
if (pluginConfig.getOptional(IoTDBSinkOptions.MAX_RETRY_BACKOFF_MS).isPresent()) {
79+
sinkConfig.setMaxRetryBackoffMs(
80+
pluginConfig.get(IoTDBSinkOptions.MAX_RETRY_BACKOFF_MS));
14981
}
150-
if (pluginConfig.hasPath(MAX_RETRY_BACKOFF_MS.key())) {
151-
int maxRetryBackoffMs =
152-
checkIntArgument(pluginConfig.getInt(MAX_RETRY_BACKOFF_MS.key()));
153-
sinkConfig.setMaxRetryBackoffMs(maxRetryBackoffMs);
82+
if (pluginConfig.getOptional(IoTDBSinkOptions.DEFAULT_THRIFT_BUFFER_SIZE).isPresent()) {
83+
sinkConfig.setThriftDefaultBufferSize(
84+
pluginConfig.get(IoTDBSinkOptions.DEFAULT_THRIFT_BUFFER_SIZE));
15485
}
155-
if (pluginConfig.hasPath(DEFAULT_THRIFT_BUFFER_SIZE.key())) {
156-
int thriftDefaultBufferSize =
157-
checkIntArgument(pluginConfig.getInt(DEFAULT_THRIFT_BUFFER_SIZE.key()));
158-
sinkConfig.setThriftDefaultBufferSize(thriftDefaultBufferSize);
86+
if (pluginConfig.getOptional(IoTDBSinkOptions.MAX_THRIFT_FRAME_SIZE).isPresent()) {
87+
sinkConfig.setThriftMaxFrameSize(
88+
pluginConfig.get(IoTDBSinkOptions.MAX_THRIFT_FRAME_SIZE));
15989
}
160-
if (pluginConfig.hasPath(MAX_THRIFT_FRAME_SIZE.key())) {
161-
int thriftMaxFrameSize =
162-
checkIntArgument(pluginConfig.getInt(MAX_THRIFT_FRAME_SIZE.key()));
163-
sinkConfig.setThriftMaxFrameSize(thriftMaxFrameSize);
90+
if (pluginConfig.getOptional(IoTDBSinkOptions.ZONE_ID).isPresent()) {
91+
sinkConfig.setZoneId(ZoneId.of(pluginConfig.get(IoTDBSinkOptions.ZONE_ID)));
16492
}
165-
if (pluginConfig.hasPath(ZONE_ID.key())) {
166-
sinkConfig.setZoneId(ZoneId.of(pluginConfig.getString(ZONE_ID.key())));
167-
}
168-
if (pluginConfig.hasPath(ENABLE_RPC_COMPRESSION.key())) {
169-
sinkConfig.setEnableRPCCompression(
170-
pluginConfig.getBoolean(ENABLE_RPC_COMPRESSION.key()));
171-
}
172-
if (pluginConfig.hasPath(CONNECTION_TIMEOUT_IN_MS.key())) {
173-
int connectionTimeoutInMs =
174-
checkIntArgument(pluginConfig.getInt(CONNECTION_TIMEOUT_IN_MS.key()));
93+
sinkConfig.setEnableRPCCompression(
94+
pluginConfig.get(IoTDBSinkOptions.ENABLE_RPC_COMPRESSION));
95+
if (pluginConfig.getOptional(IoTDBSinkOptions.CONNECTION_TIMEOUT_IN_MS).isPresent()) {
17596
checkNotNull(sinkConfig.getEnableRPCCompression());
176-
sinkConfig.setConnectionTimeoutInMs(connectionTimeoutInMs);
97+
sinkConfig.setConnectionTimeoutInMs(
98+
pluginConfig.get(IoTDBSinkOptions.CONNECTION_TIMEOUT_IN_MS));
17799
}
178100
return sinkConfig;
179101
}
180-
181-
private static int checkIntArgument(int args) {
182-
checkArgument(args > 0);
183-
return args;
184-
}
185102
}

0 commit comments

Comments
 (0)