Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ public void start() throws StartupException {
service = IoTDBThreadPoolFactory.newScheduledThreadPool(
2, ThreadName.TIME_COST_STATSTIC.getName());
}
//we have to check again because someone may channge the value.
//we have to check again because someone may change the value.
isEnableStat = IoTDBDescriptor.getInstance().getConfig().isEnablePerformanceStat();
if (isEnableStat) {
consumeFuture = service.schedule(new QueueConsumerThread(), 0, TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public StorageGroupProcessor(String systemInfoDir, String storageGroupName)
logger.info("Storage Group system Directory {} doesn't exist, create it",
storageGroupSysDir.getPath());
} else if (!storageGroupSysDir.exists()) {
logger.error("craete Storage Group system Directory {} failed",
logger.error("create Storage Group system Directory {} failed",
storageGroupSysDir.getPath());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,4 @@ public void close() throws IOException {
public TsFileProcessor getUnsealedFileProcessor() {
return processor;
}

public void updateTime(String deviceId, long time) {
startTimeMap.putIfAbsent(deviceId, time);
Long endTime = endTimeMap.get(deviceId);
if (endTime == null || endTime < time) {
endTimeMap.put(deviceId, time);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
Expand Down Expand Up @@ -103,8 +103,8 @@ public void replayLogs() throws ProcessorException {
} finally {
logReader.close();
}
tempStartTimeMap.forEach((k, v) -> currentTsFileResource.updateTime(k, v));
tempEndTimeMap.forEach((k, v) -> currentTsFileResource.updateTime(k, v));
tempStartTimeMap.forEach((k, v) -> currentTsFileResource.updateStartTime(k, v));
tempEndTimeMap.forEach((k, v) -> currentTsFileResource.updateEndTime(k, v));
}

private void replayDelete(DeletePlan deletePlan) throws IOException {
Expand All @@ -123,7 +123,10 @@ private void replayInsert(InsertPlan insertPlan) {
!acceptDuplication) {
return;
}
tempStartTimeMap.putIfAbsent(insertPlan.getDeviceId(), insertPlan.getTime());
Long startTime = tempStartTimeMap.get(insertPlan.getDeviceId());
if (startTime == null || startTime > insertPlan.getTime()) {
tempStartTimeMap.put(insertPlan.getDeviceId(), insertPlan.getTime());
}
Long endTime = tempEndTimeMap.get(insertPlan.getDeviceId());
if (endTime == null || endTime < insertPlan.getTime()) {
tempEndTimeMap.put(insertPlan.getDeviceId(), insertPlan.getTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.version.VersionController;
Expand All @@ -45,10 +45,11 @@
import org.slf4j.LoggerFactory;

/**
* TsFileRecoverPerformer recovers a SeqTsFile to correct status, redoes the WALs since last
* crash and removes the redone logs.
* TsFileRecoverPerformer recovers a SeqTsFile to correct status, redoes the WALs since last crash
* and removes the redone logs.
*/
public class TsFileRecoverPerformer {

private static final Logger logger = LoggerFactory.getLogger(TsFileRecoverPerformer.class);

private String insertFilePath;
Expand All @@ -72,10 +73,7 @@ public TsFileRecoverPerformer(String logNodePrefix,

/**
* 1. recover the TsFile by RestorableTsFileIOWriter and truncate the file to remaining corrected
* data
* 2. redo the WALs to recover unpersisted data
* 3. flush and close the file
* 4. clean WALs
* data 2. redo the WALs to recover unpersisted data 3. flush and close the file 4. clean WALs
*/
public void recover() throws ProcessorException {
IMemTable recoverMemTable = new PrimitiveMemTable();
Expand Down Expand Up @@ -103,17 +101,21 @@ public void recover() throws ProcessorException {
tsFileResource.deSerialize();
} else {
// .resource file does not exist, read file metadata and recover tsfile resource
try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFileResource.getFile().getAbsolutePath())) {
try (TsFileSequenceReader reader = new TsFileSequenceReader(
tsFileResource.getFile().getAbsolutePath())) {
TsFileMetaData metaData = reader.readFileMetadata();
List<TsDeviceMetadataIndex> deviceMetadataIndexList = new ArrayList<>(
metaData.getDeviceMap().values());
for (TsDeviceMetadataIndex index : deviceMetadataIndexList) {
TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(index);
List<ChunkGroupMetaData> chunkGroupMetaDataList = deviceMetadata.getChunkGroupMetaDataList();
List<ChunkGroupMetaData> chunkGroupMetaDataList = deviceMetadata
.getChunkGroupMetaDataList();
for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetaDataList) {
for (ChunkMetaData chunkMetaData : chunkGroupMetaData.getChunkMetaDataList()) {
tsFileResource.updateTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getStartTime());
tsFileResource.updateTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getEndTime());
tsFileResource.updateStartTime(chunkGroupMetaData.getDeviceID(),
chunkMetaData.getStartTime());
tsFileResource
.updateEndTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getEndTime());
}
}
}
Expand All @@ -132,8 +134,10 @@ public void recover() throws ProcessorException {
for (ChunkGroupMetaData chunkGroupMetaData : restorableTsFileIOWriter
.getChunkGroupMetaDatas()) {
for (ChunkMetaData chunkMetaData : chunkGroupMetaData.getChunkMetaDataList()) {
tsFileResource.updateTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getStartTime());
tsFileResource.updateTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getEndTime());
tsFileResource.updateStartTime(chunkGroupMetaData.getDeviceID(),
chunkMetaData.getStartTime());
tsFileResource
.updateEndTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getEndTime());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
Expand Down Expand Up @@ -73,18 +73,21 @@ public long currVersion() {

try {
for (int i = 0; i < 5; i++) {
schema.registerMeasurement(new MeasurementSchema("sensor" + i, TSDataType.INT64, TSEncoding.PLAIN));
schema.registerMeasurement(
new MeasurementSchema("sensor" + i, TSDataType.INT64, TSEncoding.PLAIN));
}

LogReplayer replayer = new LogReplayer(logNodePrefix, tsFile.getPath(), modFile,
versionController, tsFileResource, schema, memTable, true);

WriteLogNode node =
MultiFileLogNodeManager.getInstance().getNode(logNodePrefix + tsFile.getName());
for (int i = 0; i < 5; i++) {
node.write(new InsertPlan("device0", 100, "sensor0", String.valueOf(0)));
node.write(new InsertPlan("device0", 2, "sensor1", String.valueOf(0)));
for (int i = 1; i < 5; i++) {
node.write(new InsertPlan("device" + i, i, "sensor" + i, String.valueOf(i)));
}
DeletePlan deletePlan = new DeletePlan(3, new Path("device0", "sensor0"));
DeletePlan deletePlan = new DeletePlan(200, new Path("device0", "sensor0"));
node.write(deletePlan);
node.close();

Expand All @@ -107,11 +110,13 @@ public long currVersion() {

Modification[] mods = modFile.getModifications().toArray(new Modification[0]);
assertEquals(1, mods.length);
assertEquals(new Deletion(new Path("device0", "sensor0"), 5, 3), mods[0]);
assertEquals(new Deletion(new Path("device0", "sensor0"), 5, 200), mods[0]);

for (int i = 0; i < 5; i++) {
assertEquals(i, (long)tsFileResource.getStartTimeMap().get("device" + i));
assertEquals(i, (long)tsFileResource.getEndTimeMap().get("device" + i));
assertEquals(2, (long) tsFileResource.getStartTimeMap().get("device0"));
assertEquals(100, (long) tsFileResource.getEndTimeMap().get("device0"));
for (int i = 1; i < 5; i++) {
assertEquals(i, (long) tsFileResource.getStartTimeMap().get("device" + i));
assertEquals(i, (long) tsFileResource.getEndTimeMap().get("device" + i));
}
} finally {
modFile.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@
import org.apache.iotdb.tsfile.write.schema.FileSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class SeqTsFileRecoverTest {

private File tsF;
private TsFileWriter writer;
private WriteLogNode node;
Expand All @@ -60,6 +62,7 @@ public class SeqTsFileRecoverTest {
private TsFileResource resource;
private VersionController versionController = new VersionController() {
private int i;

@Override
public long nextVersion() {
return ++i;
Expand All @@ -83,9 +86,16 @@ public void setup() throws IOException, WriteProcessException {
}
writer = new TsFileWriter(tsF, schema);

TSRecord tsRecord = new TSRecord(100, "device99");
tsRecord.addTuple(DataPoint.getDataPoint(TSDataType.INT64, "sensor4", String.valueOf(0)));
writer.write(tsRecord);
tsRecord = new TSRecord(2, "device99");
tsRecord.addTuple(DataPoint.getDataPoint(TSDataType.INT64, "sensor1", String.valueOf(0)));
writer.write(tsRecord);

for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
TSRecord tsRecord = new TSRecord(i, "device" + j);
tsRecord = new TSRecord(i, "device" + j);
for (int k = 0; k < 10; k++) {
tsRecord.addTuple(DataPoint.getDataPoint(TSDataType.INT64, "sensor" + k,
String.valueOf(k)));
Expand Down Expand Up @@ -126,6 +136,13 @@ public void test() throws ProcessorException, IOException {
versionController, resource, true);
performer.recover();

assertEquals(2, (long) resource.getStartTimeMap().get("device99"));
assertEquals(100, (long) resource.getEndTimeMap().get("device99"));
for (int i = 0; i < 10; i++) {
assertEquals(0, (long) resource.getStartTimeMap().get("device" + i));
assertEquals(19, (long) resource.getEndTimeMap().get("device" + i));
}

ReadOnlyTsFile readOnlyTsFile = new ReadOnlyTsFile(new TsFileSequenceReader(tsF.getPath()));
List<Path> pathList = new ArrayList<>();
for (int j = 0; j < 10; j++) {
Expand All @@ -144,6 +161,19 @@ public void test() throws ProcessorException, IOException {
assertEquals(j % 10, fields.get(j).getLongV());
}
}

pathList = new ArrayList<>();
pathList.add(new Path("device99", "sensor1"));
pathList.add(new Path("device99", "sensor4"));
queryExpression = QueryExpression.create(pathList, null);
dataSet = readOnlyTsFile.query(queryExpression);
Assert.assertTrue(dataSet.hasNext());
RowRecord record = dataSet.next();
Assert.assertEquals("2\t0\tnull", record.toString());
Assert.assertTrue(dataSet.hasNext());
record = dataSet.next();
Assert.assertEquals("100\tnull\t0", record.toString());

readOnlyTsFile.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.junit.Test;

public class UnseqTsFileRecoverTest {

private File tsF;
private TsFileWriter writer;
private WriteLogNode node;
Expand All @@ -64,6 +65,7 @@ public class UnseqTsFileRecoverTest {
private TsFileResource resource;
private VersionController versionController = new VersionController() {
private int i;

@Override
public long nextVersion() {
return ++i;
Expand All @@ -87,16 +89,24 @@ public void setup() throws IOException, WriteProcessException {
}
writer = new TsFileWriter(tsF, schema);

TSRecord tsRecord = new TSRecord(100, "device99");
tsRecord.addTuple(DataPoint.getDataPoint(TSDataType.INT64, "sensor4", String.valueOf(0)));
writer.write(tsRecord);
tsRecord = new TSRecord(2, "device99");
tsRecord.addTuple(DataPoint.getDataPoint(TSDataType.INT64, "sensor1", String.valueOf(0)));
writer.write(tsRecord);

for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
TSRecord tsRecord = new TSRecord(i, "device" + j);
tsRecord = new TSRecord(i, "device" + j);
for (int k = 0; k < 10; k++) {
tsRecord.addTuple(DataPoint.getDataPoint(TSDataType.INT64, "sensor" + k,
String.valueOf(k)));
}
writer.write(tsRecord);
}
}

writer.flushForTest();
writer.getIOWriter().close();

Expand All @@ -114,6 +124,12 @@ public void setup() throws IOException, WriteProcessException {
}
node.notifyStartFlush();
}
InsertPlan insertPlan = new InsertPlan("device99", 1, "sensor4", "4");
node.write(insertPlan);
insertPlan = new InsertPlan("device99", 300, "sensor2", "2");
node.write(insertPlan);
node.close();

resource = new TsFileResource(tsF);
}

Expand All @@ -129,6 +145,13 @@ public void test() throws ProcessorException, IOException {
versionController, resource, true);
performer.recover();

assertEquals(1, (long) resource.getStartTimeMap().get("device99"));
assertEquals(300, (long) resource.getEndTimeMap().get("device99"));
for (int i = 0; i < 10; i++) {
assertEquals(0, (long) resource.getStartTimeMap().get("device" + i));
assertEquals(9, (long) resource.getEndTimeMap().get("device" + i));
}

TsFileSequenceReader fileReader = new TsFileSequenceReader(tsF.getPath(), true);
MetadataQuerier metadataQuerier = new MetadataQuerierByFileImpl(fileReader);
ChunkLoader chunkLoader = new ChunkLoaderImpl(fileReader);
Expand All @@ -150,7 +173,6 @@ public void test() throws ProcessorException, IOException {
assertEquals(i, timeValuePair.getTimestamp());
assertEquals(11, timeValuePair.getValue().getLong());
unSeqMergeReader.next();

}
unSeqMergeReader.close();
fileReader.close();
Expand Down