Skip to content
Permalink
Browse files
[IoTDB-1499] Remove series registeration using IoTDBSink -> master (#…
…3574)

[IoTDB-1499] Remove series registeration using IoTDBSink -> master
  • Loading branch information
yuyuankang committed Jul 27, 2021
1 parent 9437339 commit 47fa594403d5bb0c79d8414886b5a9fe6a24b778
Showing 3 changed files with 18 additions and 22 deletions.
@@ -35,6 +35,8 @@ This example shows a case that sends data to a IoTDB server from a Flink job:
- A simulated Source `SensorSource` generates data points per 1 second.
- Flink uses `IoTDBSink` to consume the generated data points and write the data into IoTDB.

It is noteworthy that to use IoTDBSink, schema auto-creation in IoTDB should be enabled.

```java
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -59,7 +61,6 @@ public class FlinkIoTDBSink {
options.setPort(6667);
options.setUser("root");
options.setPassword("root");
options.setStorageGroup("root.sg");
// If the server enables auto_create_schema, then we do not need to register all timeseries
// here.
@@ -36,19 +36,25 @@ public static void main(String[] args) throws Exception {
// run the flink job on local mini cluster
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

IoTDBSinkOptions options = new IoTDBSinkOptions();
options.setHost("127.0.0.1");
options.setPort(6667);
options.setUser("root");
options.setPassword("root");
options.setStorageGroup("root.sg");
String host = "127.0.0.1";
int port = 6667;
String user = "root";
String password = "root";

// If the server enables auto_create_schema, then we do not need to register all timeseries
// here.
options.setTimeseriesOptionList(
Lists.newArrayList(
new IoTDBSinkOptions.TimeseriesOption(
"root.sg.d1.s1", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY)));
IoTDBSinkOptions options =
new IoTDBSinkOptions(
host,
port,
user,
password,
Lists.newArrayList(
new IoTDBSinkOptions.TimeseriesOption(
"root.sg.d1.s1",
TSDataType.DOUBLE,
TSEncoding.GORILLA,
CompressionType.SNAPPY)));

IoTSerializationSchema serializationSchema = new DefaultIoTSerializationSchema();
IoTDBSink ioTDBSink =
@@ -28,7 +28,6 @@
/** IoTDBOptions describes the configuration related information for IoTDB and timeseries. */
public class IoTDBSinkOptions extends IoTDBOptions {

private String storageGroup;
private List<TimeseriesOption> timeseriesOptionList;

public IoTDBSinkOptions() {}
@@ -38,21 +37,11 @@ public IoTDBSinkOptions(
int port,
String user,
String password,
String storageGroup,
List<TimeseriesOption> timeseriesOptionList) {
super(host, port, user, password);
this.storageGroup = storageGroup;
this.timeseriesOptionList = timeseriesOptionList;
}

public String getStorageGroup() {
return storageGroup;
}

public void setStorageGroup(String storageGroup) {
this.storageGroup = storageGroup;
}

public List<TimeseriesOption> getTimeseriesOptionList() {
return timeseriesOptionList;
}

0 comments on commit 47fa594

Please sign in to comment.