Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor TsFile #736

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
import org.apache.iotdb.tsfile.write.record.datapoint.DoubleDataPoint;
import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.Schema;
import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -59,12 +59,13 @@ public static void main(String[] args)

// add measurements into file schema (all with INT64 data type)
for (int i = 0; i < 2; i++) {
schema.registerMeasurement(
new MeasurementSchema("sensor_" + (i + 1), TSDataType.INT64, TSEncoding.TS_2DIFF));
schema.registerTimeseries(new org.apache.iotdb.tsfile.read.common.Path("device_1", "sensor_" + (i + 1)),
new TimeseriesSchema("sensor_" + (i + 1), TSDataType.INT64, TSEncoding.TS_2DIFF));
}

for (int i = 2; i < sensorNum; i++) {
schema.registerMeasurement(
new MeasurementSchema("sensor_" + (i + 1), TSDataType.DOUBLE, TSEncoding.TS_2DIFF));
schema.registerTimeseries(new org.apache.iotdb.tsfile.read.common.Path("device_1", "sensor_" + (i + 1)),
new TimeseriesSchema("sensor_" + (i + 1), TSDataType.DOUBLE, TSEncoding.TS_2DIFF));
}
TSFOutputFormat.setSchema(schema);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.RowBatch;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
import org.apache.iotdb.tsfile.write.schema.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -59,13 +60,13 @@ public static void writeTsFile(String filePath) {

// add measurements into file schema (all with INT64 data type)
for (int i = 0; i < 2; i++) {
schema.registerMeasurement(
new MeasurementSchema("sensor_" + (i + 1), TSDataType.INT64, TSEncoding.TS_2DIFF));
schema.registerTimeseries(new Path("device_1", "sensor_" + (i + 1)),
new TimeseriesSchema("sensor_" + (i + 1), TSDataType.INT64, TSEncoding.TS_2DIFF));
}

for (int i = 2; i < sensorNum; i++) {
schema.registerMeasurement(
new MeasurementSchema("sensor_" + (i + 1), TSDataType.DOUBLE, TSEncoding.TS_2DIFF));
schema.registerTimeseries(new Path("device_1", "sensor_" + (i + 1)),
new TimeseriesSchema("sensor_" + (i + 1), TSDataType.DOUBLE, TSEncoding.TS_2DIFF));
}

// add measurements into TSFileWriter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.write.record.RowBatch;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
import org.apache.iotdb.tsfile.write.schema.Schema;
import org.apache.thrift.TException;

Expand Down Expand Up @@ -123,9 +124,12 @@ private static void insertInBatch() throws IoTDBSessionException {
private static void insertRowBatch() throws IoTDBSessionException {
// The schema of sensors of one device
Schema schema = new Schema();
schema.registerMeasurement(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE));
schema.registerMeasurement(new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE));
schema.registerMeasurement(new MeasurementSchema("s3", TSDataType.INT64, TSEncoding.RLE));
schema.registerTimeseries(new Path("root.sg1.d1", "s1"),
new TimeseriesSchema("s1", TSDataType.INT64, TSEncoding.RLE));
schema.registerTimeseries(new Path("root.sg1.d1", "s2"),
new TimeseriesSchema("s2", TSDataType.INT64, TSEncoding.RLE));
schema.registerTimeseries(new Path("root.sg1.d1", "s3"),
new TimeseriesSchema("s3", TSDataType.INT64, TSEncoding.RLE));

RowBatch rowBatch = schema.createRowBatch("root.sg1.d1", 100);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;

import java.io.File;

import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.RowBatch;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.Schema;
import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;

import java.io.File;

Expand All @@ -54,8 +53,8 @@ public static void main(String[] args) {

// add measurements into file schema (all with INT64 data type)
for (int i = 0; i < sensorNum; i++) {
schema.registerMeasurement(
new MeasurementSchema("sensor_" + (i + 1), TSDataType.INT64, TSEncoding.TS_2DIFF));
schema.registerTimeseries(new Path("device_1", "sensor_" + (i + 1)),
new TimeseriesSchema("sensor_" + (i + 1), TSDataType.INT64, TSEncoding.TS_2DIFF));
}

// add measurements into TSFileWriter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;

import java.io.File;

Expand All @@ -47,12 +48,16 @@ public static void main(String args[]) {
TsFileWriter tsFileWriter = new TsFileWriter(f);

// add measurements into file schema
tsFileWriter
.addMeasurement(new MeasurementSchema("sensor_1", TSDataType.INT64, TSEncoding.RLE));
tsFileWriter
.addMeasurement(new MeasurementSchema("sensor_2", TSDataType.INT64, TSEncoding.RLE));
tsFileWriter
.addMeasurement(new MeasurementSchema("sensor_3", TSDataType.INT64, TSEncoding.RLE));

for (int i = 0; i < 100; i++) {
// add measurements into file schema
tsFileWriter.addTimeseries(new Path("device_" + (i % 4), "sensor_1"),
new TimeseriesSchema("sensor_1", TSDataType.INT64, TSEncoding.RLE));
tsFileWriter.addTimeseries(new Path("device_" + (i % 4), "sensor_2"),
new TimeseriesSchema("sensor_2", TSDataType.INT64, TSEncoding.RLE));
tsFileWriter.addTimeseries(new Path("device_" + (i % 4), "sensor_3"),
new TimeseriesSchema("sensor_3", TSDataType.INT64, TSEncoding.RLE));
}

// construct TSRecord
for (int i = 0; i < 100; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;

import java.io.DataInput;
import java.io.DataOutput;
Expand Down Expand Up @@ -161,17 +159,6 @@ public ChunkGroupInfo(String deviceId, String[] measurementIds, long startOffset
this.endOffset = endOffset;
}

public ChunkGroupInfo(ChunkGroupMetaData chunkGroupMetaData) {
this.deviceId = chunkGroupMetaData.getDeviceID();
this.measurementIds = chunkGroupMetaData.getChunkMetaDataList().stream()
.map(ChunkMetaData::getMeasurementUid)
.distinct()
.toArray(String[]::new);

this.startOffset = chunkGroupMetaData.getStartOffsetOfChunkGroup();
this.endOffset = chunkGroupMetaData.getEndOffsetOfChunkGroup();
}

public String getDeviceId() {
return deviceId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.RowBatch;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
import org.apache.iotdb.tsfile.write.schema.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -58,13 +59,13 @@ public static void writeTsFile(String filePath) {
// the number of values to include in the row batch
int sensorNum = 10;

// add measurements into file schema (all with INT64 data type)
// add timeseries into file schema (all with INT64 data type)
for (int i = 0; i < sensorNum; i++) {
schema.registerMeasurement(
new MeasurementSchema("sensor_" + (i + 1), TSDataType.INT64, TSEncoding.TS_2DIFF));
schema.registerTimeseries(new Path("device_1", "sensor_" + (i + 1)),
new TimeseriesSchema("sensor_" + (i + 1), TSDataType.INT64, TSEncoding.TS_2DIFF));
}

// add measurements into TSFileWriter
// add timeseries into TSFileWriter
TsFileWriter tsFileWriter = new TsFileWriter(file, schema);

// construct the row batch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.RowBatch;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
import org.apache.iotdb.tsfile.write.schema.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -60,8 +61,8 @@ public static void writeTsFile(String filePath) {

// add measurements into file schema (all with INT64 data type)
for (int i = 0; i < sensorNum; i++) {
schema.registerMeasurement(
new MeasurementSchema("sensor_" + (i + 1), TSDataType.INT64, TSEncoding.TS_2DIFF));
schema.registerTimeseries(new Path("device_1", "sensor_" + (i + 1)),
new TimeseriesSchema("sensor_" + (i + 1), TSDataType.INT64, TSEncoding.TS_2DIFF));
}

// add measurements into TSFileWriter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@
import org.apache.iotdb.db.exception.runtime.FlushRunTimeException;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.apache.iotdb.tsfile.write.schema.Schema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -79,7 +80,7 @@ public void syncFlushMemTable() throws ExecutionException, InterruptedException
for (String measurementId : memTable.getMemTableMap().get(deviceId).keySet()) {
long startTime = System.currentTimeMillis();
IWritableMemChunk series = memTable.getMemTableMap().get(deviceId).get(measurementId);
MeasurementSchema desc = schema.getMeasurementSchema(measurementId);
TimeseriesSchema desc = schema.getSeriesSchema(new Path(deviceId, measurementId));
TVList tvList = series.getSortedTVList();
sortTime += System.currentTimeMillis() - startTime;
encodingTaskQueue.add(new Pair<>(tvList, desc));
Expand Down Expand Up @@ -171,7 +172,7 @@ public void run() {
ioTaskQueue.add(task);
} else {
long starTime = System.currentTimeMillis();
Pair<TVList, MeasurementSchema> encodingMessage = (Pair<TVList, MeasurementSchema>) task;
Pair<TVList, TimeseriesSchema> encodingMessage = (Pair<TVList, TimeseriesSchema>) task;
IChunkWriter seriesWriter = new ChunkWriterImpl(encodingMessage.right);
writeOneSeries(encodingMessage.left, seriesWriter, encodingMessage.right.getType());
ioTaskQueue.add(seriesWriter);
Expand Down
14 changes: 7 additions & 7 deletions session/src/main/java/org/apache/iotdb/session/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.write.record.RowBatch;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
Expand Down Expand Up @@ -186,9 +186,9 @@ public TSExecuteBatchStatementResp insertBatch(RowBatch rowBatch)
TSBatchInsertionReq request = new TSBatchInsertionReq();
request.setSessionId(sessionId);
request.deviceId = rowBatch.deviceId;
for (MeasurementSchema measurementSchema : rowBatch.measurements) {
request.addToMeasurements(measurementSchema.getMeasurementId());
request.addToTypes(measurementSchema.getType().ordinal());
for (TimeseriesSchema timeseriesSchema : rowBatch.timeseries) {
request.addToMeasurements(timeseriesSchema.getMeasurementId());
request.addToTypes(timeseriesSchema.getType().ordinal());
}
request.setTimestamps(SessionUtils.getTimeBuffer(rowBatch));
request.setValues(SessionUtils.getValueBuffer(rowBatch));
Expand Down Expand Up @@ -270,9 +270,9 @@ public TSExecuteBatchStatementResp testInsertBatch(RowBatch rowBatch)
TSBatchInsertionReq request = new TSBatchInsertionReq();
request.setSessionId(sessionId);
request.deviceId = rowBatch.deviceId;
for (MeasurementSchema measurementSchema : rowBatch.measurements) {
request.addToMeasurements(measurementSchema.getMeasurementId());
request.addToTypes(measurementSchema.getType().ordinal());
for (TimeseriesSchema timeseriesSchema : rowBatch.timeseries) {
request.addToMeasurements(timeseriesSchema.getMeasurementId());
request.addToTypes(timeseriesSchema.getType().ordinal());
}
request.setTimestamps(SessionUtils.getTimeBuffer(rowBatch));
request.setValues(SessionUtils.getValueBuffer(rowBatch));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ public static ByteBuffer getTimeBuffer(RowBatch rowBatch) {

public static ByteBuffer getValueBuffer(RowBatch rowBatch) {
ByteBuffer valueBuffer = ByteBuffer.allocate(rowBatch.getValueBytesSize());
for (int i = 0; i < rowBatch.measurements.size(); i++) {
TSDataType dataType = rowBatch.measurements.get(i).getType();
for (int i = 0; i < rowBatch.timeseries.size(); i++) {
TSDataType dataType = rowBatch.timeseries.get(i).getType();
switch (dataType) {
case INT32:
int[] intValues = (int[]) rowBatch.values[i];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.write.record.RowBatch;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
import org.apache.iotdb.tsfile.write.schema.Schema;
import org.apache.thrift.TException;
import org.junit.After;
Expand Down Expand Up @@ -67,19 +68,19 @@ public void testTestMethod() throws IoTDBSessionException {
session.open();

session.setStorageGroup("root.sg1");
String deviceId = "root.sg1.d1";

// test insert batch
Schema schema = new Schema();
schema.registerMeasurement(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE));
schema.registerMeasurement(new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE));
schema.registerMeasurement(new MeasurementSchema("s3", TSDataType.INT64, TSEncoding.RLE));
schema.registerTimeseries(new Path(deviceId, "s1"), new TimeseriesSchema("s1", TSDataType.INT64, TSEncoding.RLE));
schema.registerTimeseries(new Path(deviceId, "s2"), new TimeseriesSchema("s2", TSDataType.INT64, TSEncoding.RLE));
schema.registerTimeseries(new Path(deviceId, "s3"), new TimeseriesSchema("s3", TSDataType.INT64, TSEncoding.RLE));

RowBatch rowBatch = schema.createRowBatch("root.sg1.d1", 100);

session.testInsertBatch(rowBatch);

// test insert row
String deviceId = "root.sg1.d1";
List<String> measurements = new ArrayList<>();
measurements.add("s1");
measurements.add("s2");
Expand Down Expand Up @@ -249,9 +250,9 @@ private void insert() throws IoTDBSessionException {

private void insertRowBatchTest1(String deviceId) throws IoTDBSessionException {
Schema schema = new Schema();
schema.registerMeasurement(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE));
schema.registerMeasurement(new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE));
schema.registerMeasurement(new MeasurementSchema("s3", TSDataType.INT64, TSEncoding.RLE));
schema.registerTimeseries(new Path(deviceId, "s1"), new TimeseriesSchema("s1", TSDataType.INT64, TSEncoding.RLE));
schema.registerTimeseries(new Path(deviceId, "s2"), new TimeseriesSchema("s2", TSDataType.INT64, TSEncoding.RLE));
schema.registerTimeseries(new Path(deviceId, "s3"), new TimeseriesSchema("s3", TSDataType.INT64, TSEncoding.RLE));

RowBatch rowBatch = schema.createRowBatch(deviceId, 100);

Expand Down
Loading