diff --git a/example/flink/README.md b/example/flink/README.md index 50599469fb677..38a6d337280cd 100644 --- a/example/flink/README.md +++ b/example/flink/README.md @@ -36,3 +36,5 @@ The example is to show how to send data to a IoTDB server from a Flink job. * Run `org.apache.iotdb.flink.FlinkTsFileBatchSource.java` to create a tsfile and read it via a flink DataSet job on local mini cluster. * Run `org.apache.iotdb.flink.FlinkTsFileStreamSource.java` to create a tsfile and read it via a flink DataStream job on local mini cluster. +* Run `org.apache.iotdb.flink.FlinkTsFileBatchSink.java` to write a tsfile via a flink DataSet job on local mini cluster and print the content to stdout. +* Run `org.apache.iotdb.flink.FlinkTsFileStreamSink.java` to write a tsfile via a flink DataStream job on local mini cluster and print the content to stdout. diff --git a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileBatchSink.java b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileBatchSink.java new file mode 100644 index 0000000000000..2394824869a56 --- /dev/null +++ b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileBatchSink.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.iotdb.flink; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple7; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.types.Row; +import org.apache.iotdb.flink.tsfile.RowTSRecordConverter; +import org.apache.iotdb.flink.tsfile.TSRecordOutputFormat; +import org.apache.iotdb.tsfile.common.constant.QueryConstant; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.Schema; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** + * The example of writing to TsFile via Flink DataSet API. 
+ */
+public class FlinkTsFileBatchSink {
+
+  public static final String DEFAULT_TEMPLATE = "template";
+
+  /**
+   * Writes seven rows to a local {@code test.tsfile} through a Flink DataSet job, then reads the
+   * file back and prints every row to stdout.
+   *
+   * @param arg command line arguments (unused)
+   * @throws Exception if the Flink job fails or the written TsFile cannot be read back
+   */
+  public static void main(String[] arg) throws Exception {
+    String path = new File("test.tsfile").getAbsolutePath();
+    new File(path).deleteOnExit();
+    // The first field carries the timestamp; every other field is named "<device>.<measurement>".
+    String[] fieldNames = {
+      QueryConstant.RESERVED_TIME,
+      "device_1.sensor_1",
+      "device_1.sensor_2",
+      "device_1.sensor_3",
+      "device_2.sensor_1",
+      "device_2.sensor_2",
+      "device_2.sensor_3"
+    };
+    TypeInformation<?>[] typeInformations = new TypeInformation<?>[] {
+      Types.LONG,
+      Types.LONG,
+      Types.LONG,
+      Types.LONG,
+      Types.LONG,
+      Types.LONG,
+      Types.LONG
+    };
+    RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, fieldNames);
+    Schema schema = new Schema();
+    schema.extendTemplate(
+      DEFAULT_TEMPLATE, new MeasurementSchema("sensor_1", TSDataType.INT64, TSEncoding.TS_2DIFF));
+    schema.extendTemplate(
+      DEFAULT_TEMPLATE, new MeasurementSchema("sensor_2", TSDataType.INT64, TSEncoding.TS_2DIFF));
+    schema.extendTemplate(
+      DEFAULT_TEMPLATE, new MeasurementSchema("sensor_3", TSDataType.INT64, TSEncoding.TS_2DIFF));
+    RowTSRecordConverter converter = new RowTSRecordConverter(rowTypeInfo);
+    TSRecordOutputFormat<Row> outputFormat = new TSRecordOutputFormat<>(schema, converter);
+
+    List<Tuple7<Long, Long, Long, Long, Long, Long, Long>> data = new ArrayList<>(7);
+    data.add(new Tuple7<>(1L, 2L, 3L, 4L, 5L, 6L, 7L));
+    data.add(new Tuple7<>(2L, 3L, 4L, 5L, 6L, 7L, 8L));
+    data.add(new Tuple7<>(3L, 4L, 5L, 6L, 7L, 8L, 9L));
+    data.add(new Tuple7<>(4L, 5L, 6L, 7L, 8L, 9L, 10L));
+    data.add(new Tuple7<>(6L, 6L, 7L, 8L, 9L, 10L, 11L));
+    data.add(new Tuple7<>(7L, 7L, 8L, 9L, 10L, 11L, 12L));
+    data.add(new Tuple7<>(8L, 8L, 9L, 10L, 11L, 12L, 13L));
+
+    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+    // If the parallelism is greater than 1, Flink creates a directory and each subtask writes its
+    // own tsfile under that directory.
+    env.setParallelism(1);
+    DataSet<Tuple7<Long, Long, Long, Long, Long, Long, Long>> source = env.fromCollection(
+      data, Types.TUPLE(Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG));
+    // Copy every tuple field into a Row, because the converter consumes Row objects.
+    source.map(t -> {
+      Row row = new Row(t.getArity());
+      for (int i = 0; i < t.getArity(); i++) {
+        row.setField(i, t.getField(i));
+      }
+      return row;
+    }).returns(rowTypeInfo).write(outputFormat, path);
+
+    env.execute();
+
+    // Read back every series except the reserved time column.
+    List<Path> paths = Arrays.stream(fieldNames)
+      .filter(s -> !s.equals(QueryConstant.RESERVED_TIME))
+      .map(Path::new)
+      .collect(Collectors.toList());
+    String[] result = TsFlieUtils.readTsFile(path, paths);
+    for (String row : result) {
+      System.out.println(row);
+    }
+  }
+}
diff --git a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileStreamSink.java b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileStreamSink.java
new file mode 100644
index 0000000000000..cdf13901db1fd
--- /dev/null
+++ b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkTsFileStreamSink.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.iotdb.flink; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.tuple.Tuple7; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.types.Row; +import org.apache.iotdb.flink.tsfile.RowTSRecordConverter; +import org.apache.iotdb.flink.tsfile.TSRecordOutputFormat; +import org.apache.iotdb.tsfile.common.constant.QueryConstant; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.Schema; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** + * The example of writing to TsFile via Flink DataStream API. 
+ */
+public class FlinkTsFileStreamSink {
+
+  public static final String DEFAULT_TEMPLATE = "template";
+
+  /**
+   * Writes seven rows to a local {@code test.tsfile} through a Flink DataStream job, then reads the
+   * file back and prints every row to stdout.
+   *
+   * @param arg command line arguments (unused)
+   * @throws Exception if the Flink job fails or the written TsFile cannot be read back
+   */
+  public static void main(String[] arg) throws Exception {
+    String path = new File("test.tsfile").getAbsolutePath();
+    new File(path).deleteOnExit();
+    // The first field carries the timestamp; every other field is named "<device>.<measurement>".
+    String[] fieldNames = {
+      QueryConstant.RESERVED_TIME,
+      "device_1.sensor_1",
+      "device_1.sensor_2",
+      "device_1.sensor_3",
+      "device_2.sensor_1",
+      "device_2.sensor_2",
+      "device_2.sensor_3"
+    };
+    TypeInformation<?>[] typeInformations = new TypeInformation<?>[] {
+      Types.LONG,
+      Types.LONG,
+      Types.LONG,
+      Types.LONG,
+      Types.LONG,
+      Types.LONG,
+      Types.LONG
+    };
+    RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, fieldNames);
+    Schema schema = new Schema();
+    schema.extendTemplate(
+      DEFAULT_TEMPLATE, new MeasurementSchema("sensor_1", TSDataType.INT64, TSEncoding.TS_2DIFF));
+    schema.extendTemplate(
+      DEFAULT_TEMPLATE, new MeasurementSchema("sensor_2", TSDataType.INT64, TSEncoding.TS_2DIFF));
+    schema.extendTemplate(
+      DEFAULT_TEMPLATE, new MeasurementSchema("sensor_3", TSDataType.INT64, TSEncoding.TS_2DIFF));
+    RowTSRecordConverter converter = new RowTSRecordConverter(rowTypeInfo);
+    TSRecordOutputFormat<Row> outputFormat = new TSRecordOutputFormat<>(schema, converter);
+    // writeUsingOutputFormat takes no path argument, so the target path is set on the format itself.
+    outputFormat.setOutputFilePath(new org.apache.flink.core.fs.Path(path));
+
+    List<Tuple7<Long, Long, Long, Long, Long, Long, Long>> data = new ArrayList<>(7);
+    data.add(new Tuple7<>(1L, 2L, 3L, 4L, 5L, 6L, 7L));
+    data.add(new Tuple7<>(2L, 3L, 4L, 5L, 6L, 7L, 8L));
+    data.add(new Tuple7<>(3L, 4L, 5L, 6L, 7L, 8L, 9L));
+    data.add(new Tuple7<>(4L, 5L, 6L, 7L, 8L, 9L, 10L));
+    data.add(new Tuple7<>(6L, 6L, 7L, 8L, 9L, 10L, 11L));
+    data.add(new Tuple7<>(7L, 7L, 8L, 9L, 10L, 11L, 12L));
+    data.add(new Tuple7<>(8L, 8L, 9L, 10L, 11L, 12L, 13L));
+
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    // If the parallelism is greater than 1, Flink creates a directory and each subtask writes its
+    // own tsfile under that directory.
+    env.setParallelism(1);
+    DataStream<Tuple7<Long, Long, Long, Long, Long, Long, Long>> source = env.fromCollection(
+      data, Types.TUPLE(Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG));
+    // Copy every tuple field into a Row, because the converter consumes Row objects.
+    source.map(t -> {
+      Row row = new Row(t.getArity());
+      for (int i = 0; i < t.getArity(); i++) {
+        row.setField(i, t.getField(i));
+      }
+      return row;
+    }).returns(rowTypeInfo).writeUsingOutputFormat(outputFormat);
+
+    env.execute();
+
+    // Read back every series except the reserved time column.
+    List<Path> paths = Arrays.stream(fieldNames)
+      .filter(s -> !s.equals(QueryConstant.RESERVED_TIME))
+      .map(Path::new)
+      .collect(Collectors.toList());
+    String[] result = TsFlieUtils.readTsFile(path, paths);
+    for (String row : result) {
+      System.out.println(row);
+    }
+  }
+}
diff --git a/example/flink/src/main/java/org/apache/iotdb/flink/TsFlieUtils.java b/example/flink/src/main/java/org/apache/iotdb/flink/TsFlieUtils.java
index 5b1ffba85b8db..f97b683583a5b 100644
--- a/example/flink/src/main/java/org/apache/iotdb/flink/TsFlieUtils.java
+++ b/example/flink/src/main/java/org/apache/iotdb/flink/TsFlieUtils.java
@@ -20,6 +20,12 @@
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.ReadOnlyTsFile;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.write.TsFileWriter;
 import org.apache.iotdb.tsfile.write.record.TSRecord;
 import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
