Skip to content

Commit f459f50

Browse files
liugddxgdliu3
andauthored
[Improve] [Connector-V2] Remove scheduler in InfluxDB sink (#5271)
--------- Co-authored-by: gdliu3 <gdliu3@iflytek.com>
1 parent 8bbda25 commit f459f50

File tree

4 files changed

+1
-52
lines changed

4 files changed

+1
-52
lines changed

docs/en/connector-v2/sink/InfluxDB.md

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ Write data to InfluxDB.
2222
| key_time | string | no | processing time |
2323
| key_tags | array | no | exclude `field` & `key_time` |
2424
| batch_size | int | no | 1024 |
25-
| batch_interval_ms | int | no | - |
2625
| max_retries | int | no | - |
2726
| retry_backoff_multiplier_ms | int | no | - |
2827
| connect_timeout_ms | long | no | 15000 |
@@ -63,11 +62,7 @@ If not specified, include all fields with `influxDB` measurement field
6362

6463
### batch_size [int]
6564

66-
For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms`, the data will be flushed into the influxDB
67-
68-
### batch_interval_ms [int]
69-
70-
For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms`, the data will be flushed into the influxDB
65+
For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `checkpoint.interval`, the data will be flushed into the influxDB
7166

7267
### max_retries [int]
7368

seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,6 @@ public SinkConfig(Config config) {
6060
.defaultValue(1024)
6161
.withDescription("batch size of the influxdb client");
6262

63-
public static final Option<Integer> BATCH_INTERVAL_MS =
64-
Options.key("batch_interval_ms")
65-
.intType()
66-
.noDefaultValue()
67-
.withDescription("batch interval ms of the influxdb client");
68-
6963
public static final Option<Integer> MAX_RETRIES =
7064
Options.key("max_retries")
7165
.intType()
@@ -104,7 +98,6 @@ public SinkConfig(Config config) {
10498
private String keyTime;
10599
private List<String> keyTags;
106100
private int batchSize = BATCH_SIZE.defaultValue();
107-
private Integer batchIntervalMs;
108101
private int maxRetries;
109102
private int retryBackoffMultiplierMs;
110103
private int maxRetryBackoffMs;
@@ -119,9 +112,6 @@ public static SinkConfig loadConfig(Config config) {
119112
if (config.hasPath(KEY_TAGS.key())) {
120113
sinkConfig.setKeyTags(config.getStringList(KEY_TAGS.key()));
121114
}
122-
if (config.hasPath(BATCH_INTERVAL_MS.key())) {
123-
sinkConfig.setBatchIntervalMs(config.getInt(BATCH_INTERVAL_MS.key()));
124-
}
125115
if (config.hasPath(MAX_RETRIES.key())) {
126116
sinkConfig.setMaxRetries(config.getInt(MAX_RETRIES.key()));
127117
}

seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.PASSWORD;
2929
import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.URL;
3030
import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.USERNAME;
31-
import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.BATCH_INTERVAL_MS;
3231
import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.BATCH_SIZE;
3332
import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.KEY_MEASUREMENT;
3433
import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.KEY_TAGS;
@@ -54,7 +53,6 @@ public OptionRule optionRule() {
5453
KEY_TAGS,
5554
KEY_TIME,
5655
BATCH_SIZE,
57-
BATCH_INTERVAL_MS,
5856
MAX_RETRIES,
5957
RETRY_BACKOFF_MULTIPLIER_MS)
6058
.build();

seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import org.influxdb.dto.BatchPoints;
3535
import org.influxdb.dto.Point;
3636

37-
import com.google.common.util.concurrent.ThreadFactoryBuilder;
3837
import lombok.SneakyThrows;
3938
import lombok.extern.slf4j.Slf4j;
4039

@@ -43,10 +42,6 @@
4342
import java.util.ArrayList;
4443
import java.util.List;
4544
import java.util.Optional;
46-
import java.util.concurrent.Executors;
47-
import java.util.concurrent.ScheduledExecutorService;
48-
import java.util.concurrent.ScheduledFuture;
49-
import java.util.concurrent.TimeUnit;
5045

5146
@Slf4j
5247
public class InfluxDBSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
@@ -55,15 +50,11 @@ public class InfluxDBSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
5550
private InfluxDB influxdb;
5651
private final SinkConfig sinkConfig;
5752
private final List<Point> batchList;
58-
private ScheduledExecutorService scheduler;
59-
private ScheduledFuture<?> scheduledFuture;
6053
private volatile Exception flushException;
61-
private final Integer batchIntervalMs;
6254

6355
public InfluxDBSinkWriter(Config pluginConfig, SeaTunnelRowType seaTunnelRowType)
6456
throws ConnectException {
6557
this.sinkConfig = SinkConfig.loadConfig(pluginConfig);
66-
this.batchIntervalMs = sinkConfig.getBatchIntervalMs();
6758
this.serializer =
6859
new DefaultSerializer(
6960
seaTunnelRowType,
@@ -73,26 +64,6 @@ public InfluxDBSinkWriter(Config pluginConfig, SeaTunnelRowType seaTunnelRowType
7364
sinkConfig.getMeasurement());
7465
this.batchList = new ArrayList<>();
7566

76-
if (batchIntervalMs != null) {
77-
scheduler =
78-
Executors.newSingleThreadScheduledExecutor(
79-
new ThreadFactoryBuilder()
80-
.setNameFormat("influxDB-sink-output-%s")
81-
.build());
82-
scheduledFuture =
83-
scheduler.scheduleAtFixedRate(
84-
() -> {
85-
try {
86-
flush();
87-
} catch (IOException e) {
88-
flushException = e;
89-
}
90-
},
91-
batchIntervalMs,
92-
batchIntervalMs,
93-
TimeUnit.MILLISECONDS);
94-
}
95-
9667
connect();
9768
}
9869

@@ -112,11 +83,6 @@ public Optional<Void> prepareCommit() {
11283

11384
@Override
11485
public void close() throws IOException {
115-
if (scheduledFuture != null) {
116-
scheduledFuture.cancel(false);
117-
scheduler.shutdown();
118-
}
119-
12086
flush();
12187

12288
if (influxdb != null) {

0 commit comments

Comments
 (0)