Skip to content

Commit 56f13e9

Browse files
authored
Support multi-table sink feature for influxdb (#6278)
1 parent 9c3c2f1 commit 56f13e9

File tree

6 files changed

+200
-54
lines changed

6 files changed

+200
-54
lines changed

seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
public class SinkConfig extends InfluxDBConfig {
3535
public SinkConfig(Config config) {
3636
super(config);
37+
loadConfig(config);
3738
}
3839

3940
public static final Option<String> KEY_TIME =
@@ -103,35 +104,32 @@ public SinkConfig(Config config) {
103104
private int maxRetryBackoffMs;
104105
private TimePrecision precision = DEFAULT_TIME_PRECISION;
105106

106-
public static SinkConfig loadConfig(Config config) {
107-
SinkConfig sinkConfig = new SinkConfig(config);
107+
public void loadConfig(Config config) {
108108

109109
if (config.hasPath(KEY_TIME.key())) {
110-
sinkConfig.setKeyTime(config.getString(KEY_TIME.key()));
110+
setKeyTime(config.getString(KEY_TIME.key()));
111111
}
112112
if (config.hasPath(KEY_TAGS.key())) {
113-
sinkConfig.setKeyTags(config.getStringList(KEY_TAGS.key()));
113+
setKeyTags(config.getStringList(KEY_TAGS.key()));
114114
}
115115
if (config.hasPath(MAX_RETRIES.key())) {
116-
sinkConfig.setMaxRetries(config.getInt(MAX_RETRIES.key()));
116+
setMaxRetries(config.getInt(MAX_RETRIES.key()));
117117
}
118118
if (config.hasPath(RETRY_BACKOFF_MULTIPLIER_MS.key())) {
119-
sinkConfig.setRetryBackoffMultiplierMs(
120-
config.getInt(RETRY_BACKOFF_MULTIPLIER_MS.key()));
119+
setRetryBackoffMultiplierMs(config.getInt(RETRY_BACKOFF_MULTIPLIER_MS.key()));
121120
}
122121
if (config.hasPath(MAX_RETRY_BACKOFF_MS.key())) {
123-
sinkConfig.setMaxRetryBackoffMs(config.getInt(MAX_RETRY_BACKOFF_MS.key()));
122+
setMaxRetryBackoffMs(config.getInt(MAX_RETRY_BACKOFF_MS.key()));
124123
}
125124
if (config.hasPath(WRITE_TIMEOUT.key())) {
126-
sinkConfig.setWriteTimeout(config.getInt(WRITE_TIMEOUT.key()));
125+
setWriteTimeout(config.getInt(WRITE_TIMEOUT.key()));
127126
}
128127
if (config.hasPath(RETENTION_POLICY.key())) {
129-
sinkConfig.setRp(config.getString(RETENTION_POLICY.key()));
128+
setRp(config.getString(RETENTION_POLICY.key()));
130129
}
131130
if (config.hasPath(EPOCH.key())) {
132-
sinkConfig.setPrecision(TimePrecision.getPrecision(config.getString(EPOCH.key())));
131+
setPrecision(TimePrecision.getPrecision(config.getString(EPOCH.key())));
133132
}
134-
sinkConfig.setMeasurement(config.getString(KEY_MEASUREMENT.key()));
135-
return sinkConfig;
133+
setMeasurement(config.getString(KEY_MEASUREMENT.key()));
136134
}
137135
}

seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java

Lines changed: 10 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -17,61 +17,36 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.influxdb.sink;
1919

20-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21-
22-
import org.apache.seatunnel.api.common.PrepareFailException;
23-
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
24-
import org.apache.seatunnel.api.sink.SeaTunnelSink;
2520
import org.apache.seatunnel.api.sink.SinkWriter;
21+
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
22+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2623
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2724
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
28-
import org.apache.seatunnel.common.config.CheckConfigUtil;
29-
import org.apache.seatunnel.common.config.CheckResult;
30-
import org.apache.seatunnel.common.constants.PluginType;
3125
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
3226
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
33-
import org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException;
34-
35-
import com.google.auto.service.AutoService;
27+
import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig;
3628

3729
import java.io.IOException;
3830

39-
import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.URL;
40-
import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.KEY_MEASUREMENT;
31+
public class InfluxDBSink extends AbstractSimpleSink<SeaTunnelRow, Void>
32+
implements SupportMultiTableSink {
4133

42-
@AutoService(SeaTunnelSink.class)
43-
public class InfluxDBSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
44-
45-
private Config pluginConfig;
4634
private SeaTunnelRowType seaTunnelRowType;
35+
private SinkConfig sinkConfig;
4736

4837
@Override
4938
public String getPluginName() {
5039
return "InfluxDB";
5140
}
5241

53-
@Override
54-
public void prepare(Config config) throws PrepareFailException {
55-
CheckResult result =
56-
CheckConfigUtil.checkAllExists(config, URL.key(), KEY_MEASUREMENT.key());
57-
if (!result.isSuccess()) {
58-
throw new InfluxdbConnectorException(
59-
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
60-
String.format(
61-
"PluginName: %s, PluginType: %s, Message: %s",
62-
getPluginName(), PluginType.SINK, result.getMsg()));
63-
}
64-
this.pluginConfig = config;
65-
}
66-
67-
@Override
68-
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
69-
this.seaTunnelRowType = seaTunnelRowType;
42+
public InfluxDBSink(SinkConfig sinkConfig, CatalogTable catalogTable) {
43+
this.sinkConfig = sinkConfig;
44+
this.seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType();
7045
}
7146

7247
@Override
7348
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context)
7449
throws IOException {
75-
return new InfluxDBSinkWriter(pluginConfig, seaTunnelRowType);
50+
return new InfluxDBSinkWriter(sinkConfig, seaTunnelRowType);
7651
}
7752
}

seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,20 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.influxdb.sink;
1919

20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2021
import org.apache.seatunnel.api.configuration.util.OptionRule;
22+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
23+
import org.apache.seatunnel.api.table.connector.TableSink;
2124
import org.apache.seatunnel.api.table.factory.Factory;
2225
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
26+
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
27+
import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig;
2328

2429
import com.google.auto.service.AutoService;
30+
import lombok.extern.slf4j.Slf4j;
31+
32+
import java.util.HashMap;
33+
import java.util.Map;
2534

2635
import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.CONNECT_TIMEOUT_MS;
2736
import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.DATABASES;
@@ -36,6 +45,7 @@
3645
import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS;
3746

3847
@AutoService(Factory.class)
48+
@Slf4j
3949
public class InfluxDBSinkFactory implements TableSinkFactory {
4050

4151
@Override
@@ -46,15 +56,29 @@ public String factoryIdentifier() {
4656
@Override
4757
public OptionRule optionRule() {
4858
return OptionRule.builder()
49-
.required(URL, DATABASES, KEY_MEASUREMENT)
59+
.required(URL, DATABASES)
5060
.bundled(USERNAME, PASSWORD)
5161
.optional(
5262
CONNECT_TIMEOUT_MS,
63+
KEY_MEASUREMENT,
5364
KEY_TAGS,
5465
KEY_TIME,
5566
BATCH_SIZE,
5667
MAX_RETRIES,
5768
RETRY_BACKOFF_MULTIPLIER_MS)
5869
.build();
5970
}
71+
72+
@Override
73+
public TableSink createSink(TableSinkFactoryContext context) {
74+
ReadonlyConfig config = context.getOptions();
75+
CatalogTable catalogTable = context.getCatalogTable();
76+
if (!config.getOptional(KEY_MEASUREMENT).isPresent()) {
77+
Map<String, String> map = config.toMap();
78+
map.put(KEY_MEASUREMENT.key(), catalogTable.getTableId().toTablePath().getFullName());
79+
config = ReadonlyConfig.fromMap(new HashMap<>(map));
80+
}
81+
SinkConfig sinkConfig = new SinkConfig(config.toConfig());
82+
return () -> new InfluxDBSink(sinkConfig, catalogTable);
83+
}
6084
}

seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.influxdb.sink;
1919

20-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21-
20+
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
2221
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2322
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2423
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
24+
import org.apache.seatunnel.common.utils.JsonUtils;
2525
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
2626
import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
2727
import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig;
@@ -44,17 +44,19 @@
4444
import java.util.Optional;
4545

4646
@Slf4j
47-
public class InfluxDBSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
47+
public class InfluxDBSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
48+
implements SupportMultiTableSinkWriter {
4849

4950
private final Serializer serializer;
5051
private InfluxDB influxdb;
5152
private final SinkConfig sinkConfig;
5253
private final List<Point> batchList;
5354
private volatile Exception flushException;
5455

55-
public InfluxDBSinkWriter(Config pluginConfig, SeaTunnelRowType seaTunnelRowType)
56+
public InfluxDBSinkWriter(SinkConfig sinkConfig, SeaTunnelRowType seaTunnelRowType)
5657
throws ConnectException {
57-
this.sinkConfig = SinkConfig.loadConfig(pluginConfig);
58+
this.sinkConfig = sinkConfig;
59+
log.info("sinkConfig is {}", JsonUtils.toJsonString(sinkConfig));
5860
this.serializer =
5961
new DefaultSerializer(
6062
seaTunnelRowType,

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
2626
import org.apache.seatunnel.e2e.common.TestResource;
2727
import org.apache.seatunnel.e2e.common.TestSuiteBase;
28+
import org.apache.seatunnel.e2e.common.container.EngineType;
2829
import org.apache.seatunnel.e2e.common.container.TestContainer;
30+
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
2931

3032
import org.influxdb.InfluxDB;
3133
import org.influxdb.dto.BatchPoints;
@@ -51,10 +53,12 @@
5153
import java.net.ConnectException;
5254
import java.time.Duration;
5355
import java.util.ArrayList;
56+
import java.util.Arrays;
5457
import java.util.Date;
5558
import java.util.List;
5659
import java.util.Objects;
5760
import java.util.concurrent.TimeUnit;
61+
import java.util.stream.Collectors;
5862
import java.util.stream.Stream;
5963

6064
@Slf4j
@@ -242,6 +246,64 @@ public void testInfluxdbWithTz(TestContainer container)
242246
}
243247
}
244248

249+
@TestTemplate
250+
@DisabledOnContainer(
251+
value = {},
252+
type = {EngineType.SPARK, EngineType.FLINK},
253+
disabledReason = "Currently SPARK/FLINK do not support multiple table read")
254+
public void testInfluxdbMultipleWrite(TestContainer container)
255+
throws IOException, InterruptedException {
256+
Container.ExecResult execResult =
257+
container.executeJob("/fake_to_infuxdb_with_multipletable.conf");
258+
259+
Assertions.assertEquals(0, execResult.getExitCode());
260+
Assertions.assertAll(
261+
() -> {
262+
Assertions.assertIterableEquals(
263+
Stream.<List<Object>>of(
264+
Arrays.asList(
265+
1627529632356l,
266+
"label_1",
267+
"sink_1",
268+
4.3,
269+
200,
270+
2.5,
271+
2,
272+
5,
273+
true))
274+
.collect(Collectors.toList()),
275+
readData("infulxdb_sink_1"));
276+
},
277+
() -> {
278+
Assertions.assertIterableEquals(
279+
Stream.<List<Object>>of(
280+
Arrays.asList(
281+
1627529632357l,
282+
"label_2",
283+
"sink_2",
284+
4.3,
285+
200,
286+
2.5,
287+
2,
288+
5,
289+
true))
290+
.collect(Collectors.toList()),
291+
readData("infulxdb_sink_2"));
292+
});
293+
}
294+
295+
public List<List<Object>> readData(String tableName) {
296+
String sinkSql =
297+
String.format(
298+
"select time, label, c_string, c_double, c_bigint, c_float,c_int, c_smallint, c_boolean from %s order by time",
299+
tableName);
300+
QueryResult sinkQueryResult = influxDB.query(new Query(sinkSql, INFLUXDB_DATABASE));
301+
302+
List<List<Object>> sinkValues =
303+
sinkQueryResult.getResults().get(0).getSeries().get(0).getValues();
304+
return sinkValues;
305+
}
306+
245307
private void initializeInfluxDBClient() throws ConnectException {
246308
InfluxDBConfig influxDBConfig = new InfluxDBConfig(influxDBConnectUrl);
247309
influxDB = InfluxDBClient.getInfluxDB(influxDBConfig);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
######
18+
###### This config file is a demonstration of streaming processing in seatunnel config
19+
######
20+
21+
env {
22+
# You can set engine configuration here
23+
parallelism = 1
24+
job.mode = "BATCH"
25+
}
26+
27+
source {
28+
FakeSource {
29+
tables_configs = [
30+
{
31+
schema = {
32+
table = "infulxdb_sink_1"
33+
fields {
34+
label = STRING
35+
c_string = STRING
36+
c_double = DOUBLE
37+
c_bigint = BIGINT
38+
c_float = FLOAT
39+
c_int = INT
40+
c_smallint = SMALLINT
41+
c_boolean = BOOLEAN
42+
time = BIGINT
43+
}
44+
}
45+
rows = [
46+
{
47+
kind = INSERT
48+
fields = ["label_1", "sink_1", 4.3, 200, 2.5, 2, 5, true, 1627529632356]
49+
}
50+
]
51+
},
52+
{
53+
schema = {
54+
table = "infulxdb_sink_2"
55+
fields {
56+
label = STRING
57+
c_string = STRING
58+
c_double = DOUBLE
59+
c_bigint = BIGINT
60+
c_float = FLOAT
61+
c_int = INT
62+
c_smallint = SMALLINT
63+
c_boolean = BOOLEAN
64+
time = BIGINT
65+
}
66+
}
67+
rows = [
68+
{
69+
kind = INSERT
70+
fields = ["label_2", "sink_2", 4.3, 200, 2.5, 2, 5, true, 1627529632357]
71+
}
72+
]
73+
}
74+
]
75+
}
76+
}
77+
78+
sink {
79+
InfluxDB {
80+
url = "http://influxdb-host:8086"
81+
database = "test"
82+
key_time = "time"
83+
batch_size = 1
84+
}
85+
}

0 commit comments

Comments
 (0)