Skip to content

Commit 2996378

Browse files
liugddxgdliu3
andauthored
[Improve] [Connector-V2] Remove scheduler in IoTDB sink (#5270)
--------- Co-authored-by: gdliu3 <gdliu3@iflytek.com>
1 parent 794207c commit 2996378

File tree

4 files changed

+1
-51
lines changed

4 files changed

+1
-51
lines changed

docs/en/connector-v2/sink/IoTDB.md

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ the same `key` and `timestamp`, the new data will overwrite the old one.
3131
| key_measurement_fields | array | no | exclude `device` & `timestamp` |
3232
| storage_group | string | no | - |
3333
| batch_size | int | no | 1024 |
34-
| batch_interval_ms | int | no | - |
3534
| max_retries | int | no | - |
3635
| retry_backoff_multiplier_ms | int | no | - |
3736
| max_retry_backoff_ms | int | no | - |
@@ -74,11 +73,7 @@ example: deviceId = ${storage_group} + "." + ${key_device}
7473

7574
### batch_size [int]
7675

77-
For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms`, the data will be flushed into the IoTDB
78-
79-
### batch_interval_ms [int]
80-
81-
For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms`, the data will be flushed into the IoTDB
76+
For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `checkpoint.interval`, the data will be flushed into the IoTDB
8277

8378
### max_retries [int]
8479

@@ -129,7 +124,6 @@ sink {
129124
username = "root"
130125
password = "root"
131126
batch_size = 1024
132-
batch_interval_ms = 1000
133127
}
134128
}
135129
```

seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SinkConfig.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,6 @@ public class SinkConfig extends CommonConfig {
6262
.intType()
6363
.defaultValue(DEFAULT_BATCH_SIZE)
6464
.withDescription("batch size");
65-
public static final Option<String> BATCH_INTERVAL_MS =
66-
Options.key("batch_interval_ms")
67-
.stringType()
68-
.noDefaultValue()
69-
.withDescription("batch interval ms");
7065
public static final Option<Integer> MAX_RETRIES =
7166
Options.key("max_retries").intType().noDefaultValue().withDescription("max retries");
7267
public static final Option<Integer> RETRY_BACKOFF_MULTIPLIER_MS =
@@ -107,7 +102,6 @@ public class SinkConfig extends CommonConfig {
107102
private List<String> keyMeasurementFields;
108103
private String storageGroup;
109104
private int batchSize = BATCH_SIZE.defaultValue();
110-
private Integer batchIntervalMs;
111105
private int maxRetries;
112106
private int retryBackoffMultiplierMs;
113107
private int maxRetryBackoffMs;
@@ -144,10 +138,6 @@ public static SinkConfig loadConfig(Config pluginConfig) {
144138
int batchSize = checkIntArgument(pluginConfig.getInt(BATCH_SIZE.key()));
145139
sinkConfig.setBatchSize(batchSize);
146140
}
147-
if (pluginConfig.hasPath(BATCH_INTERVAL_MS.key())) {
148-
int batchIntervalMs = checkIntArgument(pluginConfig.getInt(BATCH_INTERVAL_MS.key()));
149-
sinkConfig.setBatchIntervalMs(batchIntervalMs);
150-
}
151141
if (pluginConfig.hasPath(MAX_RETRIES.key())) {
152142
int maxRetries = checkIntArgument(pluginConfig.getInt(MAX_RETRIES.key()));
153143
sinkConfig.setMaxRetries(maxRetries);

seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSinkClient.java

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,12 @@
2828
import org.apache.iotdb.session.Session;
2929
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
3030

31-
import com.google.common.util.concurrent.ThreadFactoryBuilder;
3231
import lombok.Getter;
3332
import lombok.extern.slf4j.Slf4j;
3433

3534
import java.io.IOException;
3635
import java.util.ArrayList;
3736
import java.util.List;
38-
import java.util.concurrent.Executors;
39-
import java.util.concurrent.ScheduledExecutorService;
40-
import java.util.concurrent.ScheduledFuture;
41-
import java.util.concurrent.TimeUnit;
4237

4338
@Slf4j
4439
public class IoTDBSinkClient {
@@ -47,8 +42,6 @@ public class IoTDBSinkClient {
4742
private final List<IoTDBRecord> batchList;
4843

4944
private Session session;
50-
private ScheduledExecutorService scheduler;
51-
private ScheduledFuture<?> scheduledFuture;
5245
private volatile boolean initialize;
5346
private volatile Exception flushException;
5447

@@ -95,26 +88,6 @@ private void tryInit() throws IOException {
9588
"Initialize IoTDB client failed.",
9689
e);
9790
}
98-
99-
if (sinkConfig.getBatchIntervalMs() != null) {
100-
scheduler =
101-
Executors.newSingleThreadScheduledExecutor(
102-
new ThreadFactoryBuilder()
103-
.setNameFormat("IoTDB-sink-output-%s")
104-
.build());
105-
scheduledFuture =
106-
scheduler.scheduleAtFixedRate(
107-
() -> {
108-
try {
109-
flush();
110-
} catch (IOException e) {
111-
flushException = e;
112-
}
113-
},
114-
sinkConfig.getBatchIntervalMs(),
115-
sinkConfig.getBatchIntervalMs(),
116-
TimeUnit.MILLISECONDS);
117-
}
11891
initialize = true;
11992
}
12093

@@ -129,11 +102,6 @@ public synchronized void write(IoTDBRecord record) throws IOException {
129102
}
130103

131104
public synchronized void close() throws IOException {
132-
if (scheduledFuture != null) {
133-
scheduledFuture.cancel(false);
134-
scheduler.shutdown();
135-
}
136-
137105
flush();
138106

139107
try {

seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSinkFactory.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.CommonConfig.NODE_URLS;
2727
import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.CommonConfig.PASSWORD;
2828
import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.CommonConfig.USERNAME;
29-
import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.BATCH_INTERVAL_MS;
3029
import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.BATCH_SIZE;
3130
import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.CONNECTION_TIMEOUT_IN_MS;
3231
import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.DEFAULT_THRIFT_BUFFER_SIZE;
@@ -57,7 +56,6 @@ public OptionRule optionRule() {
5756
KEY_MEASUREMENT_FIELDS,
5857
STORAGE_GROUP,
5958
BATCH_SIZE,
60-
BATCH_INTERVAL_MS,
6159
MAX_RETRIES,
6260
RETRY_BACKOFF_MULTIPLIER_MS,
6361
MAX_RETRY_BACKOFF_MS,

0 commit comments

Comments
 (0)