Skip to content

Commit

Permalink
Merge 953fe1c into 089da8a
Browse files Browse the repository at this point in the history
  • Loading branch information
jt2594838 committed Jan 2, 2020
2 parents 089da8a + 953fe1c commit 69192d9
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 45 deletions.
Expand Up @@ -23,9 +23,11 @@

import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.engine.cache.DeviceMetaDataCache;
Expand Down Expand Up @@ -150,6 +152,7 @@ private void moveMergedToOld(TsFileResource seqFile) throws IOException {
}
oldFileWriter.endFile(new Schema(newFileWriter.getKnownSchema()));

updateHistoricalVersions(seqFile);
seqFile.serialize();
mergeLogger.logFileMergeEnd();
logger.debug("{} moved merged chunks of {} to the old file", taskName, seqFile);
Expand All @@ -167,6 +170,20 @@ private void moveMergedToOld(TsFileResource seqFile) throws IOException {
}
}

/**
 * Folds the version history of every unsequence file that joined this merge into the
 * historical-version set of the merged sequence file {@code seqFile}.
 *
 * <p>The merged file contains data originating from several source files, so every
 * contributing version is tracked in it; this allows data to be compared across
 * different IoTDB instances that share the same file generation policy. Because
 * unsequence data is interleaved during the merge, there is no way to tell which
 * unseq files actually contributed to this particular target, so all of them are
 * conservatively recorded.
 *
 * @param seqFile the merged sequence file whose historical versions are updated in place
 */
private void updateHistoricalVersions(TsFileResource seqFile) {
  Set<Long> combinedVersions = new HashSet<>(seqFile.getHistoricalVersions());
  for (TsFileResource unseqFile : resource.getUnseqFiles()) {
    combinedVersions.addAll(unseqFile.getHistoricalVersions());
  }
  seqFile.setHistoricalVersions(combinedVersions);
}