@@ -28,6 +34,10 @@
 import org.apache.iotdb.tsfile.write.schema.Schema;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * Utils used to prepare source TsFiles for the examples.
@@ -69,4 +79,33 @@ public static void writeTsFile(String path) {
       System.out.println(e.getMessage());
     }
   }
+
+  /**
+   * Reads the given series from a TsFile and renders every row as a comma separated string.
+   *
+   * <p>Each returned element has the form {@code timestamp,value1,value2,...}; a field that is
+   * {@code null} in the row record is rendered as the literal {@code null}.
+   *
+   * @param tsFilePath the path of the TsFile to read
+   * @param paths the series to query
+   * @return one string per row, in the order returned by the query
+   * @throws IOException if the file cannot be opened or read
+   */
+  public static String[] readTsFile(String tsFilePath, List<Path> paths) throws IOException {
+    QueryExpression expression = QueryExpression.create(paths, null);
+    // Close the reader even when the query fails; otherwise the underlying file handle leaks.
+    try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFilePath)) {
+      ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader);
+      QueryDataSet queryDataSet = readTsFile.query(expression);
+      List<String> result = new ArrayList<>();
+      while (queryDataSet.hasNext()) {
+        RowRecord rowRecord = queryDataSet.next();
+        String row = rowRecord.getFields().stream()
+          .map(f -> f == null ? "null" : f.getStringValue())
+          .collect(Collectors.joining(","));
+        result.add(rowRecord.getTimestamp() + "," + row);
+      }
+      return result.toArray(new String[0]);
+    }
+  }
 }
diff --git a/flink-tsfile-connector/README.md b/flink-tsfile-connector/README.md
index a571de70d3983..24da260f01789 100644
--- a/flink-tsfile-connector/README.md
+++ b/flink-tsfile-connector/README.md
@@ -23,7 +23,7 @@
 ## 1. About TsFile-Flink-Connector
 
 TsFile-Flink-Connector implements the support of Flink for external data sources of Tsfile type.
-This enables users to read Tsfile by Flink via DataStream/DataSet API.
+This enables users to read and write Tsfile by Flink via DataStream/DataSet API.
 
 With this connector, you can
 * load a single TsFile or multiple TsFiles(only for DataSet), from either the local file system or hdfs, into Flink
