Skip to content

Commit

Permalink
Fix visible metadata, version, work processor bugs when recovering (#966
Browse files Browse the repository at this point in the history
)
  • Loading branch information
Jialin Qiao committed Apr 1, 2020
1 parent 053d6ab commit c6c23a3
Show file tree
Hide file tree
Showing 17 changed files with 342 additions and 220 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.iotdb.tsfile.write.record.RowBatch;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.Schema;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,8 +498,7 @@ public Map<String, Map<Long, List<TsFileResource>>> getAllClosedStorageGroupTsFi
if (!sequenceFile.isClosed()) {
continue;
}
String[] fileSplits = FilePathUtils.splitTsFilePath(sequenceFile);
long partitionNum = Long.parseLong(fileSplits[fileSplits.length - 2]);
long partitionNum = sequenceFile.getTimePartition();
Map<Long, List<TsFileResource>> storageGroupFiles = ret.computeIfAbsent(entry.getKey()
, n -> new HashMap<>());
storageGroupFiles.computeIfAbsent(partitionNum, n -> new ArrayList<>()).add(sequenceFile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,8 @@
import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.exception.LoadEmptyFileException;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.exception.MergeException;
import org.apache.iotdb.db.exception.PartitionViolationException;
import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.WriteProcessException;
Expand All @@ -81,7 +79,6 @@
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryFileManager;
import org.apache.iotdb.db.utils.CopyOnReadLinkedList;
import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer;
import org.apache.iotdb.rpc.RpcUtils;
Expand Down Expand Up @@ -262,17 +259,15 @@ private void recover() throws StorageGroupProcessorException {
if (resource.getFile().length() == 0) {
deleteTsfile(resource.getFile());
}
String[] filePathSplit = FilePathUtils.splitTsFilePath(resource);
long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length - 2]);
long partitionNum = resource.getTimePartition();
partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
}
for (TsFileResource resource : unseqTsFiles) {
//After recovery, in case the TsFile's length is equal to 0, delete both the TsFileResource and the file itself
if (resource.getFile().length() == 0) {
deleteTsfile(resource.getFile());
}
String[] filePathSplit = FilePathUtils.splitTsFilePath(resource);
long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length - 2]);
long partitionNum = resource.getTimePartition();
partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
}

Expand All @@ -296,30 +291,16 @@ private void recover() throws StorageGroupProcessorException {
}

for (TsFileResource resource : sequenceFileTreeSet) {
long timePartitionId = getTimePartitionFromTsFileResource(resource);
if (timePartitionId != -1) {
latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>())
.putAll(resource.getEndTimeMap());
partitionLatestFlushedTimeForEachDevice
.computeIfAbsent(timePartitionId, id -> new HashMap<>())
.putAll(resource.getEndTimeMap());
globalLatestFlushedTimeForEachDevice.putAll(resource.getEndTimeMap());
}
long timePartitionId = resource.getTimePartition();
latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>())
.putAll(resource.getEndTimeMap());
partitionLatestFlushedTimeForEachDevice
.computeIfAbsent(timePartitionId, id -> new HashMap<>())
.putAll(resource.getEndTimeMap());
globalLatestFlushedTimeForEachDevice.putAll(resource.getEndTimeMap());
}
}

/**
 * Derive the time partition id of a TsFileResource from its per-device start time map.
 *
 * @param resource the resource whose partition is wanted
 * @return the partition of the first available device start time, or -1 when the
 *         start time map is empty (i.e. the resource holds no data)
 */
private long getTimePartitionFromTsFileResource(TsFileResource resource) {
  // Take whichever device start time comes first; the loop returns on the
  // very first entry, mirroring "find any time of a device".
  for (Long anyDeviceStartTime : resource.getStartTimeMap().values()) {
    return StorageEngine.getTimePartition(anyDeviceStartTime);
  }
  // Empty start time map: the resource is empty, signal with -1.
  return -1;
}