private void writeMergedChunkGroup(ChunkGroupMetaData chunkGroupMetaData,
TsFileSequenceReader reader, TsFileIOWriter fileWriter)
throws IOException {
Expand Down Expand Up @@ -213,6 +230,7 @@ private void moveUnmergedToNew(TsFileResource seqFile) throws IOException {

fileWriter.endFile(new Schema(fileWriter.getKnownSchema()));

updateHistoricalVersions(seqFile);
seqFile.serialize();
mergeLogger.logFileMergeEnd();
logger.debug("{} moved unmerged chunks of {} to the new file", taskName, seqFile);
Expand Down
Expand Up @@ -185,6 +185,10 @@ public class StorageGroupProcessor {

private FSFactory fsFactory = FSFactoryProducer.getFSFactory();

// allDirectFileVersions records the versions of the direct TsFiles (generated by flush), not
// including the files generated by merge
private Set<Long> allDirectFileVersions = new HashSet<>();

public StorageGroupProcessor(String systemInfoDir, String storageGroupName)
throws StorageGroupProcessorException {
this.storageGroupName = storageGroupName;
Expand Down Expand Up @@ -223,6 +227,13 @@ private void recover() throws StorageGroupProcessorException {
recoverSeqFiles(seqTsFiles);
recoverUnseqFiles(unseqTsFiles);

for (TsFileResource resource : seqTsFiles) {
allDirectFileVersions.addAll(resource.getHistoricalVersions());
}
for (TsFileResource resource : unseqTsFiles) {
allDirectFileVersions.addAll(resource.getHistoricalVersions());
}

String taskName = storageGroupName + "-" + System.currentTimeMillis();
File mergingMods = SystemFileFactory.INSTANCE.getFile(storageGroupSysDir,
MERGING_MODIFICATION_FILE_NAME);
Expand Down Expand Up @@ -526,6 +537,7 @@ private TsFileProcessor createTsFileProcessor(boolean sequence)
String filePath = baseDir + File.separator + storageGroupName + File.separator +
System.currentTimeMillis() + IoTDBConstant.TSFILE_NAME_SEPARATOR + versionController
.nextVersion() + IoTDBConstant.TSFILE_NAME_SEPARATOR + "0" + TSFILE_SUFFIX;
allDirectFileVersions.add(versionController.currVersion());

if (sequence) {
return new TsFileProcessor(storageGroupName, fsFactory.getFile(filePath),
Expand Down Expand Up @@ -1257,6 +1269,7 @@ public void loadNewTsFile(TsFileResource newTsFileResource)

// update latest time map
updateLatestTimeMap(newTsFileResource);
allDirectFileVersions.addAll(newTsFileResource.getHistoricalVersions());
} catch (DiskSpaceInsufficientException e) {
logger.error(
"Failed to append the tsfile {} to storage group processor {} because the disk space is insufficient.",
Expand Down
Expand Up @@ -48,7 +48,6 @@
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.constant.DatetimeUtils;
import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
Expand Down Expand Up @@ -131,6 +130,9 @@ public class TsFileProcessor {
this.updateLatestFlushTimeCallback = updateLatestFlushTimeCallback;
this.sequence = sequence;
logger.info("create a new tsfile processor {}", tsfile.getAbsolutePath());

// a file generated by flush has only one historical version, which is itself
this.tsFileResource.setHistoricalVersions(Collections.singleton(versionController.currVersion()));
}

/**
Expand Down
Expand Up @@ -22,14 +22,18 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.upgrade.UpgradeTask;
Expand Down Expand Up @@ -66,6 +70,12 @@ public class TsFileResource {
private volatile boolean deleted = false;
private volatile boolean isMerging = false;

// historicalVersions are used to track the merge history of a TsFile. For a TsFile generated
// by flush, this field only contains its own version number. For a TsFile generated by merge,
// its historicalVersions are the union of all TsFiles' historicalVersions that joined this merge.
// This field helps us compare the files that are generated by different IoTDBs that share the
// same file generation policy but have their own merge policies.
private Set<Long> historicalVersions;

/**
* Chunk metadata list of unsealed tsfile. Only be set in a temporal TsFileResource in a query
Expand Down Expand Up @@ -96,15 +106,6 @@ public TsFileResource(File file, TsFileProcessor processor) {
this.processor = processor;
}

/**
 * Constructs a sealed TsFileResource from an existing file and its pre-computed
 * per-device time ranges.
 *
 * @param file the underlying TsFile on disk
 * @param startTimeMap earliest data timestamp recorded for each device in the file
 * @param endTimeMap latest data timestamp recorded for each device in the file
 */
public TsFileResource(File file,
Map<String, Long> startTimeMap,
Map<String, Long> endTimeMap) {
this.file = file;
this.startTimeMap = startTimeMap;
this.endTimeMap = endTimeMap;
// the time maps are supplied up front, so this resource is treated as already closed
this.closed = true;
}

/**
* unsealed TsFile
*/
Expand Down Expand Up @@ -133,6 +134,13 @@ public void serialize() throws IOException {
ReadWriteIOUtils.write(entry.getKey(), outputStream);
ReadWriteIOUtils.write(entry.getValue(), outputStream);
}

if (historicalVersions != null) {
ReadWriteIOUtils.write(this.historicalVersions.size(), outputStream);
for (Long historicalVersion : historicalVersions) {
ReadWriteIOUtils.write(historicalVersion, outputStream);
}
}
}
File src = fsFactory.getFile(file + RESOURCE_SUFFIX + TEMP_SUFFIX);
File dest = fsFactory.getFile(file + RESOURCE_SUFFIX);
Expand All @@ -159,6 +167,18 @@ public void deSerialize() throws IOException {
}
this.startTimeMap = startTimes;
this.endTimeMap = endTimes;

if (inputStream.available() > 0) {
int versionSize = ReadWriteIOUtils.readInt(inputStream);
historicalVersions = new HashSet<>();
for (int i = 0; i < versionSize; i++) {
historicalVersions.add(ReadWriteIOUtils.readLong(inputStream));
}
} else {
// use the version in file name as the historical version for files of old versions
long version = Long.parseLong(file.getName().split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[1]);
historicalVersions = Collections.singleton(version);
}
}
}

Expand All @@ -180,7 +200,7 @@ public boolean fileExists() {
return fsFactory.getFile(file + RESOURCE_SUFFIX).exists();
}

public void forceUpdateEndTime(String device, long time) {
void forceUpdateEndTime(String device, long time) {
endTimeMap.put(device, time);
}

Expand All @@ -203,7 +223,7 @@ public void setFile(File file) {
this.file = file;
}

public boolean containsDevice(String deviceId) {
boolean containsDevice(String deviceId) {
return startTimeMap.containsKey(deviceId);
}

Expand Down Expand Up @@ -237,15 +257,15 @@ public void close() throws IOException {
chunkMetaDataList = null;
}

public TsFileProcessor getUnsealedFileProcessor() {
TsFileProcessor getUnsealedFileProcessor() {
return processor;
}

/**
 * Returns the read-write lock used to coordinate writes and queries on this resource.
 */
public ReentrantReadWriteLock getWriteQueryLock() {
return writeQueryLock;
}

public void doUpgrade() {
void doUpgrade() {
if (UpgradeUtils.isNeedUpgrade(this)) {
UpgradeSevice.getINSTANCE().submitUpgradeTask(new UpgradeTask(this));
}
Expand All @@ -262,7 +282,7 @@ public void remove() {
fsFactory.getFile(file.getPath() + ModificationFile.FILE_SUFFIX).delete();
}

public void moveTo(File targetDir) throws IOException {
void moveTo(File targetDir) throws IOException {
FileUtils.moveFile(file, new File(targetDir, file.getName()));
FileUtils.moveFile(fsFactory.getFile(file.getPath() + RESOURCE_SUFFIX),
new File(targetDir, file.getName() + RESOURCE_SUFFIX));
Expand Down Expand Up @@ -303,7 +323,7 @@ public void setDeleted(boolean deleted) {
this.deleted = deleted;
}

public boolean isMerging() {
boolean isMerging() {
return isMerging;
}

Expand All @@ -327,4 +347,12 @@ public boolean stillLives(long timeLowerBound) {
}
return false;
}

/**
 * Returns the version numbers of all TsFiles whose data this file contains
 * (a single version for a flushed file, the union of all participants for a
 * merged file). May be null if the versions have not been set or deserialized.
 */
public Set<Long> getHistoricalVersions() {
return historicalVersions;
}

/**
 * Sets the historical versions of this file. The set is stored as-is (not copied),
 * so callers should not mutate it afterwards.
 *
 * @param historicalVersions versions of all TsFiles whose data this file contains
 */
public void setHistoricalVersions(Set<Long> historicalVersions) {
this.historicalVersions = historicalVersions;
}
}
Expand Up @@ -56,8 +56,14 @@ public static void printResource(String filename) throws IOException {
System.err.println(String.format("analyzing %s ...", filename));
resource.deSerialize();

System.out.println("historicalVersions: " + resource.getHistoricalVersions());

for (String device : resource.getStartTimeMap().keySet()) {
System.out.println(String.format("device %s, start time %d (%s), end time %d (%s)", device,
System.out.println(String.format(
"device %s, "
+ "start time %d (%s), "
+ "end time %d (%s)",
device,
resource.getStartTimeMap().get(device),
DatetimeUtils.convertMillsecondToZonedDateTime(resource.getStartTimeMap().get(device)),
resource.getEndTimeMap().get(device),
Expand Down
31 changes: 18 additions & 13 deletions server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
Expand Up @@ -39,24 +39,29 @@ public static void checkTsFileResource(TsFileResource tsFileResource) throws IOE
try (TsFileSequenceReader reader = new TsFileSequenceReader(
tsFileResource.getFile().getAbsolutePath())) {
TsFileMetaData metaData = reader.readFileMetadata();
for (TsDeviceMetadataIndex index : metaData.getDeviceMap().values()) {
TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(index);
List<ChunkGroupMetaData> chunkGroupMetaDataList = deviceMetadata
.getChunkGroupMetaDataList();
for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetaDataList) {
for (ChunkMetaData chunkMetaData : chunkGroupMetaData.getChunkMetaDataList()) {
tsFileResource.updateStartTime(chunkGroupMetaData.getDeviceID(),
chunkMetaData.getStartTime());
tsFileResource
.updateEndTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getEndTime());
}
}
}
updateTsFileResource(metaData, reader, tsFileResource);
}
// write .resource file
tsFileResource.serialize();
} else {
tsFileResource.deSerialize();
}
}

/**
 * Populates the per-device start/end time maps of {@code tsFileResource} by scanning
 * the time range of every chunk recorded in the file's metadata.
 *
 * @param metaData file-level metadata listing the devices contained in the file
 * @param reader reader used to load each device's metadata section
 * @param tsFileResource resource whose time maps are updated in place
 * @throws IOException if reading the device metadata fails
 */
public static void updateTsFileResource(TsFileMetaData metaData, TsFileSequenceReader reader,
    TsFileResource tsFileResource) throws IOException {
  for (TsDeviceMetadataIndex deviceIndex : metaData.getDeviceMap().values()) {
    TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(deviceIndex);
    for (ChunkGroupMetaData groupMetaData : deviceMetadata.getChunkGroupMetaDataList()) {
      String deviceId = groupMetaData.getDeviceID();
      for (ChunkMetaData chunkMetaData : groupMetaData.getChunkMetaDataList()) {
        // widen the resource's known time range for this device to cover the chunk
        tsFileResource.updateStartTime(deviceId, chunkMetaData.getStartTime());
        tsFileResource.updateEndTime(deviceId, chunkMetaData.getEndTime());
      }
    }
  }
}
}
Expand Up @@ -24,15 +24,18 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.storageGroup.StorageGroupProcessorException;
import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
Expand Down Expand Up @@ -107,23 +110,12 @@ public void recover() throws StorageGroupProcessorException {
try (TsFileSequenceReader reader = new TsFileSequenceReader(
tsFileResource.getFile().getAbsolutePath())) {
TsFileMetaData metaData = reader.readFileMetadata();
List<TsDeviceMetadataIndex> deviceMetadataIndexList = new ArrayList<>(
metaData.getDeviceMap().values());
for (TsDeviceMetadataIndex index : deviceMetadataIndexList) {
TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(index);
List<ChunkGroupMetaData> chunkGroupMetaDataList = deviceMetadata
.getChunkGroupMetaDataList();
for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetaDataList) {
for (ChunkMetaData chunkMetaData : chunkGroupMetaData.getChunkMetaDataList()) {
tsFileResource.updateStartTime(chunkGroupMetaData.getDeviceID(),
chunkMetaData.getStartTime());
tsFileResource
.updateEndTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getEndTime());
}
}
}
FileLoaderUtils.updateTsFileResource(metaData, reader, tsFileResource);
}
// write .resource file
long fileVersion =
Long.parseLong(tsFileResource.getFile().getName().split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[1]);
tsFileResource.setHistoricalVersions(Collections.singleton(fileVersion));
tsFileResource.serialize();
}
return;
Expand Down
Expand Up @@ -74,6 +74,7 @@ void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WritePro
+ i + IoTDBConstant.TSFILE_NAME_SEPARATOR + 0
+ ".tsfile"));
TsFileResource tsFileResource = new TsFileResource(file);
tsFileResource.setHistoricalVersions(Collections.singleton((long) i));
seqResources.add(tsFileResource);
prepareFile(tsFileResource, i * ptNum, ptNum, 0);
}
Expand All @@ -83,6 +84,7 @@ void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WritePro
+ i + IoTDBConstant.TSFILE_NAME_SEPARATOR + 0
+ ".tsfile"));
TsFileResource tsFileResource = new TsFileResource(file);
tsFileResource.setHistoricalVersions(Collections.singleton((long) (i + seqFileNum)));
unseqResources.add(tsFileResource);
prepareUnseqFile(tsFileResource, i * ptNum, ptNum * (i + 1) / unseqFileNum, 10000);
}
Expand All @@ -91,6 +93,7 @@ void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WritePro
+ IoTDBConstant.TSFILE_NAME_SEPARATOR + unseqFileNum + IoTDBConstant.TSFILE_NAME_SEPARATOR + 0
+ ".tsfile"));
TsFileResource tsFileResource = new TsFileResource(file);
tsFileResource.setHistoricalVersions(Collections.singleton((long) (seqFileNum + unseqFileNum)));
unseqResources.add(tsFileResource);
prepareUnseqFile(tsFileResource, 0, ptNum * unseqFileNum, 20000);
}
Expand Down

0 comments on commit 69192d9

Please sign in to comment.