Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions example/flink/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,5 @@ The example is to show how to send data to a IoTDB server from a Flink job.

* Run `org.apache.iotdb.flink.FlinkTsFileBatchSource.java` to create a tsfile and read it via a flink DataSet job on local mini cluster.
* Run `org.apache.iotdb.flink.FlinkTsFileStreamSource.java` to create a tsfile and read it via a flink DataStream job on local mini cluster.
* Run `org.apache.iotdb.flink.FlinkTsFileBatchSink.java` to write a tsfile via a flink DataSet job on local mini cluster and print the content to stdout.
* Run `org.apache.iotdb.flink.FlinkTsFileStreamSink.java` to write a tsfile via a flink DataStream job on local mini cluster and print the content to stdout.
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.flink;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.apache.iotdb.flink.tsfile.RowTSRecordConverter;
import org.apache.iotdb.flink.tsfile.TSRecordOutputFormat;
import org.apache.iotdb.tsfile.common.constant.QueryConstant;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.Schema;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
* The example of writing to TsFile via Flink DataSet API.
*/
public class FlinkTsFileBatchSink {

public static final String DEFAULT_TEMPLATE = "template";

public static void main(String[] arg) throws Exception {
String path = new File("test.tsfile").getAbsolutePath();
new File(path).deleteOnExit();
String[] filedNames = {
QueryConstant.RESERVED_TIME,
"device_1.sensor_1",
"device_1.sensor_2",
"device_1.sensor_3",
"device_2.sensor_1",
"device_2.sensor_2",
"device_2.sensor_3"
};
TypeInformation[] typeInformations = new TypeInformation[] {
Types.LONG,
Types.LONG,
Types.LONG,
Types.LONG,
Types.LONG,
Types.LONG,
Types.LONG
};
RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, filedNames);
Schema schema = new Schema();
schema.extendTemplate(
DEFAULT_TEMPLATE, new MeasurementSchema("sensor_1", TSDataType.INT64, TSEncoding.TS_2DIFF));
schema.extendTemplate(
DEFAULT_TEMPLATE, new MeasurementSchema("sensor_2", TSDataType.INT64, TSEncoding.TS_2DIFF));
schema.extendTemplate(
DEFAULT_TEMPLATE, new MeasurementSchema("sensor_3", TSDataType.INT64, TSEncoding.TS_2DIFF));
RowTSRecordConverter converter = new RowTSRecordConverter(rowTypeInfo);
TSRecordOutputFormat<Row> outputFormat = new TSRecordOutputFormat<>(schema, converter);

List<Tuple7> data = new ArrayList<>(7);
data.add(new Tuple7(1L, 2L, 3L, 4L, 5L, 6L, 7L));
data.add(new Tuple7(2L, 3L, 4L, 5L, 6L, 7L, 8L));
data.add(new Tuple7(3L, 4L, 5L, 6L, 7L, 8L, 9L));
data.add(new Tuple7(4L, 5L, 6L, 7L, 8L, 9L, 10L));
data.add(new Tuple7(6L, 6L, 7L, 8L, 9L, 10L, 11L));
data.add(new Tuple7(7L, 7L, 8L, 9L, 10L, 11L, 12L));
data.add(new Tuple7(8L, 8L, 9L, 10L, 11L, 12L, 13L));

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// If the parallelism > 1, flink create a directory and each subtask will create a tsfile under the directory.
env.setParallelism(1);
DataSet<Tuple7> source = env.fromCollection(
data, Types.TUPLE(Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG));
source.map(t -> {
Row row = new Row(7);
for (int i = 0; i < 7; i++) {
row.setField(i, t.getField(i));
}
return row;
}).returns(rowTypeInfo).write(outputFormat, path);

env.execute();

List<Path> paths = Arrays.stream(filedNames)
.filter(s -> !s.equals(QueryConstant.RESERVED_TIME))
.map(Path::new)
.collect(Collectors.toList());
String[] result = TsFlieUtils.readTsFile(path, paths);
for (String row : result) {
System.out.println(row);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.flink;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.iotdb.flink.tsfile.RowTSRecordConverter;
import org.apache.iotdb.flink.tsfile.TSRecordOutputFormat;
import org.apache.iotdb.tsfile.common.constant.QueryConstant;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.Schema;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
* The example of writing to TsFile via Flink DataStream API.
*/
public class FlinkTsFileStreamSink {

public static final String DEFAULT_TEMPLATE = "template";

public static void main(String[] arg) throws Exception {
String path = new File("test.tsfile").getAbsolutePath();
new File(path).deleteOnExit();
String[] filedNames = {
QueryConstant.RESERVED_TIME,
"device_1.sensor_1",
"device_1.sensor_2",
"device_1.sensor_3",
"device_2.sensor_1",
"device_2.sensor_2",
"device_2.sensor_3"
};
TypeInformation[] typeInformations = new TypeInformation[] {
Types.LONG,
Types.LONG,
Types.LONG,
Types.LONG,
Types.LONG,
Types.LONG,
Types.LONG
};
RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, filedNames);
Schema schema = new Schema();
schema.extendTemplate(
DEFAULT_TEMPLATE, new MeasurementSchema("sensor_1", TSDataType.INT64, TSEncoding.TS_2DIFF));
schema.extendTemplate(
DEFAULT_TEMPLATE, new MeasurementSchema("sensor_2", TSDataType.INT64, TSEncoding.TS_2DIFF));
schema.extendTemplate(
DEFAULT_TEMPLATE, new MeasurementSchema("sensor_3", TSDataType.INT64, TSEncoding.TS_2DIFF));
RowTSRecordConverter converter = new RowTSRecordConverter(rowTypeInfo);
TSRecordOutputFormat<Row> outputFormat = new TSRecordOutputFormat<>(schema, converter);
outputFormat.setOutputFilePath(new org.apache.flink.core.fs.Path(path));

List<Tuple7> data = new ArrayList<>(7);
data.add(new Tuple7(1L, 2L, 3L, 4L, 5L, 6L, 7L));
data.add(new Tuple7(2L, 3L, 4L, 5L, 6L, 7L, 8L));
data.add(new Tuple7(3L, 4L, 5L, 6L, 7L, 8L, 9L));
data.add(new Tuple7(4L, 5L, 6L, 7L, 8L, 9L, 10L));
data.add(new Tuple7(6L, 6L, 7L, 8L, 9L, 10L, 11L));
data.add(new Tuple7(7L, 7L, 8L, 9L, 10L, 11L, 12L));
data.add(new Tuple7(8L, 8L, 9L, 10L, 11L, 12L, 13L));

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// If the parallelism > 1, flink create a directory and each subtask will create a tsfile under the directory.
env.setParallelism(1);
DataStream<Tuple7> source = env.fromCollection(
data, Types.TUPLE(Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG));
source.map(t -> {
Row row = new Row(7);
for (int i = 0; i < 7; i++) {
row.setField(i, t.getField(i));
}
return row;
}).returns(rowTypeInfo).writeUsingOutputFormat(outputFormat);

env.execute();

List<Path> paths = Arrays.stream(filedNames)
.filter(s -> !s.equals(QueryConstant.RESERVED_TIME))
.map(Path::new)
.collect(Collectors.toList());
String[] result = TsFlieUtils.readTsFile(path, paths);
for (String row : result) {
System.out.println(row);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.ReadOnlyTsFile;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.expression.QueryExpression;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
Expand All @@ -28,6 +34,10 @@
import org.apache.iotdb.tsfile.write.schema.Schema;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
* Utils used to prepare source TsFiles for the examples.
Expand Down Expand Up @@ -69,4 +79,20 @@ public static void writeTsFile(String path) {
System.out.println(e.getMessage());
}
}

public static String[] readTsFile(String tsFilePath, List<Path> paths) throws IOException {
QueryExpression expression = QueryExpression.create(paths, null);
TsFileSequenceReader reader = new TsFileSequenceReader(tsFilePath);
ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader);
QueryDataSet queryDataSet = readTsFile.query(expression);
List<String> result = new ArrayList<>();
while (queryDataSet.hasNext()) {
RowRecord rowRecord = queryDataSet.next();
String row = rowRecord.getFields().stream()
.map(f -> f == null ? "null" : f.getStringValue())
.collect(Collectors.joining(","));
result.add(rowRecord.getTimestamp() + "," + row);
}
return result.toArray(new String[0]);
}
}
86 changes: 85 additions & 1 deletion flink-tsfile-connector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
## 1. About TsFile-Flink-Connector

TsFile-Flink-Connector implements the support of Flink for external data sources of Tsfile type.
This enables users to read Tsfile by Flink via DataStream/DataSet API.
This enables users to read and write Tsfile by Flink via DataStream/DataSet API.

With this connector, you can
* load a single TsFile or multiple TsFiles(only for DataSet), from either the local file system or hdfs, into Flink
Expand Down Expand Up @@ -91,3 +91,87 @@ for (String s : result) {
}
```

### Example of TSRecordOutputFormat

1. create TSRecordOutputFormat with default RowTSRecordConverter.

```java
String[] filedNames = {
QueryConstant.RESERVED_TIME,
"device_1.sensor_1",
"device_1.sensor_2",
"device_1.sensor_3",
"device_2.sensor_1",
"device_2.sensor_2",
"device_2.sensor_3"
};
TypeInformation[] typeInformations = new TypeInformation[] {
Types.LONG,
Types.LONG,
Types.LONG,
Types.LONG,
Types.LONG,
Types.LONG,
Types.LONG
};
RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, filedNames);
Schema schema = new Schema();
schema.extendTemplate("template", new MeasurementSchema("sensor_1", TSDataType.INT64, TSEncoding.TS_2DIFF));
schema.extendTemplate("template", new MeasurementSchema("sensor_2", TSDataType.INT64, TSEncoding.TS_2DIFF));
schema.extendTemplate("template", new MeasurementSchema("sensor_3", TSDataType.INT64, TSEncoding.TS_2DIFF));
RowTSRecordConverter converter = new RowTSRecordConverter(rowTypeInfo);
TSRecordOutputFormat<Row> outputFormat = new TSRecordOutputFormat<>(schema, converter);
```

2. write data via the output format:

DataStream:

```java
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
senv.setParallelism(1);
List<Tuple7> data = new ArrayList<>(7);
data.add(new Tuple7(1L, 2L, 3L, 4L, 5L, 6L, 7L));
data.add(new Tuple7(2L, 3L, 4L, 5L, 6L, 7L, 8L));
data.add(new Tuple7(3L, 4L, 5L, 6L, 7L, 8L, 9L));
data.add(new Tuple7(4L, 5L, 6L, 7L, 8L, 9L, 10L));
data.add(new Tuple7(6L, 6L, 7L, 8L, 9L, 10L, 11L));
data.add(new Tuple7(7L, 7L, 8L, 9L, 10L, 11L, 12L));
data.add(new Tuple7(8L, 8L, 9L, 10L, 11L, 12L, 13L));
outputFormat.setOutputFilePath(new org.apache.flink.core.fs.Path(path));
DataStream<Tuple7> source = senv.fromCollection(
data, Types.TUPLE(Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG));
source.map(t -> {
Row row = new Row(7);
for (int i = 0; i < 7; i++) {
row.setField(i, t.getField(i));
}
return row;
}).returns(rowTypeInfo).writeUsingOutputFormat(outputFormat);
senv.execute();
```

DataSet:

```java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
List<Tuple7> data = new ArrayList<>(7);
data.add(new Tuple7(1L, 2L, 3L, 4L, 5L, 6L, 7L));
data.add(new Tuple7(2L, 3L, 4L, 5L, 6L, 7L, 8L));
data.add(new Tuple7(3L, 4L, 5L, 6L, 7L, 8L, 9L));
data.add(new Tuple7(4L, 5L, 6L, 7L, 8L, 9L, 10L));
data.add(new Tuple7(6L, 6L, 7L, 8L, 9L, 10L, 11L));
data.add(new Tuple7(7L, 7L, 8L, 9L, 10L, 11L, 12L));
data.add(new Tuple7(8L, 8L, 9L, 10L, 11L, 12L, 13L));
DataSet<Tuple7> source = env.fromCollection(
data, Types.TUPLE(Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG));
source.map(t -> {
Row row = new Row(7);
for (int i = 0; i < 7; i++) {
row.setField(i, t.getField(i));
}
return row;
}).returns(rowTypeInfo).write(outputFormat, path);
env.execute();
```
Loading