/**
* get version controller by time partition Id Thread-safety should be ensure by caller
Expand Down Expand Up @@ -393,7 +374,7 @@ private void recoverSeqFiles(List<TsFileResource> tsFiles) throws StorageGroupPr
for (int i = 0; i < tsFiles.size(); i++) {
TsFileResource tsFileResource = tsFiles.get(i);
sequenceFileTreeSet.add(tsFileResource);
long timePartitionId = getTimePartitionFromTsFileResource(tsFileResource);
long timePartitionId = tsFileResource.getTimePartition();

TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-",
getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, false,
Expand All @@ -408,7 +389,7 @@ private void recoverSeqFiles(List<TsFileResource> tsFiles) throws StorageGroupPr
getVersionControllerByTimePartitionId(timePartitionId),
this::closeUnsealedTsFileProcessorCallBack,
this::updateLatestFlushTimeCallback, true, writer);
workUnsequenceTsFileProcessors
workSequenceTsFileProcessors
.put(timePartitionId, tsFileProcessor);
tsFileResource.setProcessor(tsFileProcessor);
tsFileProcessor.setTimeRangeId(timePartitionId);
Expand All @@ -422,7 +403,7 @@ private void recoverUnseqFiles(List<TsFileResource> tsFiles)
for (int i = 0; i < tsFiles.size(); i++) {
TsFileResource tsFileResource = tsFiles.get(i);
unSequenceFileList.add(tsFileResource);
long timePartitionId = getTimePartitionFromTsFileResource(tsFileResource);
long timePartitionId = tsFileResource.getTimePartition();

TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-",
getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, true,
Expand All @@ -437,6 +418,8 @@ private void recoverUnseqFiles(List<TsFileResource> tsFiles)
getVersionControllerByTimePartitionId(timePartitionId),
this::closeUnsealedTsFileProcessorCallBack,
this::unsequenceFlushCallback, false, writer);
workUnsequenceTsFileProcessors
.put(timePartitionId, tsFileProcessor);
tsFileResource.setProcessor(tsFileProcessor);
tsFileProcessor.setTimeRangeId(timePartitionId);
writer.makeMetadataVisible();
Expand Down Expand Up @@ -1217,7 +1200,7 @@ private void deleteDataInFiles(Collection<TsFileResource> tsFileResourceList, De
continue;
}

long partitionId = getTimePartitionFromTsFileResource(tsFileResource);
long partitionId = tsFileResource.getTimePartition();
deletion.setVersionNum(getVersionControllerByTimePartitionId(partitionId).nextVersion());

// write deletion into modification file
Expand Down Expand Up @@ -1508,7 +1491,7 @@ protected void mergeEndAction(List<TsFileResource> seqFiles, List<TsFileResource
*/
public void loadNewTsFileForSync(TsFileResource newTsFileResource) throws LoadFileException {
File tsfileToBeInserted = newTsFileResource.getFile();
long newFilePartitionId = getNewFilePartitionId(newTsFileResource);
long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
writeLock();
mergeLock.writeLock().lock();
try {
Expand Down Expand Up @@ -1544,7 +1527,7 @@ public void loadNewTsFileForSync(TsFileResource newTsFileResource) throws LoadFi
*/
public void loadNewTsFile(TsFileResource newTsFileResource) throws LoadFileException {
File tsfileToBeInserted = newTsFileResource.getFile();
long newFilePartitionId = getNewFilePartitionId(newTsFileResource);
long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
writeLock();
mergeLock.writeLock().lock();
try {
Expand All @@ -1564,7 +1547,7 @@ public void loadNewTsFile(TsFileResource newTsFileResource) throws LoadFileExcep
// check whether the file name needs to be renamed.
if (!sequenceFileTreeSet.isEmpty()) {
String newFileName = getFileNameForLoadingFile(tsfileToBeInserted.getName(), insertPos,
getTimePartitionFromTsFileResource(newTsFileResource), sequenceList);
newTsFileResource.getTimePartition(), sequenceList);
if (!newFileName.equals(tsfileToBeInserted.getName())) {
logger.info("Tsfile {} must be renamed to {} for loading into the sequence list.",
tsfileToBeInserted.getName(), newFileName);
Expand All @@ -1577,8 +1560,7 @@ public void loadNewTsFile(TsFileResource newTsFileResource) throws LoadFileExcep

// update latest time map
updateLatestTimeMap(newTsFileResource);
String[] filePathSplit = FilePathUtils.splitTsFilePath(newTsFileResource);
long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length - 2]);
long partitionNum = newTsFileResource.getTimePartition();
partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
.addAll(newTsFileResource.getHistoricalVersions());
} catch (DiskSpaceInsufficientException e) {
Expand All @@ -1593,40 +1575,6 @@ public void loadNewTsFile(TsFileResource newTsFileResource) throws LoadFileExcep
}
}

/**
 * Check and get the partition id of a TsFile that is about to be inserted, using the
 * start times and end times of all its devices. All timestamps must map to one and the
 * same partition.
 * TODO: when a partition violation happens, split the file and load the parts into
 * their respective partitions.
 *
 * @param resource the TsFile to be inserted
 * @return the single time partition covering all of the file's data
 * @throws LoadFileException if the data of the file crosses partitions or the file is empty
 */
private long getNewFilePartitionId(TsFileResource resource) throws LoadFileException {
  long partitionId = -1;
  for (Long startTime : resource.getStartTimeMap().values()) {
    partitionId = checkSamePartition(resource, partitionId, startTime);
  }
  for (Long endTime : resource.getEndTimeMap().values()) {
    partitionId = checkSamePartition(resource, partitionId, endTime);
  }
  if (partitionId == -1) {
    // neither start nor end times were recorded: the file contains no data
    throw new LoadEmptyFileException();
  }
  return partitionId;
}

/**
 * Fold one timestamp into the running partition id: adopt its partition when none is
 * known yet, otherwise require it to match the partition already seen.
 */
private static long checkSamePartition(TsFileResource resource, long partitionId, long time)
    throws PartitionViolationException {
  long p = StorageEngine.getTimePartition(time);
  if (partitionId != -1 && partitionId != p) {
    throw new PartitionViolationException(resource);
  }
  return p;
}

/**
* Find the position of "newTsFileResource" in the sequence files if it can be inserted into them.
* @param newTsFileResource
Expand Down Expand Up @@ -1715,15 +1663,10 @@ private int compareTsFileDevices(TsFileResource fileA, TsFileResource fileB) {

/**
* If the historical versions of a file is a sub-set of the given file's, remove it to reduce
 * unnecessary merge. Only used when the file sender and the receiver share the same file
 * close policy.
 * Warning: DO NOT REMOVE
 * @param resource
*/
@SuppressWarnings("unused")
public void removeFullyOverlapFiles(TsFileResource resource) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@

import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.upgrade.UpgradeTask;
import org.apache.iotdb.db.exception.PartitionViolationException;
import org.apache.iotdb.db.service.UpgradeSevice;
import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
Expand Down Expand Up @@ -471,4 +474,50 @@ public void setProcessor(TsFileProcessor processor) {
public TimeseriesMetadata getTimeSeriesMetadata() {
return timeSeriesMetadata;
}

/**
 * Get the time partition this file belongs to. Callers must ensure that either the
 * startTimeMap is non-empty, or the file path contains a partition folder as its
 * second-to-last segment.
 *
 * @return the time partition id of this TsFile
 */
public long getTimePartition() {
  if (startTimeMap == null || startTimeMap.isEmpty()) {
    // No in-memory start times: fall back to the partition folder encoded in the path.
    String[] pathSegments = FilePathUtils.splitTsFilePath(this);
    return Long.parseLong(pathSegments[pathSegments.length - 2]);
  }
  // Any device's start time determines the partition; take the first available one.
  return StorageEngine.getTimePartition(startTimeMap.values().iterator().next());
}

/**
 * Used when loading new TsFiles that were not generated by this server: verify that
 * every device start time and end time maps to one single time partition, and return
 * that partition.
 * TODO: when a partition violation happens, split the file and load the parts into
 * their respective partitions.
 *
 * @return the single time partition covering all of the file's data
 * @throws PartitionViolationException if the data of the file crosses partitions or
 *         the file is empty
 */
public long getTimePartitionWithCheck() throws PartitionViolationException {
  long partitionId = foldPartition(startTimeMap.values(), -1);
  partitionId = foldPartition(endTimeMap.values(), partitionId);
  if (partitionId == -1) {
    // no start or end times at all: the file is empty
    throw new PartitionViolationException(this);
  }
  return partitionId;
}

/**
 * Fold each timestamp's partition into the accumulator: adopt the first partition
 * seen, then reject any timestamp that maps to a different one.
 */
private long foldPartition(Iterable<Long> times, long partitionId)
    throws PartitionViolationException {
  for (Long time : times) {
    long p = StorageEngine.getTimePartition(time);
    if (partitionId == -1) {
      partitionId = p;
    } else if (partitionId != p) {
      throw new PartitionViolationException(this);
    }
  }
  return partitionId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
Expand Down Expand Up @@ -113,6 +112,7 @@
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
Expand Down Expand Up @@ -571,9 +571,10 @@ private void loadFile(File file, OperateFilePlan plan) throws QueryProcessExcept
file.getAbsolutePath()));
}
Map<Path, MeasurementSchema> schemaMap = new HashMap<>();
Map<Path, List<ChunkMetadata>> chunkMetaDataListMap = new HashMap<>();

