Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions docs/Documentation/UserGuide/7-TsFile/2-Usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
* It uses the interface:
* public void addMeasurement(MeasurementSchema MeasurementSchema) throws WriteProcessException
*/
public class TsFileWrite {
public class TsFileWriteWithTSRecord {

public static void main(String args[]) {
try {
Expand Down Expand Up @@ -295,7 +295,7 @@ public class TsFileWriteWithRowBatch {
RowBatch rowBatch = fileSchema.createRowBatch("device_1");

long[] timestamps = rowBatch.timestamps;
Object[] sensors = rowBatch.sensors;
Object[] values = rowBatch.values;

long timestamp = 1;
long value = 1000000L;
Expand All @@ -304,11 +304,11 @@ public class TsFileWriteWithRowBatch {
int row = rowBatch.batchSize++;
timestamps[row] = timestamp++;
for (int i = 0; i < sensorNum; i++) {
long[] sensor = (long[]) sensors[i];
long[] sensor = (long[]) values[i];
sensor[row] = value;
}
// write RowBatch to TsFile
if (rowBatch.batchSize == rowBatch.getBatchMaxSize()) {
if (rowBatch.batchSize == rowBatch.getMaxBatchSize()) {
tsFileWriter.write(rowBatch);
rowBatch.reset();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public static void main(String[] args) {
RowBatch rowBatch = fileSchema.createRowBatch("device_1");

long[] timestamps = rowBatch.timestamps;
Object[] sensors = rowBatch.sensors;
Object[] values = rowBatch.values;

long timestamp = 1;
long value = 1000000L;
Expand All @@ -68,11 +68,11 @@ public static void main(String[] args) {
int row = rowBatch.batchSize++;
timestamps[row] = timestamp++;
for (int i = 0; i < sensorNum; i++) {
long[] sensor = (long[]) sensors[i];
long[] sensor = (long[]) values[i];
sensor[row] = value;
}
// write RowBatch to TsFile
if (rowBatch.batchSize == rowBatch.getBatchMaxSize()) {
if (rowBatch.batchSize == rowBatch.getMaxBatchSize()) {
tsFileWriter.write(rowBatch);
rowBatch.reset();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,24 +91,25 @@ public void write(RowBatch rowBatch) throws WriteProcessException, IOException {

private void writeByDataType(
RowBatch rowBatch, String measurementId, TSDataType dataType, int index) throws IOException {
int batchSize = rowBatch.batchSize;
switch (dataType) {
case INT32:
chunkWriters.get(measurementId).write(rowBatch.timestamps, (int[]) rowBatch.values[index]);
chunkWriters.get(measurementId).write(rowBatch.timestamps, (int[]) rowBatch.values[index], batchSize);
break;
case INT64:
chunkWriters.get(measurementId).write(rowBatch.timestamps, (long[]) rowBatch.values[index]);
chunkWriters.get(measurementId).write(rowBatch.timestamps, (long[]) rowBatch.values[index], batchSize);
break;
case FLOAT:
chunkWriters.get(measurementId).write(rowBatch.timestamps, (float[]) rowBatch.values[index]);
chunkWriters.get(measurementId).write(rowBatch.timestamps, (float[]) rowBatch.values[index], batchSize);
break;
case DOUBLE:
chunkWriters.get(measurementId).write(rowBatch.timestamps, (double[]) rowBatch.values[index]);
chunkWriters.get(measurementId).write(rowBatch.timestamps, (double[]) rowBatch.values[index], batchSize);
break;
case BOOLEAN:
chunkWriters.get(measurementId).write(rowBatch.timestamps, (boolean[]) rowBatch.values[index]);
chunkWriters.get(measurementId).write(rowBatch.timestamps, (boolean[]) rowBatch.values[index], batchSize);
break;
case TEXT:
chunkWriters.get(measurementId).write(rowBatch.timestamps, (Binary[]) rowBatch.values[index]);
chunkWriters.get(measurementId).write(rowBatch.timestamps, (Binary[]) rowBatch.values[index], batchSize);
break;
default:
throw new UnSupportedDataTypeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,85 +200,85 @@ public void write(long time, Binary value) {
}

@Override
public void write(long[] timestamps, int[] values) {
public void write(long[] timestamps, int[] values, int batchSize) {
this.time = timestamps[timestamps.length - 1];
valueCountInOnePage += timestamps.length;
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = timestamps[0];
}
dataPageWriter.write(timestamps, values);
dataPageWriter.write(timestamps, values, batchSize);
pageStatistics.updateStats(values);
checkPageSizeAndMayOpenANewPage();
}

@Override
public void write(long[] timestamps, long[] values) {
public void write(long[] timestamps, long[] values, int batchSize) {
this.time = timestamps[timestamps.length - 1];
valueCountInOnePage += timestamps.length;
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = timestamps[0];
}
dataPageWriter.write(timestamps, values);
dataPageWriter.write(timestamps, values, batchSize);
pageStatistics.updateStats(values);
checkPageSizeAndMayOpenANewPage();
}

@Override
public void write(long[] timestamps, boolean[] values) {
public void write(long[] timestamps, boolean[] values, int batchSize) {
this.time = timestamps[timestamps.length - 1];
valueCountInOnePage += timestamps.length;
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = timestamps[0];
}
dataPageWriter.write(timestamps, values);
dataPageWriter.write(timestamps, values, batchSize);
pageStatistics.updateStats(values);
checkPageSizeAndMayOpenANewPage();
}

@Override
public void write(long[] timestamps, float[] values) {
public void write(long[] timestamps, float[] values, int batchSize) {
this.time = timestamps[timestamps.length - 1];
valueCountInOnePage += timestamps.length;
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = timestamps[0];
}
dataPageWriter.write(timestamps, values);
dataPageWriter.write(timestamps, values, batchSize);
pageStatistics.updateStats(values);
checkPageSizeAndMayOpenANewPage();
}

@Override
public void write(long[] timestamps, double[] values) {
public void write(long[] timestamps, double[] values, int batchSize) {
this.time = timestamps[timestamps.length - 1];
valueCountInOnePage += timestamps.length;
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = timestamps[0];
}
dataPageWriter.write(timestamps, values);
dataPageWriter.write(timestamps, values, batchSize);
pageStatistics.updateStats(values);
checkPageSizeAndMayOpenANewPage();
}

@Override
public void write(long[] timestamps, BigDecimal[] values) {
public void write(long[] timestamps, BigDecimal[] values, int batchSize) {
this.time = timestamps[timestamps.length - 1];
valueCountInOnePage += timestamps.length;
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = timestamps[0];
}
dataPageWriter.write(timestamps, values);
dataPageWriter.write(timestamps, values, batchSize);
pageStatistics.updateStats(values);
checkPageSizeAndMayOpenANewPage();
}

@Override
public void write(long[] timestamps, Binary[] values) {
public void write(long[] timestamps, Binary[] values, int batchSize) {
this.time = timestamps[timestamps.length - 1];
valueCountInOnePage += timestamps.length;
if (minTimestamp == Long.MIN_VALUE) {
minTimestamp = timestamps[0];
}
dataPageWriter.write(timestamps, values);
dataPageWriter.write(timestamps, values, batchSize);
pageStatistics.updateStats(values);
checkPageSizeAndMayOpenANewPage();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,37 +68,37 @@ public interface IChunkWriter {
/**
* write time series
*/
void write(long[] timestamps, int[] values);
void write(long[] timestamps, int[] values, int batchSize);

/**
* write time series
*/
void write(long[] timestamps, long[] values);
void write(long[] timestamps, long[] values, int batchSize);

/**
* write time series
*/
void write(long[] timestamps, boolean[] values);
void write(long[] timestamps, boolean[] values, int batchSize);

/**
* write time series
*/
void write(long[] timestamps, float[] values);
void write(long[] timestamps, float[] values, int batchSize);

/**
* write time series
*/
void write(long[] timestamps, double[] values);
void write(long[] timestamps, double[] values, int batchSize);

/**
* write time series
*/
void write(long[] timestamps, BigDecimal[] values);
void write(long[] timestamps, BigDecimal[] values, int batchSize);

/**
* write time series
*/
void write(long[] timestamps, Binary[] values);
void write(long[] timestamps, Binary[] values, int batchSize);

/**
* flush data to TsFileIOWriter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ public void write(long time, Binary value) {
/**
* write time series into encoder
*/
public void write(long[] timestamps, boolean[] values) {
for (int i = 0; i < timestamps.length; i++) {
public void write(long[] timestamps, boolean[] values, int batchSize) {
for (int i = 0; i < batchSize; i++) {
timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
}
Expand All @@ -134,8 +134,8 @@ public void write(long[] timestamps, boolean[] values) {
/**
* write time series into encoder
*/
public void write(long[] timestamps, int[] values) {
for (int i = 0; i < timestamps.length; i++) {
public void write(long[] timestamps, int[] values, int batchSize) {
for (int i = 0; i < batchSize; i++) {
timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
}
Expand All @@ -144,8 +144,8 @@ public void write(long[] timestamps, int[] values) {
/**
* write time series into encoder
*/
public void write(long[] timestamps, long[] values) {
for (int i = 0; i < timestamps.length; i++) {
public void write(long[] timestamps, long[] values, int batchSize) {
for (int i = 0; i < batchSize; i++) {
timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
}
Expand All @@ -154,8 +154,8 @@ public void write(long[] timestamps, long[] values) {
/**
* write time series into encoder
*/
public void write(long[] timestamps, float[] values) {
for (int i = 0; i < timestamps.length; i++) {
public void write(long[] timestamps, float[] values, int batchSize) {
for (int i = 0; i < batchSize; i++) {
timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
}
Expand All @@ -164,8 +164,8 @@ public void write(long[] timestamps, float[] values) {
/**
* write time series into encoder
*/
public void write(long[] timestamps, double[] values) {
for (int i = 0; i < timestamps.length; i++) {
public void write(long[] timestamps, double[] values, int batchSize) {
for (int i = 0; i < batchSize; i++) {
timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
}
Expand All @@ -174,8 +174,8 @@ public void write(long[] timestamps, double[] values) {
/**
* write time series into encoder
*/
public void write(long[] timestamps, BigDecimal[] values) {
for (int i = 0; i < timestamps.length; i++) {
public void write(long[] timestamps, BigDecimal[] values, int batchSize) {
for (int i = 0; i < batchSize; i++) {
timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
}
Expand All @@ -184,8 +184,8 @@ public void write(long[] timestamps, BigDecimal[] values) {
/**
* write time series into encoder
*/
public void write(long[] timestamps, Binary[] values) {
for (int i = 0; i < timestamps.length; i++) {
public void write(long[] timestamps, Binary[] values, int batchSize) {
for (int i = 0; i < batchSize; i++) {
timeEncoder.encode(timestamps[i], timeOut);
valueEncoder.encode(values[i], valueOut);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ private void writeDataByRowBatch()
FileSchema fileSchema = new FileSchema();
fileSchema.registerMeasurement(
new MeasurementSchema("sensor_1", TSDataType.INT64, TSEncoding.TS_2DIFF));
int rowNum = 1024 * 1024;
int rowNum = 1024 * 1024 * 13 + 1023;
int sensorNum = 1;
TsFileWriter tsFileWriter = new TsFileWriter(f, fileSchema);
RowBatch rowBatch = fileSchema.createRowBatch("device_1");
Expand Down