Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -451,10 +451,7 @@ private static void insertTabletWithNullValues()
Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList, 100);

// Method 1 to add tablet data
tablet.bitMaps = new BitMap[schemaList.size()];
for (int s = 0; s < 3; s++) {
tablet.bitMaps[s] = new BitMap(tablet.getMaxRowNumber());
}
tablet.initBitMaps();

long timestamp = System.currentTimeMillis();
for (long row = 0; row < 100; row++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
private static final Logger LOG = LoggerFactory.getLogger(AlignedChunkGroupWriterImpl.class);
Expand Down Expand Up @@ -150,7 +155,7 @@ public int write(Tablet tablet) throws WriteProcessException, IOException {
// check isNull by bitMap in tablet
if (tablet.bitMaps != null
&& tablet.bitMaps[columnIndex] != null
&& !tablet.bitMaps[columnIndex].isMarked(row)) {
&& tablet.bitMaps[columnIndex].isMarked(row)) {
isNull = true;
}
ValueChunkWriter valueChunkWriter =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ public int write(Tablet tablet) throws WriteProcessException {
long time = tablet.timestamps[row];
boolean hasOneColumnWritten = false;
for (int column = 0; column < timeseries.size(); column++) {
// check isNull in tablet
if (tablet.bitMaps != null
&& tablet.bitMaps[column] != null
&& tablet.bitMaps[column].isMarked(row)) {
continue;
}
String measurementId = timeseries.get(column).getMeasurementId();
checkIsHistoryData(measurementId, time);
hasOneColumnWritten = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,19 @@ public long estimateMaxSeriesMemSize() {

public long getCurrentChunkSize() {
/**
* It may happen if pageBuffer stores empty bits and subsequent write operations are all out of
* order, then count of statistics in this chunk will be 0 and this chunk will not be flushed.
* It may happen if subsequent write operations are all out of order, then count of statistics
* in this chunk will be 0 and this chunk will not be flushed.
*/
if (pageBuffer.size() == 0 || statistics.getCount() == 0) {
if (pageBuffer.size() == 0) {
return 0;
}

// Empty chunk, it may happen if pageBuffer stores empty bits and only chunk header will be
// flushed.
if (statistics.getCount() == 0) {
return ChunkHeader.getSerializedSize(measurementId, pageBuffer.size());
}

// return the serialized size of the chunk header + all pages
return ChunkHeader.getSerializedSize(measurementId, pageBuffer.size())
+ (long) pageBuffer.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ public void setSchemas(List<MeasurementSchema> schemas) {
this.schemas = schemas;
}

public void initBitMaps() {
this.bitMaps = new BitMap[schemas.size()];
for (int column = 0; column < schemas.size(); column++) {
this.bitMaps[column] = new BitMap(getMaxRowNumber());
}
}

public void addTimestamp(int rowIndex, long timestamp) {
timestamps[rowIndex] = timestamp;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.TsFileGeneratorUtils;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;

import org.junit.After;
Expand All @@ -42,6 +44,9 @@ public class TsFileWriteApiTest {
private final String deviceId = "root.sg.d1";
private final List<MeasurementSchema> alignedMeasurementSchemas = new ArrayList<>();
private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
private int oldChunkGroupSize = TSFileDescriptor.getInstance().getConfig().getGroupSizeInByte();
private int oldMaxNumOfPointsInPage =
TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();

@Before
public void setUp() {
Expand All @@ -53,6 +58,8 @@ public void setUp() {
@After
public void end() {
if (f.exists()) f.delete();
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(oldMaxNumOfPointsInPage);
TSFileDescriptor.getInstance().getConfig().setGroupSizeInByte(oldChunkGroupSize);
}

private void setEnv(int chunkGroupSize, int pageSize) {
Expand Down Expand Up @@ -324,4 +331,96 @@ public void writeOutOfOrderData() throws IOException, WriteProcessException {
}
}
}

@Test
public void writeNonAlignedWithTabletWithNullValue() {
setEnv(100, 30);
try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
measurementSchemas.add(new MeasurementSchema("s1", TSDataType.TEXT, TSEncoding.PLAIN));
measurementSchemas.add(new MeasurementSchema("s2", TSDataType.TEXT, TSEncoding.PLAIN));
measurementSchemas.add(new MeasurementSchema("s3", TSDataType.TEXT, TSEncoding.PLAIN));

// register nonAligned timeseries
tsFileWriter.registerTimeseries(new Path(deviceId), measurementSchemas);

Tablet tablet = new Tablet(deviceId, measurementSchemas);
long[] timestamps = tablet.timestamps;
Object[] values = tablet.values;
tablet.initBitMaps();
long sensorNum = measurementSchemas.size();
long startTime = 0;
for (long r = 0; r < 10000; r++) {
int row = tablet.rowSize++;
timestamps[row] = startTime++;
for (int i = 0; i < sensorNum; i++) {
if (i == 1 && r > 1000) {
tablet.bitMaps[i].mark((int) r % tablet.getMaxRowNumber());
continue;
}
Binary[] textSensor = (Binary[]) values[i];
textSensor[row] = new Binary("testString.........");
}
// write
if (tablet.rowSize == tablet.getMaxRowNumber()) {
tsFileWriter.write(tablet);
tablet.reset();
}
}
// write
if (tablet.rowSize != 0) {
tsFileWriter.write(tablet);
tablet.reset();
}

} catch (Throwable e) {
e.printStackTrace();
Assert.fail("Meet errors in test: " + e.getMessage());
}
}

@Test
public void writeAlignedWithTabletWithNullValue() {
setEnv(100, 30);
try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
measurementSchemas.add(new MeasurementSchema("s1", TSDataType.TEXT, TSEncoding.PLAIN));
measurementSchemas.add(new MeasurementSchema("s2", TSDataType.TEXT, TSEncoding.PLAIN));
measurementSchemas.add(new MeasurementSchema("s3", TSDataType.TEXT, TSEncoding.PLAIN));

// register aligned timeseries
tsFileWriter.registerAlignedTimeseries(new Path(deviceId), measurementSchemas);

Tablet tablet = new Tablet(deviceId, measurementSchemas);
long[] timestamps = tablet.timestamps;
Object[] values = tablet.values;
tablet.initBitMaps();
long sensorNum = measurementSchemas.size();
long startTime = 0;
for (long r = 0; r < 10000; r++) {
int row = tablet.rowSize++;
timestamps[row] = startTime++;
for (int i = 0; i < sensorNum; i++) {
if (i == 1 && r > 1000) {
tablet.bitMaps[i].mark((int) r % tablet.getMaxRowNumber());
continue;
}
Binary[] textSensor = (Binary[]) values[i];
textSensor[row] = new Binary("testString.........");
}
// write
if (tablet.rowSize == tablet.getMaxRowNumber()) {
tsFileWriter.writeAligned(tablet);
tablet.reset();
}
}
// write
if (tablet.rowSize != 0) {
tsFileWriter.writeAligned(tablet);
tablet.reset();
}

} catch (Throwable e) {
e.printStackTrace();
Assert.fail("Meet errors in test: " + e.getMessage());
}
}
}