@@ -91,3 +91,87 @@ for (String s : result) {
 }
 ```
 
+### Example of TSRecordOutputFormat
+
+1. create TSRecordOutputFormat with default RowTSRecordConverter.
+ +```java +String[] filedNames = { + QueryConstant.RESERVED_TIME, + "device_1.sensor_1", + "device_1.sensor_2", + "device_1.sensor_3", + "device_2.sensor_1", + "device_2.sensor_2", + "device_2.sensor_3" +}; +TypeInformation[] typeInformations = new TypeInformation[] { + Types.LONG, + Types.LONG, + Types.LONG, + Types.LONG, + Types.LONG, + Types.LONG, + Types.LONG +}; +RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, filedNames); +Schema schema = new Schema(); +schema.extendTemplate("template", new MeasurementSchema("sensor_1", TSDataType.INT64, TSEncoding.TS_2DIFF)); +schema.extendTemplate("template", new MeasurementSchema("sensor_2", TSDataType.INT64, TSEncoding.TS_2DIFF)); +schema.extendTemplate("template", new MeasurementSchema("sensor_3", TSDataType.INT64, TSEncoding.TS_2DIFF)); +RowTSRecordConverter converter = new RowTSRecordConverter(rowTypeInfo); +TSRecordOutputFormat outputFormat = new TSRecordOutputFormat<>(schema, converter); +``` + +2. write data via the output format: + +DataStream: + +```java +StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment(); +senv.setParallelism(1); +List data = new ArrayList<>(7); +data.add(new Tuple7(1L, 2L, 3L, 4L, 5L, 6L, 7L)); +data.add(new Tuple7(2L, 3L, 4L, 5L, 6L, 7L, 8L)); +data.add(new Tuple7(3L, 4L, 5L, 6L, 7L, 8L, 9L)); +data.add(new Tuple7(4L, 5L, 6L, 7L, 8L, 9L, 10L)); +data.add(new Tuple7(6L, 6L, 7L, 8L, 9L, 10L, 11L)); +data.add(new Tuple7(7L, 7L, 8L, 9L, 10L, 11L, 12L)); +data.add(new Tuple7(8L, 8L, 9L, 10L, 11L, 12L, 13L)); +outputFormat.setOutputFilePath(new org.apache.flink.core.fs.Path(path)); +DataStream source = senv.fromCollection( + data, Types.TUPLE(Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG)); +source.map(t -> { + Row row = new Row(7); + for (int i = 0; i < 7; i++) { + row.setField(i, t.getField(i)); + } + return row; +}).returns(rowTypeInfo).writeUsingOutputFormat(outputFormat); +senv.execute(); +``` + +DataSet: + 
+```java +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); +env.setParallelism(1); +List data = new ArrayList<>(7); +data.add(new Tuple7(1L, 2L, 3L, 4L, 5L, 6L, 7L)); +data.add(new Tuple7(2L, 3L, 4L, 5L, 6L, 7L, 8L)); +data.add(new Tuple7(3L, 4L, 5L, 6L, 7L, 8L, 9L)); +data.add(new Tuple7(4L, 5L, 6L, 7L, 8L, 9L, 10L)); +data.add(new Tuple7(6L, 6L, 7L, 8L, 9L, 10L, 11L)); +data.add(new Tuple7(7L, 7L, 8L, 9L, 10L, 11L, 12L)); +data.add(new Tuple7(8L, 8L, 9L, 10L, 11L, 12L, 13L)); +DataSet source = env.fromCollection( + data, Types.TUPLE(Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG)); +source.map(t -> { + Row row = new Row(7); + for (int i = 0; i < 7; i++) { + row.setField(i, t.getField(i)); + } + return row; +}).returns(rowTypeInfo).write(outputFormat, path); +env.execute(); +``` diff --git a/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/RowTSRecordConverter.java b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/RowTSRecordConverter.java new file mode 100644 index 0000000000000..cf698af88eefc --- /dev/null +++ b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/RowTSRecordConverter.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.flink.tsfile; + +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.types.Row; +import org.apache.flink.util.Collector; +import org.apache.iotdb.tsfile.common.constant.QueryConstant; +import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; +import org.apache.iotdb.tsfile.utils.Binary; +import org.apache.iotdb.tsfile.write.record.TSRecord; +import org.apache.iotdb.tsfile.write.record.datapoint.BooleanDataPoint; +import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; +import org.apache.iotdb.tsfile.write.record.datapoint.DoubleDataPoint; +import org.apache.iotdb.tsfile.write.record.datapoint.FloatDataPoint; +import org.apache.iotdb.tsfile.write.record.datapoint.IntDataPoint; +import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint; +import org.apache.iotdb.tsfile.write.record.datapoint.StringDataPoint; +import org.apache.iotdb.tsfile.write.schema.Schema; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/** + * The converter that convert a Row object to multiple TSRecord objects. 
+ */
+public class RowTSRecordConverter implements TSRecordConverter<Row> {
+
+  private RowTypeInfo rowTypeInfo;
+  // One template TSRecord per device; it owns the DataPoint objects that convert() refills.
+  private transient TSRecord[] outputTemplate;
+  // Index of the time field inside the row, or -1 while unknown.
+  private transient int timeIndex = -1;
+  // Row field index -> index of the device record in outputTemplate/reuse (-1 for the time field).
+  private transient int[] tsRecordIndexMapping;
+  // Row field index -> index of the data point inside its template record (-1 for the time field).
+  private transient int[] dataPointIndexMapping;
+  // Records handed to the collector; cleared and refilled on every convert() call.
+  private transient TSRecord[] reuse;
+
+  public RowTSRecordConverter(RowTypeInfo rowTypeInfo) {
+    this.rowTypeInfo = rowTypeInfo;
+  }
+
+  /**
+   * Builds the per-device record templates and the field index mappings from the row type.
+   *
+   * <p>Every field other than {@link QueryConstant#RESERVED_TIME} must be named
+   * {@code <deviceId>.<measurementId>}; the text after the last dot is the measurement.
+   *
+   * @param schema The schema of the TSRecord (not read by this implementation).
+   * @throws IllegalArgumentException if a field name contains no dot or the row type has no time field
+   * @throws UnSupportedDataTypeException if a field type is not boolean, int, long, float, double or
+   *     String
+   */
+  @Override
+  public void open(Schema schema) throws IOException {
+    // Java deserialization does not re-run the initializer of a transient field, so a converter
+    // that was serialized would otherwise start with timeIndex == 0 instead of -1.
+    timeIndex = -1;
+    this.tsRecordIndexMapping = new int[rowTypeInfo.getArity()];
+    this.dataPointIndexMapping = new int[rowTypeInfo.getArity()];
+    List<TSRecord> outputTemplateList = new ArrayList<>();
+
+    String[] fieldNames = rowTypeInfo.getFieldNames();
+    for (int i = 0; i < rowTypeInfo.getArity(); i++) {
+      String fieldName = fieldNames[i];
+      if (QueryConstant.RESERVED_TIME.equals(fieldName)) {
+        timeIndex = i;
+        tsRecordIndexMapping[i] = -1;
+        dataPointIndexMapping[i] = -1;
+        continue;
+      }
+      int separator = fieldName.lastIndexOf('.');
+      if (separator < 0) {
+        throw new IllegalArgumentException(
+          "Field name must be of the form <deviceId>.<measurementId>, but was: " + fieldName);
+      }
+      String deviceId = fieldName.substring(0, separator);
+      String measurementId = fieldName.substring(separator + 1);
+      int tsRecordIndex = indexOfDevice(outputTemplateList, deviceId);
+      if (tsRecordIndex < 0) {
+        outputTemplateList.add(new TSRecord(0, deviceId));
+        tsRecordIndex = outputTemplateList.size() - 1;
+      }
+      tsRecordIndexMapping[i] = tsRecordIndex;
+      TSRecord tsRecord = outputTemplateList.get(tsRecordIndex);
+      Class<?> typeClass = rowTypeInfo.getFieldTypes()[i].getTypeClass();
+      tsRecord.addTuple(createDataPoint(measurementId, typeClass));
+      dataPointIndexMapping[i] = tsRecord.dataPointList.size() - 1;
+    }
+    if (timeIndex < 0) {
+      throw new IllegalArgumentException(
+        "The row type must contain a field named " + QueryConstant.RESERVED_TIME);
+    }
+    outputTemplate = outputTemplateList.toArray(new TSRecord[0]);
+
+    reuse = new TSRecord[outputTemplate.length];
+    for (int i = 0; i < outputTemplate.length; i++) {
+      reuse[i] = new TSRecord(0, outputTemplate[i].deviceId);
+    }
+  }
+
+  /** Returns the position of the record for the given device in the list, or -1 if absent. */
+  private static int indexOfDevice(List<TSRecord> records, String deviceId) {
+    for (int i = 0; i < records.size(); i++) {
+      if (deviceId.equals(records.get(i).deviceId)) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  /** Creates a placeholder data point whose TsFile type matches the given Java field class. */
+  private static DataPoint createDataPoint(String measurementId, Class<?> typeClass) {
+    if (typeClass == Boolean.class || typeClass == boolean.class) {
+      return new BooleanDataPoint(measurementId, false);
+    } else if (typeClass == Integer.class || typeClass == int.class) {
+      return new IntDataPoint(measurementId, 0);
+    } else if (typeClass == Long.class || typeClass == long.class) {
+      return new LongDataPoint(measurementId, 0);
+    } else if (typeClass == Float.class || typeClass == float.class) {
+      return new FloatDataPoint(measurementId, 0);
+    } else if (typeClass == Double.class || typeClass == double.class) {
+      return new DoubleDataPoint(measurementId, 0);
+    } else if (typeClass == String.class) {
+      return new StringDataPoint(measurementId, null);
+    }
+    throw new UnSupportedDataTypeException(typeClass.toString());
+  }
+
+  /**
+   * Splits one row into one TSRecord per device and passes each non-empty record to the collector.
+   *
+   * <p>Null fields are skipped, so a device whose fields are all null produces no record. The
+   * collected TSRecord and DataPoint instances are reused by the next call.
+   *
+   * @param input The row to convert; its time field must not be null.
+   * @param collector The collector that receives the resulting records.
+   */
+  @Override
+  public void convert(Row input, Collector<TSRecord> collector) throws IOException {
+    long timestamp = (long) input.getField(timeIndex);
+    for (TSRecord tsRecord : reuse) {
+      tsRecord.dataPointList.clear();
+    }
+    for (int i = 0; i < input.getArity(); i++) {
+      if (i == timeIndex) {
+        continue;
+      }
+      TSRecord templateRecord = outputTemplate[tsRecordIndexMapping[i]];
+      DataPoint templateDataPoint = templateRecord.dataPointList.get(dataPointIndexMapping[i]);
+      Object o = input.getField(i);
+      if (o != null) {
+        switch (templateDataPoint.getType()) {
+          case BOOLEAN:
+            templateDataPoint.setBoolean((Boolean) o);
+            break;
+          case INT32:
+            templateDataPoint.setInteger((Integer) o);
+            break;
+          case INT64:
+            templateDataPoint.setLong((Long) o);
+            break;
+          case FLOAT:
+            templateDataPoint.setFloat((Float) o);
+            break;
+          case DOUBLE:
+            templateDataPoint.setDouble((Double) o);
+            break;
+          case TEXT:
+            templateDataPoint.setString(Binary.valueOf((String) o));
+            break;
+          default:
+            templateDataPoint.setString(Binary.valueOf(o.toString()));
+        }
+        reuse[tsRecordIndexMapping[i]].addTuple(templateDataPoint);
+      }
+    }
+    for (TSRecord tsRecord : reuse) {
+      if (tsRecord.dataPointList.size() > 0) {
+        tsRecord.setTime(timestamp);
+        collector.collect(tsRecord);
+      }
+    }
+  }
+
+  /** Drops every structure built by {@link #open(Schema)}. */
+  @Override
+  public void close() throws IOException {
+    outputTemplate = null;
+    timeIndex = -1;
+    tsRecordIndexMapping = null;
+    dataPointIndexMapping = null;
+    reuse = null;
+  }
+}
diff --git a/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TSRecordConverter.java b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TSRecordConverter.java
new file mode 100644
index 0000000000000..97810d95564da
--- /dev/null
+++ b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TSRecordConverter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flink.tsfile;
+
+import org.apache.flink.util.Collector;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.schema.Schema;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * The converter describes how to turn a data object into multiple TSRecord objects, which is required by the
+ * {@link TSRecordOutputFormat}.
+ *
+ * @param <T> The type of the upstream data.
+ */
+public interface TSRecordConverter<T> extends Serializable {
+
+  /**
+   * Opens current converter.
+   *
+   * @param schema The schema of the TSRecord.
+   * @throws IOException if the converter cannot be initialized
+   */
+  void open(Schema schema) throws IOException;
+
+  /**
+   * Converts the input data into one or multiple TSRecords. The collector in param list is used to
+   * collect the output.
+   *
+   * When this method is called, the converter is guaranteed to be opened.
+   *
+   * @param input The input data.
+   * @param collector The collector used to collect the output.
+   * @throws IOException if the input cannot be converted
+   */
+  void convert(T input, Collector<TSRecord> collector) throws IOException;
+
+  /**
+   * Method that marks the end of the life-cycle of this converter.
+   *
+   * When this method is called, the converter is guaranteed to be opened.
+   *
+   * @throws IOException if the converter cannot release its resources
+   */
+  void close() throws IOException;
+}
diff --git a/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TSRecordOutputFormat.java b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TSRecordOutputFormat.java
new file mode 100644
index 0000000000000..4bbda7cd29930
--- /dev/null
+++ b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TSRecordOutputFormat.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flink.tsfile;
+
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.schema.Schema;
+
+import java.io.IOException;
+
+/**
+ * Output format that writes TsFiles by {@link TSRecord}. Users need to provide a {@link TSRecordConverter} used to
+ * convert the upstream data to {@link TSRecord}.
+ *
+ * @param <T> The input type of this output format.
+ */
+public class TSRecordOutputFormat<T> extends TsFileOutputFormat<T> {
+
+  private final TSRecordConverter<T> converter;
+
+  private transient TSRecordCollector tsRecordCollector = null;
+
+  /**
+   * Creates an output format that writes to the given path without a custom TsFile config.
+   *
+   * @param path the output path
+   * @param schema the schema of the written TsFile
+   * @param converter turns each input element into TSRecords
+   */
+  public TSRecordOutputFormat(String path, Schema schema, TSRecordConverter<T> converter) {
+    this(path, schema, converter, null);
+  }
+
+  /**
+   * Creates an output format with neither an output path nor a custom TsFile config.
+   *
+   * @param schema the schema of the written TsFile
+   * @param converter turns each input element into TSRecords
+   */
+  public TSRecordOutputFormat(Schema schema, TSRecordConverter<T> converter) {
+    super(null, schema, null);
+    this.converter = converter;
+  }
+
+  /**
+   * Creates an output format that writes to the given path.
+   *
+   * @param path the output path, may be {@code null}
+   * @param schema the schema of the written TsFile
+   * @param converter turns each input element into TSRecords
+   * @param config the TsFile config passed to the parent class, may be {@code null}
+   */
+  public TSRecordOutputFormat(String path, Schema schema, TSRecordConverter<T> converter, TSFileConfig config) {
+    super(path, schema, config);
+    this.converter = converter;
+  }
+
+  @Override
+  public void open(int taskNumber, int numTasks) throws IOException {
+    super.open(taskNumber, numTasks);
+    converter.open(schema);
+    tsRecordCollector = new TSRecordCollector();
+  }
+
+  @Override
+  public void close() throws IOException {
+    // Always close the parent, even when the converter fails to close.
+    try {
+      converter.close();
+    } finally {
+      super.close();
+    }
+  }
+
+  /**
+   * Converts the element and writes every resulting TSRecord.
+   *
+   * @param t the element to write
+   * @throws IOException if the converter or the collector fails
+   */
+  @Override
+  public void writeRecord(T t) throws IOException {
+    try {
+      converter.convert(t, tsRecordCollector);
+    } catch (FlinkRuntimeException e) {
+      // The collector wraps write failures in a FlinkRuntimeException because
+      // Collector#collect cannot throw checked exceptions; unwrap them here.
+      Throwable cause = e.getCause();
+      if (cause instanceof IOException) {
+        throw (IOException) cause;
+      }
+      // Keep the exception itself when it carries no cause, so the failure is not lost.
+      throw new IOException(cause == null ? e : cause);
+    }
+  }
+
+  /** Collector that writes every collected record through the writer of the enclosing format. */
+  private class TSRecordCollector implements Collector<TSRecord> {
+
+    @Override
+    public void collect(TSRecord tsRecord) {
+      try {
+        writer.write(tsRecord);
+      } catch (IOException | WriteProcessException e) {
+        throw new FlinkRuntimeException(e);
+      }
+    }
+
+    @Override
+    public void close() {
+      // Nothing to release; the enclosing format owns the writer.
+    }
+  }
+
+  public TSRecordConverter<T> getConverter() {
+    return converter;
+  }
+}
diff --git a/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TsFileOutputFormat.java b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TsFileOutputFormat.java
new file mode 100644
index 0000000000000..2d9091300bce0
--- /dev/null
+++ b/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TsFileOutputFormat.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flink.tsfile;
+
+import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iotdb.flink.tsfile.util.TSFileConfigUtil;
+import org.apache.iotdb.hadoop.fileSystem.HDFSOutput;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.schema.Schema;
+import org.apache.iotdb.tsfile.write.writer.LocalTsFileOutput;
+import org.apache.iotdb.tsfile.write.writer.TsFileOutput;
+
+import javax.annotation.Nullable;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Optional;
+
+/**
+ * The abstract base class of the output formats which write data to TsFile.
+ * @param <T> The input data type.
+ */
+public abstract class TsFileOutputFormat<T> extends FileOutputFormat<T> {
+
+  protected Schema schema;
+  @Nullable
+  protected TSFileConfig config;
+
+  protected transient Configuration hadoopConf = null;
+  protected transient TsFileWriter writer = null;
+
+  /**
+   * @param path the output path, or {@code null} when no path is passed to the parent class
+   * @param schema the schema of the written TsFile, must not be {@code null}
+   * @param config the TsFile config installed globally in {@link #open(int, int)}, may be
+   *     {@code null}
+   */
+  public TsFileOutputFormat(String path, Schema schema, TSFileConfig config) {
+    super(path == null ? null : new Path(path));
+    this.schema = Preconditions.checkNotNull(schema);
+    this.config = config;
+  }
+
+  @Override
+  public void configure(org.apache.flink.configuration.Configuration flinkConfiguration) {
+    super.configure(flinkConfiguration);
+    hadoopConf = HadoopUtils.getHadoopConfiguration(flinkConfiguration);
+  }
+
+  /**
+   * Lets the parent class create the target file, then replaces its stream with a
+   * {@link TsFileWriter} on the same path.
+   *
+   * @throws IOException if the file or the writer cannot be created
+   */
+  @Override
+  public void open(int taskNumber, int numTasks) throws IOException {
+    super.open(taskNumber, numTasks);
+    if (config != null) {
+      TSFileConfigUtil.setGlobalTSFileConfig(config);
+    }
+    // Use TsFile API to write instead of FSDataOutputStream.
+    this.stream.close();
+    // Drop the reference so that super.close() does not close the same stream a second time.
+    this.stream = null;
+    Path actualFilePath = getAcutalFilePath();
+    TsFileOutput out;
+    try {
+      if (actualFilePath.getFileSystem().isDistributedFS()) {
+        // HDFS
+        out = new HDFSOutput(
+          new org.apache.hadoop.fs.Path(new URI(actualFilePath.getPath())), hadoopConf, true);
+      } else {
+        // Local File System
+        out = new LocalTsFileOutput(new FileOutputStream(actualFilePath.getPath()));
+      }
+    } catch (URISyntaxException e) {
+      throw new RuntimeException("Invalid output file path: " + actualFilePath, e);
+    }
+    writer = new TsFileWriter(out, schema);
+  }
+
+  /**
+   * Closes the parent format and the TsFile writer.
+   *
+   * <p>The writer is closed even if the parent fails to close, and a writer that was never created
+   * (for example because {@link #open(int, int)} failed) is skipped instead of causing a
+   * {@link NullPointerException}.
+   */
+  @Override
+  public void close() throws IOException {
+    TsFileWriter currentWriter = writer;
+    writer = null;
+    try {
+      super.close();
+    } finally {
+      if (currentWriter != null) {
+        currentWriter.close();
+      }
+    }
+  }
+
+  /** Appends the {@code .tsfile} suffix to the per-task file name chosen by the parent class. */
+  @Override
+  protected String getDirectoryFileName(int taskNumber) {
+    return super.getDirectoryFileName(taskNumber) + ".tsfile";
+  }
+
+  /**
+   * Reads the {@code actualFilePath} field declared by {@link FileOutputFormat} through reflection.
+   *
+   * @return the value of the parent's {@code actualFilePath} field
+   * @throws RuntimeException if the field does not exist or cannot be accessed
+   */
+  protected Path getAcutalFilePath() {
+    try {
+      Field field = FileOutputFormat.class.getDeclaredField("actualFilePath");
+      field.setAccessible(true);
+      return (Path) field.get(this);
+    } catch (NoSuchFieldException | IllegalAccessException e) {
+      throw new RuntimeException("Get actual file path failed!", e);
+    }
+  }
+
+  public Schema getSchema() {
+    return schema;
+  }
+
+  public Optional<TSFileConfig> getConfig() {
+    return Optional.ofNullable(config);
+  }
+}
diff --git a/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/tsfile/RowTSRecordOutputFormatITCase.java b/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/tsfile/RowTSRecordOutputFormatITCase.java
new file mode 100644
index 0000000000000..3c7182e89cf96
--- /dev/null
+++ b/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/tsfile/RowTSRecordOutputFormatITCase.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.flink.tsfile; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.types.Row; +import org.junit.Test; + +import java.io.File; + +import static org.junit.Assert.assertArrayEquals; +
+/**
+ * ITCases for TSRecordOutputFormat.
+ */
+public class RowTSRecordOutputFormatITCase extends RowTsFileOutputFormatTestBase {
+
+  /**
+   * Runs a DataSet job that writes the prepared rows with parallelism 1, then reads the
+   * resulting TsFile back and compares it with the expected rows.
+   */
+  @Test
+  public void testOutputFormat() throws Exception {
+    DataSet<Row> source = prepareDataSource();
+    String outputFilePath = tmpDir + File.separator + "test.tsfile";
+    TSRecordOutputFormat<Row> outputFormat = prepareTSRecordOutputFormat(outputFilePath);
+
+    source.output(outputFormat).setParallelism(1);
+    env.execute();
+
+    String[] actual = readTsFile(outputFilePath, paths);
+    String[] expected = {
+      "1,1.2,20,null,2.3,11,19",
+      "2,null,20,50,25.4,10,21",
+      "3,1.4,21,null,null,null,null",
+      "4,1.2,20,51,null,null,null",
+      "6,7.2,10,11,null,null,null",
+      "7,6.2,20,21,null,null,null",
+      "8,9.2,30,31,null,null,null"
+    };
+    // JUnit expects (expected, actual); the reversed order yields misleading failure messages.
+    assertArrayEquals(expected, actual);
+  }
+}
diff --git a/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/tsfile/RowTSRecordOutputFormatTest.java b/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/tsfile/RowTSRecordOutputFormatTest.java new file mode 100644 index 0000000000000..619654092bd94 --- /dev/null +++
b/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/tsfile/RowTSRecordOutputFormatTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.flink.tsfile; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.types.Row; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.List; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +
+/**
+ * Tests for TSRecordOutputFormat.
+ */
+public class RowTSRecordOutputFormatTest extends RowTsFileOutputFormatTestBase {
+
+  /**
+   * Drives the output format directly (configure, open, writeRecord, close) into a directory
+   * and verifies the content of the file it produces.
+   */
+  @Test
+  public void testWriteData() throws IOException {
+    String outputDirPath = tmpDir + File.separator + "testOutput";
+    new File(outputDirPath).mkdirs();
+    TSRecordOutputFormat<Row> outputFormat = prepareTSRecordOutputFormat(outputDirPath);
+
+    try {
+      outputFormat.configure(new Configuration());
+      outputFormat.open(0, 2);
+      List<Row> data = prepareData();
+      for (Row row : data) {
+        outputFormat.writeRecord(row);
+      }
+    } finally {
+      outputFormat.close();
+    }
+
+    String[] actual = readTsFile(outputDirPath + File.separator + "1.tsfile", paths);
+    String[] expected = {
+      "1,1.2,20,null,2.3,11,19",
+      "2,null,20,50,25.4,10,21",
+      "3,1.4,21,null,null,null,null",
+      "4,1.2,20,51,null,null,null",
+      "6,7.2,10,11,null,null,null",
+      "7,6.2,20,21,null,null,null",
+      "8,9.2,30,31,null,null,null"
+    };
+    // JUnit expects (expected, actual); the reversed order yields misleading failure messages.
+    assertArrayEquals(expected, actual);
+  }
+
+  /** Verifies that the getters return the objects the format was constructed with. */
+  @Test
+  public void testGetter() {
+    String outputFilePath = tmpDir + File.separator + "test.tsfile";
+    TSRecordOutputFormat<Row> outputFormat = prepareTSRecordOutputFormat(outputFilePath);
+
+    assertEquals(rowTSRecordConverter, outputFormat.getConverter());
+    assertEquals(schema, outputFormat.getSchema());
+    // orElse(null) turns a missing config into an assertion failure, not NoSuchElementException.
+    assertEquals(config, outputFormat.getConfig().orElse(null));
+  }
+}
diff --git a/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileConnectorTestBase.java b/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileConnectorTestBase.java new file mode 100644 index 0000000000000..0ceb55375219c --- /dev/null +++ b/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileConnectorTestBase.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.iotdb.flink.tsfile; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.RowTypeInfo; + +import org.apache.flink.util.FileUtils; +import org.apache.iotdb.flink.util.TsFileWriteUtil; + +import org.apache.iotdb.tsfile.common.conf.TSFileConfig; +import org.apache.iotdb.tsfile.common.constant.QueryConstant; +import org.apache.iotdb.tsfile.read.common.Path; +import org.junit.After; +import org.junit.Before; + +import java.io.File; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; +
+/**
+ * Base class of the TsFile connector tests.
+ *
+ * <p>Provides a per-test temporary directory plus the shared row layout (field names, field
+ * types, series paths) used by the connector tests.
+ */
+public abstract class RowTsFileConnectorTestBase {
+
+  /** Temporary directory of the current test; created before and deleted after each test. */
+  protected String tmpDir;
+  protected TSFileConfig config = new TSFileConfig();
+  // The misspelled name ("filed" for "field") is kept because subclasses may reference it.
+  protected String[] filedNames = {
+    QueryConstant.RESERVED_TIME,
+    "device_1.sensor_1",
+    "device_1.sensor_2",
+    "device_1.sensor_3",
+    "device_2.sensor_1",
+    "device_2.sensor_2",
+    "device_2.sensor_3"
+  };
+  /** Field types, positionally aligned with {@link #filedNames}. */
+  protected TypeInformation<?>[] typeInformations = new TypeInformation<?>[] {
+    Types.LONG,
+    Types.FLOAT,
+    Types.INT,
+    Types.INT,
+    Types.FLOAT,
+    Types.INT,
+    Types.INT
+  };
+  /** One path per field name, excluding the reserved time field. */
+  protected List<Path> paths = Arrays.stream(filedNames)
+      .filter(s -> !s.equals(QueryConstant.RESERVED_TIME))
+      .map(Path::new)
+      .collect(Collectors.toList());
+  protected RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, filedNames);
+
+  /**
+   * Creates a uniquely named temporary directory under {@code TsFileWriteUtil.TMP_DIR} and sets
+   * the batch size of the shared config to 500.
+   *
+   * @throws IllegalStateException if the directory cannot be created
+   */
+  @Before
+  public void prepareTempDirectory() throws Exception {
+    tmpDir = String.join(
+        File.separator,
+        TsFileWriteUtil.TMP_DIR,
+        UUID.randomUUID().toString());
+    File tmpDirFile = new File(tmpDir);
+    // Fail fast here instead of with a confusing I/O error later in the test.
+    if (!tmpDirFile.mkdirs() && !tmpDirFile.isDirectory()) {
+      throw new IllegalStateException("Failed to create temp directory: " + tmpDir);
+    }
+    config.setBatchSize(500);
+  }
+
+  /** Deletes the temporary directory of the current test. */
+  @After
+  public void cleanTempDirectory() {
+    File tmpDirFile = new File(tmpDir);
+    FileUtils.deleteDirectoryQuietly(tmpDirFile);
+  }
+}
diff --git a/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTestBase.java
b/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTestBase.java index 9da84f73b0e45..48f50089c2752 100644 --- a/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTestBase.java +++ b/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileInputFormatTestBase.java @@ -19,46 +19,25 @@ package org.apache.iotdb.flink.tsfile; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeinfo.Types; -import org.apache.flink.api.java.typeutils.RowTypeInfo; - import org.apache.flink.types.Row; import org.apache.iotdb.flink.util.TsFileWriteUtil; - -import org.apache.iotdb.tsfile.common.conf.TSFileConfig; -import org.apache.iotdb.tsfile.common.constant.QueryConstant; -import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.expression.QueryExpression; -import org.junit.After; import org.junit.Before; import java.io.File; -import java.util.Arrays; -import java.util.List; -import java.util.UUID; -import java.util.stream.Collectors; /** - * Base class of the TsFileInputFormat tests. + * Base class for TsFileInputFormat tests. 
*/ -public abstract class RowTsFileInputFormatTestBase { +public abstract class RowTsFileInputFormatTestBase extends RowTsFileConnectorTestBase { - protected String tmpDir; protected String sourceTsFilePath1; protected String sourceTsFilePath2; - protected RowTypeInfo rowTypeInfo; - protected TSFileConfig config; - protected RowRowRecordParser parser; - protected QueryExpression queryExpression; + protected RowRowRecordParser parser = RowRowRecordParser.create(rowTypeInfo, paths); + protected QueryExpression queryExpression = QueryExpression.create(paths, null); @Before public void prepareSourceTsFile() throws Exception { - tmpDir = String.join( - File.separator, - TsFileWriteUtil.TMP_DIR, - UUID.randomUUID().toString()); - new File(tmpDir).mkdirs(); sourceTsFilePath1 = String.join( File.separator, tmpDir, "source1.tsfile"); @@ -69,50 +48,7 @@ public void prepareSourceTsFile() throws Exception { TsFileWriteUtil.create2(sourceTsFilePath2); } - @After - public void removeSourceTsFile() { - File sourceTsFile1 = new File(sourceTsFilePath1); - if (sourceTsFile1.exists()) { - sourceTsFile1.delete(); - } - File sourceTsFile2 = new File(sourceTsFilePath2); - if (sourceTsFile2.exists()) { - sourceTsFile2.delete(); - } - File tmpDirFile = new File(tmpDir); - if (tmpDirFile.exists()) { - tmpDirFile.delete(); - } - } - protected TsFileInputFormat prepareInputFormat(String filePath) { - String[] filedNames = { - QueryConstant.RESERVED_TIME, - "device_1.sensor_1", - "device_1.sensor_2", - "device_1.sensor_3", - "device_2.sensor_1", - "device_2.sensor_2", - "device_2.sensor_3" - }; - TypeInformation[] typeInformations = new TypeInformation[] { - Types.LONG, - Types.FLOAT, - Types.INT, - Types.INT, - Types.FLOAT, - Types.INT, - Types.INT - }; - List paths = Arrays.stream(filedNames) - .filter(s -> !s.equals(QueryConstant.RESERVED_TIME)) - .map(Path::new) - .collect(Collectors.toList()); - config = new TSFileConfig(); - config.setBatchSize(500); - rowTypeInfo = new 
RowTypeInfo(typeInformations, filedNames); - queryExpression = QueryExpression.create(paths, null); - parser = RowRowRecordParser.create(rowTypeInfo, queryExpression.getSelectedSeries()); return new TsFileInputFormat<>(filePath, queryExpression, parser, config); } } diff --git a/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileOutputFormatTestBase.java b/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileOutputFormatTestBase.java new file mode 100644 index 0000000000000..4c993df9f211e --- /dev/null +++ b/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/tsfile/RowTsFileOutputFormatTestBase.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.flink.tsfile; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple7; +import org.apache.flink.types.Row; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.read.ReadOnlyTsFile; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.apache.iotdb.tsfile.read.expression.QueryExpression; +import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.Schema; +import org.junit.Before; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.iotdb.flink.util.TsFileWriteUtil.DEFAULT_TEMPLATE; +
+/**
+ * Base class for TsFileOutputFormat tests.
+ *
+ * <p>Provides the execution environment, a preconfigured output format, the shared test rows
+ * and a helper that reads a written TsFile back as strings.
+ */
+public abstract class RowTsFileOutputFormatTestBase extends RowTsFileConnectorTestBase {
+
+  protected ExecutionEnvironment env;
+  protected RowTSRecordConverter rowTSRecordConverter;
+  protected Schema schema;
+
+  @Before
+  public void prepareEnv() {
+    env = ExecutionEnvironment.getExecutionEnvironment();
+  }
+
+  /**
+   * Builds an output format for the given path and stores the schema and converter it uses in
+   * {@link #schema} and {@link #rowTSRecordConverter}.
+   *
+   * @param path the output path handed to the format
+   * @return a new output format using the shared {@code config}
+   */
+  protected TSRecordOutputFormat<Row> prepareTSRecordOutputFormat(String path) {
+    schema = new Schema();
+    schema.extendTemplate(
+        DEFAULT_TEMPLATE, new MeasurementSchema("sensor_1", TSDataType.FLOAT, TSEncoding.RLE));
+    schema.extendTemplate(
+        DEFAULT_TEMPLATE, new MeasurementSchema("sensor_2", TSDataType.INT32, TSEncoding.TS_2DIFF));
+    schema.extendTemplate(
+        DEFAULT_TEMPLATE, new MeasurementSchema("sensor_3", TSDataType.INT32, TSEncoding.TS_2DIFF));
+    rowTSRecordConverter = new RowTSRecordConverter(rowTypeInfo);
+    return new TSRecordOutputFormat<>(path, schema, rowTSRecordConverter, config);
+  }
+
+  /**
+   * Creates the seven test rows; each row has seven fields, the first being the timestamp.
+   *
+   * @return the test rows in timestamp order
+   */
+  protected List<Row> prepareData() {
+    List<Tuple7<Long, Float, Integer, Integer, Float, Integer, Integer>> tuples =
+        new ArrayList<>(7);
+    tuples.add(new Tuple7<>(1L, 1.2f, 20, null, 2.3f, 11, 19));
+    tuples.add(new Tuple7<>(2L, null, 20, 50, 25.4f, 10, 21));
+    tuples.add(new Tuple7<>(3L, 1.4f, 21, null, null, null, null));
+    tuples.add(new Tuple7<>(4L, 1.2f, 20, 51, null, null, null));
+    tuples.add(new Tuple7<>(6L, 7.2f, 10, 11, null, null, null));
+    tuples.add(new Tuple7<>(7L, 6.2f, 20, 21, null, null, null));
+    tuples.add(new Tuple7<>(8L, 9.2f, 30, 31, null, null, null));
+
+    return tuples.stream().map(t -> {
+      Row row = new Row(7);
+      for (int i = 0; i < 7; i++) {
+        row.setField(i, t.getField(i));
+      }
+      return row;
+    }).collect(Collectors.toList());
+  }
+
+  /**
+   * Wraps {@link #prepareData()} in a DataSet typed with the shared {@code rowTypeInfo}.
+   *
+   * @return a DataSet holding the test rows
+   */
+  protected DataSet<Row> prepareDataSource() {
+    return env.fromCollection(prepareData(), rowTypeInfo);
+  }
+
+  /**
+   * Queries the given series from a TsFile and renders every row as a string.
+   *
+   * @param tsFilePath the file to read
+   * @param paths the series to select
+   * @return one entry per row: the timestamp followed by the comma-separated field values, with
+   *     {@code "null"} for absent fields
+   * @throws IOException if the file cannot be read
+   */
+  protected String[] readTsFile(String tsFilePath, List<Path> paths) throws IOException {
+    QueryExpression expression = QueryExpression.create(paths, null);
+    List<String> result = new ArrayList<>();
+    // Close the reader when done so the file handle is released before the temp dir is deleted.
+    try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFilePath)) {
+      ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader);
+      QueryDataSet queryDataSet = readTsFile.query(expression);
+      while (queryDataSet.hasNext()) {
+        RowRecord rowRecord = queryDataSet.next();
+        String row = rowRecord.getFields().stream()
+            .map(f -> f == null ? "null" : f.getStringValue())
+            .collect(Collectors.joining(","));
+        result.add(rowRecord.getTimestamp() + "," + row);
+      }
+    }
+    return result.toArray(new String[0]);
+  }
+}
diff --git a/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/util/TsFileWriteUtil.java b/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/util/TsFileWriteUtil.java index 21d9ab774b095..7c3b8e8980f46 100644 --- a/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/util/TsFileWriteUtil.java +++ b/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/util/TsFileWriteUtil.java @@ -36,7 +36,7 @@ public class TsFileWriteUtil { public static final String TMP_DIR = "target"; - private static final String DEFAULT_TEMPLATE = "template"; + public static final String DEFAULT_TEMPLATE = "template"; public static void create1(String tsfilePath) throws Exception { File f = new File(tsfilePath); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/Schema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/Schema.java index a1cb556a02693..128b64f9f9462 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/Schema.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/Schema.java @@ -20,6 +20,8 @@ import
org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.Path; + +import java.io.Serializable; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; @@ -28,7 +30,7 @@ * The schema of timeseries that exist in this file. The deviceTemplates is a simplified manner to * batch create schema of timeseries. */ -public class Schema { +public class Schema implements Serializable { /** * Path (device + measurement) -> measurementSchema By default, use the LinkedHashMap to store the