List<ChunkGroupMetadata> chunkGroupMetadataList = new ArrayList<>();
try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) {
reader.selfCheck(schemaMap, chunkMetaDataListMap, false);
reader.selfCheck(schemaMap, chunkGroupMetadataList, false);
}

FileLoaderUtils.checkTsFileResource(tsFileResource);
Expand All @@ -586,7 +587,7 @@ private void loadFile(File file, OperateFilePlan plan) throws QueryProcessExcept

//create schemas if they doesn't exist
if (plan.isAutoCreateSchema()) {
createSchemaAutomatically(chunkMetaDataListMap, schemaMap, plan.getSgLevel());
createSchemaAutomatically(chunkGroupMetadataList, schemaMap, plan.getSgLevel());
}

StorageEngine.getInstance().loadNewTsFile(tsFileResource);
Expand All @@ -596,36 +597,34 @@ private void loadFile(File file, OperateFilePlan plan) throws QueryProcessExcept
}
}

private void createSchemaAutomatically(Map<Path, List<ChunkMetadata>> chunkMetaDataListMap,
private void createSchemaAutomatically(
List<ChunkGroupMetadata> chunkGroupMetadataList,
Map<Path, MeasurementSchema> knownSchemas, int sgLevel)
throws QueryProcessException, MetadataException {
if (chunkMetaDataListMap.isEmpty()) {
if (chunkGroupMetadataList.isEmpty()) {
return;
}
for (Entry<Path, List<ChunkMetadata>> entry : chunkMetaDataListMap.entrySet()) {
String device = entry.getKey().getDevice();

Set<Path> registeredSeries = new HashSet<>();
for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
String device = chunkGroupMetadata.getDevice();
MNode node = mManager.getDeviceNodeWithAutoCreateStorageGroup(device, true, sgLevel);
for (ChunkMetadata chunkMetaData : entry.getValue()) {
String measurement = chunkMetaData.getMeasurementUid();
String fullPath = device + IoTDBConstant.PATH_SEPARATOR + measurement;
MeasurementSchema schema = knownSchemas.get(entry.getKey());
if (schema == null) {
throw new MetadataException(String
.format("Can not get the schema of measurement [%s]", measurement));
}
if (!node.hasChild(measurement)) {
try {
mManager.createTimeseries(fullPath, schema.getType(), schema.getEncodingType(),
schema.getCompressor(), Collections.emptyMap());
} catch (MetadataException e) {
if (!e.getMessage().contains("already exist")) {
throw e;
}
for (ChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
Path series = new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid());
if (!registeredSeries.contains(series)) {
registeredSeries.add(series);
MeasurementSchema schema = knownSchemas.get(series);
if (schema == null) {
throw new MetadataException(String.format("Can not get the schema of measurement [%s]",
chunkMetadata.getMeasurementUid()));
}
if (!node.hasChild(chunkMetadata.getMeasurementUid())) {
mManager.createTimeseries(series.getFullPath(), schema.getType(),
schema.getEncodingType(), schema.getCompressor(), Collections.emptyMap());
} else if (node.getChild(chunkMetadata.getMeasurementUid()) instanceof InternalMNode) {
throw new QueryProcessException(
String.format("Current Path is not leaf node. %s", series));
}
}
if (node.getChild(measurement) instanceof InternalMNode) {
throw new QueryProcessException(
String.format("Current Path is not leaf node. %s", fullPath));
}
}
}
Expand Down
Loading

0 comments on commit c6c23a3

Please sign in to comment.