Refactor TsFile #736

Closed · wants to merge 19 commits
Changes from 13 commits
@@ -31,8 +31,8 @@
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
import org.apache.iotdb.tsfile.write.record.datapoint.DoubleDataPoint;
import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.Schema;
+import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -59,12 +59,13 @@ public static void main(String[] args)

// add measurements into file schema (all with INT64 data type)
for (int i = 0; i < 2; i++) {
-schema.registerMeasurement(
-    new MeasurementSchema("sensor_" + (i + 1), TSDataType.INT64, TSEncoding.TS_2DIFF));
+schema.registerTimeseries(new org.apache.iotdb.tsfile.read.common.Path("device_1", "sensor_" + (i + 1)),
+    new TimeseriesSchema("sensor_" + (i + 1), TSDataType.INT64, TSEncoding.TS_2DIFF));
}

for (int i = 2; i < sensorNum; i++) {
-schema.registerMeasurement(
-    new MeasurementSchema("sensor_" + (i + 1), TSDataType.DOUBLE, TSEncoding.TS_2DIFF));
+schema.registerTimeseries(new org.apache.iotdb.tsfile.read.common.Path("device_1", "sensor_" + (i + 1)),
+    new TimeseriesSchema("sensor_" + (i + 1), TSDataType.DOUBLE, TSEncoding.TS_2DIFF));
}
TSFOutputFormat.setSchema(schema);

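The hunk above shows the registration change this PR applies across all the examples: Schema entries are now keyed by a full device/measurement Path instead of a bare measurement name. A minimal sketch of the new call, using only constructors visible in this diff (writer setup assumed):

import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.write.schema.Schema;
import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;

public class RegisterTimeseriesSketch {
  public static void main(String[] args) {
    Schema schema = new Schema();
    // Old API: schema.registerMeasurement(new MeasurementSchema("sensor_1", ...));
    // New API: the same schema entry, now keyed by (device, measurement).
    schema.registerTimeseries(new Path("device_1", "sensor_1"),
        new TimeseriesSchema("sensor_1", TSDataType.INT64, TSEncoding.TS_2DIFF));
  }
}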
@@ -21,9 +21,10 @@
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.RowBatch;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
import org.apache.iotdb.tsfile.write.schema.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,13 +60,13 @@ public static void writeTsFile(String filePath) {

// add measurements into file schema (all with INT64 data type)
for (int i = 0; i < 2; i++) {
-schema.registerMeasurement(
-    new MeasurementSchema("sensor_" + (i + 1), TSDataType.INT64, TSEncoding.TS_2DIFF));
+schema.registerTimeseries(new Path("device_1", "sensor_" + (i + 1)),
+    new TimeseriesSchema("sensor_" + (i + 1), TSDataType.INT64, TSEncoding.TS_2DIFF));
}

for (int i = 2; i < sensorNum; i++) {
-schema.registerMeasurement(
-    new MeasurementSchema("sensor_" + (i + 1), TSDataType.DOUBLE, TSEncoding.TS_2DIFF));
+schema.registerTimeseries(new Path("device_1", "sensor_" + (i + 1)),
+    new TimeseriesSchema("sensor_" + (i + 1), TSDataType.DOUBLE, TSEncoding.TS_2DIFF));
}

// add measurements into TSFileWriter
@@ -25,8 +25,9 @@
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.write.record.RowBatch;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
import org.apache.iotdb.tsfile.write.schema.Schema;
import org.apache.thrift.TException;

@@ -123,9 +124,12 @@ private static void insertRowBatch() throws IoTDBSessionException {
private static void insertRowBatch() throws IoTDBSessionException {
// The schema of sensors of one device
Schema schema = new Schema();
-schema.registerMeasurement(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE));
-schema.registerMeasurement(new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE));
-schema.registerMeasurement(new MeasurementSchema("s3", TSDataType.INT64, TSEncoding.RLE));
+schema.registerTimeseries(new Path("root.sg1.d1", "s1"),
+    new TimeseriesSchema("s1", TSDataType.INT64, TSEncoding.RLE));
+schema.registerTimeseries(new Path("root.sg1.d1", "s2"),
+    new TimeseriesSchema("s2", TSDataType.INT64, TSEncoding.RLE));
+schema.registerTimeseries(new Path("root.sg1.d1", "s3"),
+    new TimeseriesSchema("s3", TSDataType.INT64, TSEncoding.RLE));

RowBatch rowBatch = schema.createRowBatch("root.sg1.d1", 100);

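For context on how the re-registered schema is consumed: a sketch of filling and flushing the RowBatch created above. The field and method names (timestamps, values, batchSize, getMaxBatchSize, insertBatch, reset) are assumed from the session examples of this era and are not shown in this diff:

// Assumed API, see note above; session is an open Session instance.
long[] timestamps = rowBatch.timestamps;
Object[] values = rowBatch.values;
for (long time = 0; time < 100; time++) {
  int row = rowBatch.batchSize++;
  timestamps[row] = time;
  for (int i = 0; i < 3; i++) {
    ((long[]) values[i])[row] = i;  // s1..s3 are INT64
  }
  if (rowBatch.batchSize == rowBatch.getMaxBatchSize()) {
    session.insertBatch(rowBatch);
    rowBatch.reset();
  }
}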
@@ -21,18 +21,15 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
-import java.util.stream.Collectors;
+import java.util.Map;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
import org.apache.iotdb.tsfile.file.MetaMarker;
import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
-import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
-import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -88,7 +85,7 @@ public static void main(String[] args) throws IOException {
defaultTimeDecoder, null);
BatchData batchData = reader1.getAllSatisfiedPageData();
while (batchData.hasCurrent()) {
-System.out.println(
+System.out.println(
"\t\t\ttime, value: " + batchData.currentTime() + ", " + batchData
.currentValue());
batchData.next();
@@ -105,21 +102,16 @@ public static void main(String[] args) throws IOException {
}
}
System.out.println("[Metadata]");
-List<TsDeviceMetadataIndex> deviceMetadataIndexList = metaData.getDeviceMap().values().stream()
-    .sorted((x, y) -> (int) (x.getOffset() - y.getOffset())).collect(Collectors.toList());
-for (TsDeviceMetadataIndex index : deviceMetadataIndexList) {
-  TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(index);
-  List<ChunkGroupMetaData> chunkGroupMetaDataList = deviceMetadata.getChunkGroupMetaDataList();
-  for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetaDataList) {
-    System.out.println(String
-        .format("\t[Device]File Offset: %d, Device %s, Number of Chunk Groups %d",
-            index.getOffset(), chunkGroupMetaData.getDeviceID(),
-            chunkGroupMetaDataList.size()));
-
-    for (ChunkMetaData chunkMetadata : chunkGroupMetaData.getChunkMetaDataList()) {
-      System.out.println("\t\tMeasurement:" + chunkMetadata.getMeasurementUid());
-      System.out.println("\t\tFile offset:" + chunkMetadata.getOffsetOfChunkHeader());
-    }
+Map<String, int[]> deviceOffsetsMap = metaData.getDeviceOffsetsMap();
+for (Map.Entry<String, int[]> entry: deviceOffsetsMap.entrySet()) {
+  String deviceId = entry.getKey();
+  List<ChunkMetaData> chunkMetadataList =
+      reader.readChunkMetadataInDevice(entry.getValue()[0], entry.getValue()[1]);
+  System.out.println(String
+      .format("\t[Device]Device %s, Number of Chunk %d", deviceId, chunkMetadataList.size()));
+  for (ChunkMetaData chunkMetadata : chunkMetadataList) {
+    System.out.println("\t\tMeasurement:" + chunkMetadata.getMeasurementUid());
+    System.out.println("\t\tFile offset:" + chunkMetadata.getOffsetOfChunkHeader());
}
}
reader.close();
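The reader-side refactor above replaces the old two-level lookup (TsDeviceMetadataIndex, then TsDeviceMetadata, then its ChunkGroupMetaData list) with one flat map from device to a pair of file offsets, fetched in a single call per device. A condensed sketch of the new traversal, using only calls visible in this hunk; reading the two ints as a start/end offset pair is inferred from how they are passed:

// reader is an open TsFileSequenceReader; metaData is its TsFileMetaData.
Map<String, int[]> deviceOffsetsMap = metaData.getDeviceOffsetsMap();
for (Map.Entry<String, int[]> entry : deviceOffsetsMap.entrySet()) {
  int[] span = entry.getValue();  // inferred: [startOffset, endOffset]
  List<ChunkMetaData> chunks = reader.readChunkMetadataInDevice(span[0], span[1]);
  System.out.println(entry.getKey() + ": " + chunks.size() + " chunks");
}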
@@ -22,13 +22,12 @@
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;

-import java.io.File;
-
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.RowBatch;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.Schema;
+import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;

+import java.io.File;

@@ -54,8 +53,8 @@ public static void main(String[] args) {

// add measurements into file schema (all with INT64 data type)
for (int i = 0; i < sensorNum; i++) {
-schema.registerMeasurement(
-    new MeasurementSchema("sensor_" + (i + 1), TSDataType.INT64, TSEncoding.TS_2DIFF));
+schema.registerTimeseries(new Path("device_1", "sensor_" + (i + 1)),
+    new TimeseriesSchema("sensor_" + (i + 1), TSDataType.INT64, TSEncoding.TS_2DIFF));
}

// add measurements into TSFileWriter
@@ -22,11 +22,12 @@
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;

import java.io.File;

@@ -47,12 +48,16 @@ public static void main(String args[]) {
TsFileWriter tsFileWriter = new TsFileWriter(f);

// add measurements into file schema
-tsFileWriter
-    .addMeasurement(new MeasurementSchema("sensor_1", TSDataType.INT64, TSEncoding.RLE));
-tsFileWriter
-    .addMeasurement(new MeasurementSchema("sensor_2", TSDataType.INT64, TSEncoding.RLE));
-tsFileWriter
-    .addMeasurement(new MeasurementSchema("sensor_3", TSDataType.INT64, TSEncoding.RLE));

+for (int i = 0; i < 4; i++) {
+  // add measurements into file schema
+  tsFileWriter.addTimeseries(new Path("device_" + i, "sensor_1"),
+      new TimeseriesSchema("sensor_1", TSDataType.INT64, TSEncoding.RLE));
+  tsFileWriter.addTimeseries(new Path("device_" + i, "sensor_2"),
+      new TimeseriesSchema("sensor_2", TSDataType.INT64, TSEncoding.RLE));
+  tsFileWriter.addTimeseries(new Path("device_" + i, "sensor_3"),
+      new TimeseriesSchema("sensor_3", TSDataType.INT64, TSEncoding.RLE));
+}

// construct TSRecord
for (int i = 0; i < 100; i++) {
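With addMeasurement replaced by per-device addTimeseries above, each record then targets one of the four registered devices. A short sketch of the TSRecord construction the elided lines perform, assuming the addTuple and write methods these examples use elsewhere; the values are illustrative:

// Construct one record for device_0 and write it out.
TSRecord tsRecord = new TSRecord(1L, "device_0");
tsRecord.addTuple(new LongDataPoint("sensor_1", 42L));
tsRecord.addTuple(new LongDataPoint("sensor_2", 43L));
tsFileWriter.write(tsRecord);
tsFileWriter.close();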
@@ -28,7 +28,8 @@
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.iotdb.hadoop.fileSystem.HDFSInput;
-import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetaData;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,7 +38,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;

/**
@@ -301,43 +304,45 @@ private static List<TSFInputSplit> generateSplits(Path path, TsFileSequenceReader

Arrays.sort(blockLocations, Comparator.comparingLong(BlockLocation::getOffset));

-List<ChunkGroupMetaData> chunkGroupMetaDataList = new ArrayList<>();
+Map<String, List<TimeseriesMetaData>> deviceTimeseriesMetaDataListMap = new HashMap<>();
int currentBlockIndex = 0;
long splitSize = 0;
List<String> hosts = new ArrayList<>();
-for (ChunkGroupMetaData chunkGroupMetaData : fileReader.getSortedChunkGroupMetaDataListByDeviceIds()) {
-  logger.info("The chunkGroupMetaData information is {}", chunkGroupMetaData);
-
-  // middle offset point of the chunkGroup
-  long middle = (chunkGroupMetaData.getStartOffsetOfChunkGroup() + chunkGroupMetaData.getEndOffsetOfChunkGroup()) / 2;
-  int blkIndex = getBlockLocationIndex(blockLocations, middle, logger);
-  if (hosts.size() == 0) {
-    hosts.addAll(Arrays.asList(blockLocations[blkIndex].getHosts()));
-  }
-
-  if (blkIndex != currentBlockIndex) {
-    TSFInputSplit tsfInputSplit = makeSplit(path, chunkGroupMetaDataList, splitSize, hosts);
-    logger.info("The tsfile inputSplit information is {}", tsfInputSplit);
-    splits.add(tsfInputSplit);
-
-    currentBlockIndex = blkIndex;
-    chunkGroupMetaDataList.clear();
-    chunkGroupMetaDataList.add(chunkGroupMetaData);
-    splitSize = getTotalByteSizeOfChunkGroup(chunkGroupMetaData);
-    hosts.clear();
-  } else {
-    chunkGroupMetaDataList.add(chunkGroupMetaData);
-    splitSize += getTotalByteSizeOfChunkGroup(chunkGroupMetaData);
+for (Map.Entry<String, List<TimeseriesMetaData>> entry : fileReader.getSortedTimeseriesMetaDataMap()) {
+  String deviceId = entry.getKey();
+  List<TimeseriesMetaData> timeseriesMetaDataList = entry.getValue();
+  logger.info("");
+  for (TimeseriesMetaData timeseriesMetaData : timeseriesMetaDataList) {
+    long middle = (timeseriesMetaData.getOffsetOfChunkMetaDataList()
+        + timeseriesMetaData.getDataSizeOfChunkMetaDataList() / 2);
+    int blkIndex = getBlockLocationIndex(blockLocations, middle, logger);
+    if (hosts.size() == 0) {
+      hosts.addAll(Arrays.asList(blockLocations[blkIndex].getHosts()));
+    }
+    if (blkIndex != currentBlockIndex) {
+      TSFInputSplit tsfInputSplit = makeSplit(path, timeseriesMetaDataList, splitSize, hosts);
+      logger.info("The tsfile inputSplit information is {}", tsfInputSplit);
+      splits.add(tsfInputSplit);
+
+      currentBlockIndex = blkIndex;
+      timeseriesMetaDataList.clear();
+      timeseriesMetaDataList.add(timeseriesMetaData);
+      splitSize = getTotalByteSizeOfChunkMetaDataList(timeseriesMetaData);
+      hosts.clear();
+    } else {
+      timeseriesMetaDataList.add(timeseriesMetaData);
+      splitSize += getTotalByteSizeOfChunkMetaDataList(timeseriesMetaData);
+    }
+  }
+  TSFInputSplit tsfInputSplit = makeSplit(path, timeseriesMetaDataList, splitSize, hosts);
+  logger.info("The tsfile inputSplit information is {}", tsfInputSplit);
+  splits.add(tsfInputSplit);
+  return splits;
+}
-TSFInputSplit tsfInputSplit = makeSplit(path, chunkGroupMetaDataList, splitSize, hosts);
-logger.info("The tsfile inputSplit information is {}", tsfInputSplit);
-splits.add(tsfInputSplit);
-return splits;
}

-private static long getTotalByteSizeOfChunkGroup(ChunkGroupMetaData chunkGroupMetaData) {
-  return chunkGroupMetaData.getEndOffsetOfChunkGroup() - chunkGroupMetaData.getStartOffsetOfChunkGroup();
+private static long getTotalByteSizeOfChunkMetaDataList(TimeseriesMetaData timeseriesMetaData) {
+  return timeseriesMetaData.getDataSizeOfChunkMetaDataList();
}
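One detail worth flagging in generateSplits above: the old code took a chunk group's midpoint as (start + end) / 2, while the new code computes offset + size / 2. The two agree when the chunk metadata list spans [offset, offset + size), so the unusual parenthesization of the new expression is redundant rather than a misplaced division. A tiny check under that assumption:

// Both forms give the midpoint of a block spanning [offset, offset + size).
long offset = 1000L;
long size = 400L;
long oldStyle = (offset + (offset + size)) / 2;  // (start + end) / 2 -> 1200
long newStyle = offset + size / 2;               // offset + size / 2 -> 1200
assert oldStyle == newStyle;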


@@ -359,13 +364,14 @@ private static int getBlockLocationIndex(BlockLocation[] blockLocations, long middle
+ blockLocations[blockLocations.length - 1].getLength());
return -1;
}

-private static TSFInputSplit makeSplit(Path path, List<ChunkGroupMetaData> chunkGroupMetaDataList,
-    long length, List<String> hosts) {
+private static TSFInputSplit makeSplit(Path path, List<TimeseriesMetaData> timeseriesMetaDataList,
+    long length, List<String> hosts) {
return new TSFInputSplit(path, hosts.toArray(new String[0]), length,
-    chunkGroupMetaDataList.stream()
-        .map(TSFInputSplit.ChunkGroupInfo::new)
-        .collect(Collectors.toList())
+    timeseriesMetaDataList.stream()
+        .map(TSFInputSplit.ChunkGroupInfo::new)
+        .collect(Collectors.toList())
);
}

}
@@ -21,8 +21,6 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileSplit;
-import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;

import java.io.DataInput;
import java.io.DataOutput;
@@ -161,17 +159,6 @@ public ChunkGroupInfo(String deviceId, String[] measurementIds, long startOffset
this.endOffset = endOffset;
}

-public ChunkGroupInfo(ChunkGroupMetaData chunkGroupMetaData) {
-  this.deviceId = chunkGroupMetaData.getDeviceID();
-  this.measurementIds = chunkGroupMetaData.getChunkMetaDataList().stream()
-      .map(ChunkMetaData::getMeasurementUid)
-      .distinct()
-      .toArray(String[]::new);
-
-  this.startOffset = chunkGroupMetaData.getStartOffsetOfChunkGroup();
-  this.endOffset = chunkGroupMetaData.getEndOffsetOfChunkGroup();
-}

public String getDeviceId() {
return deviceId;
}
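Removing this constructor leaves the TSFInputSplit.ChunkGroupInfo::new method reference in makeSplit (TSFInputFormat hunk above) without a TimeseriesMetaData overload in this revision. A hypothetical bridge constructor, built only from the accessors this diff exercises; the deviceId and measurementIds sources are not visible here:

// Hypothetical, not part of this diff: the overload the method reference
// in makeSplit would need to resolve against.
public ChunkGroupInfo(TimeseriesMetaData timeseriesMetaData) {
  this.startOffset = timeseriesMetaData.getOffsetOfChunkMetaDataList();
  this.endOffset = this.startOffset
      + timeseriesMetaData.getDataSizeOfChunkMetaDataList();
  // deviceId and measurementIds would need TimeseriesMetaData accessors
  // that do not appear in this diff.
}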
@@ -21,9 +21,10 @@
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.RowBatch;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
import org.apache.iotdb.tsfile.write.schema.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,13 +59,13 @@ public static void writeTsFile(String filePath) {
// the number of values to include in the row batch
int sensorNum = 10;

-// add measurements into file schema (all with INT64 data type)
+// add timeseries into file schema (all with INT64 data type)
for (int i = 0; i < sensorNum; i++) {
-schema.registerMeasurement(
-    new MeasurementSchema("sensor_" + (i + 1), TSDataType.INT64, TSEncoding.TS_2DIFF));
+schema.registerTimeseries(new Path("device_1", "sensor_" + (i + 1)),
+    new TimeseriesSchema("sensor_" + (i + 1), TSDataType.INT64, TSEncoding.TS_2DIFF));
}

-// add measurements into TSFileWriter
+// add timeseries into TSFileWriter
TsFileWriter tsFileWriter = new TsFileWriter(file, schema);

// construct the row batch
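To round out this last example: after registration, the writer consumes a filled RowBatch. A sketch under the same assumptions as the session note earlier (public timestamps, values, and batchSize members on RowBatch, and a TsFileWriter.write(RowBatch) overload), none of which are shown in this diff:

// Fill one batch of INT64 values and flush it through the writer.
RowBatch rowBatch = schema.createRowBatch("device_1", 100);
long[] timestamps = rowBatch.timestamps;
Object[] values = rowBatch.values;
for (long t = 0; t < 100; t++) {
  int row = rowBatch.batchSize++;
  timestamps[row] = t;
  for (int i = 0; i < sensorNum; i++) {
    ((long[]) values[i])[row] = t + i;
  }
}
tsFileWriter.write(rowBatch);
tsFileWriter.close();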