Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix visible metadata, version, work processor bugs when recovering #966

Merged
merged 13 commits into from Apr 1, 2020
Expand Up @@ -30,7 +30,6 @@
import org.apache.iotdb.tsfile.write.record.RowBatch;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.Schema;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down
Expand Up @@ -498,8 +498,7 @@ public Map<String, Map<Long, List<TsFileResource>>> getAllClosedStorageGroupTsFi
if (!sequenceFile.isClosed()) {
continue;
}
String[] fileSplits = FilePathUtils.splitTsFilePath(sequenceFile);
long partitionNum = Long.parseLong(fileSplits[fileSplits.length - 2]);
long partitionNum = sequenceFile.getTimePartition();
Map<Long, List<TsFileResource>> storageGroupFiles = ret.computeIfAbsent(entry.getKey()
, n -> new HashMap<>());
storageGroupFiles.computeIfAbsent(partitionNum, n -> new ArrayList<>()).add(sequenceFile);
Expand Down
Expand Up @@ -62,10 +62,8 @@
import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.exception.LoadEmptyFileException;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.exception.MergeException;
import org.apache.iotdb.db.exception.PartitionViolationException;
import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.WriteProcessException;
Expand All @@ -81,7 +79,6 @@
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryFileManager;
import org.apache.iotdb.db.utils.CopyOnReadLinkedList;
import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer;
import org.apache.iotdb.rpc.RpcUtils;
Expand Down Expand Up @@ -262,17 +259,15 @@ private void recover() throws StorageGroupProcessorException {
if (resource.getFile().length() == 0) {
deleteTsfile(resource.getFile());
}
String[] filePathSplit = FilePathUtils.splitTsFilePath(resource);
long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length - 2]);
long partitionNum = resource.getTimePartition();
partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
}
for (TsFileResource resource : unseqTsFiles) {
//After recovery, if the TsFile's length is equal to 0, delete both the TsFileResource and the file itself
if (resource.getFile().length() == 0) {
deleteTsfile(resource.getFile());
}
String[] filePathSplit = FilePathUtils.splitTsFilePath(resource);
long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length - 2]);
long partitionNum = resource.getTimePartition();
partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
}

Expand All @@ -296,30 +291,16 @@ private void recover() throws StorageGroupProcessorException {
}

for (TsFileResource resource : sequenceFileTreeSet) {
long timePartitionId = getTimePartitionFromTsFileResource(resource);
if (timePartitionId != -1) {
latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>())
.putAll(resource.getEndTimeMap());
partitionLatestFlushedTimeForEachDevice
.computeIfAbsent(timePartitionId, id -> new HashMap<>())
.putAll(resource.getEndTimeMap());
globalLatestFlushedTimeForEachDevice.putAll(resource.getEndTimeMap());
}
long timePartitionId = resource.getTimePartition();
latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>())
.putAll(resource.getEndTimeMap());
partitionLatestFlushedTimeForEachDevice
.computeIfAbsent(timePartitionId, id -> new HashMap<>())
.putAll(resource.getEndTimeMap());
globalLatestFlushedTimeForEachDevice.putAll(resource.getEndTimeMap());
}
}

private long getTimePartitionFromTsFileResource(TsFileResource resource) {
    // Derive the time partition from any device's start time in the resource.
    // If the start time map is empty the resource holds no data, so return -1.
    for (Long startTime : resource.getStartTimeMap().values()) {
      return StorageEngine.getTimePartition(startTime);
    }
    return -1;
  }

/**
 * Get the version controller by time partition id. Thread-safety should be ensured by the caller.
Expand Down Expand Up @@ -393,7 +374,7 @@ private void recoverSeqFiles(List<TsFileResource> tsFiles) throws StorageGroupPr
for (int i = 0; i < tsFiles.size(); i++) {
TsFileResource tsFileResource = tsFiles.get(i);
sequenceFileTreeSet.add(tsFileResource);
long timePartitionId = getTimePartitionFromTsFileResource(tsFileResource);
long timePartitionId = tsFileResource.getTimePartition();

TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-",
getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, false,
Expand All @@ -408,7 +389,7 @@ private void recoverSeqFiles(List<TsFileResource> tsFiles) throws StorageGroupPr
getVersionControllerByTimePartitionId(timePartitionId),
this::closeUnsealedTsFileProcessorCallBack,
this::updateLatestFlushTimeCallback, true, writer);
workUnsequenceTsFileProcessors
workSequenceTsFileProcessors
.put(timePartitionId, tsFileProcessor);
tsFileResource.setProcessor(tsFileProcessor);
tsFileProcessor.setTimeRangeId(timePartitionId);
Expand All @@ -422,7 +403,7 @@ private void recoverUnseqFiles(List<TsFileResource> tsFiles)
for (int i = 0; i < tsFiles.size(); i++) {
TsFileResource tsFileResource = tsFiles.get(i);
unSequenceFileList.add(tsFileResource);
long timePartitionId = getTimePartitionFromTsFileResource(tsFileResource);
long timePartitionId = tsFileResource.getTimePartition();

TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-",
getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, true,
Expand All @@ -437,6 +418,8 @@ private void recoverUnseqFiles(List<TsFileResource> tsFiles)
getVersionControllerByTimePartitionId(timePartitionId),
this::closeUnsealedTsFileProcessorCallBack,
this::unsequenceFlushCallback, false, writer);
workUnsequenceTsFileProcessors
.put(timePartitionId, tsFileProcessor);
tsFileResource.setProcessor(tsFileProcessor);
tsFileProcessor.setTimeRangeId(timePartitionId);
writer.makeMetadataVisible();
Expand Down Expand Up @@ -1217,7 +1200,7 @@ private void deleteDataInFiles(Collection<TsFileResource> tsFileResourceList, De
continue;
}

long partitionId = getTimePartitionFromTsFileResource(tsFileResource);
long partitionId = tsFileResource.getTimePartition();
deletion.setVersionNum(getVersionControllerByTimePartitionId(partitionId).nextVersion());

// write deletion into modification file
Expand Down Expand Up @@ -1508,7 +1491,7 @@ protected void mergeEndAction(List<TsFileResource> seqFiles, List<TsFileResource
*/
public void loadNewTsFileForSync(TsFileResource newTsFileResource) throws LoadFileException {
File tsfileToBeInserted = newTsFileResource.getFile();
long newFilePartitionId = getNewFilePartitionId(newTsFileResource);
long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
writeLock();
mergeLock.writeLock().lock();
try {
Expand Down Expand Up @@ -1544,7 +1527,7 @@ public void loadNewTsFileForSync(TsFileResource newTsFileResource) throws LoadFi
*/
public void loadNewTsFile(TsFileResource newTsFileResource) throws LoadFileException {
File tsfileToBeInserted = newTsFileResource.getFile();
long newFilePartitionId = getNewFilePartitionId(newTsFileResource);
long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
writeLock();
mergeLock.writeLock().lock();
try {
Expand All @@ -1564,7 +1547,7 @@ public void loadNewTsFile(TsFileResource newTsFileResource) throws LoadFileExcep
// check whether the file name needs to be renamed.
if (!sequenceFileTreeSet.isEmpty()) {
String newFileName = getFileNameForLoadingFile(tsfileToBeInserted.getName(), insertPos,
getTimePartitionFromTsFileResource(newTsFileResource), sequenceList);
newTsFileResource.getTimePartition(), sequenceList);
if (!newFileName.equals(tsfileToBeInserted.getName())) {
logger.info("Tsfile {} must be renamed to {} for loading into the sequence list.",
tsfileToBeInserted.getName(), newFileName);
Expand All @@ -1577,8 +1560,7 @@ public void loadNewTsFile(TsFileResource newTsFileResource) throws LoadFileExcep

// update latest time map
updateLatestTimeMap(newTsFileResource);
String[] filePathSplit = FilePathUtils.splitTsFilePath(newTsFileResource);
long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length - 2]);
long partitionNum = newTsFileResource.getTimePartition();
partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
.addAll(newTsFileResource.getHistoricalVersions());
} catch (DiskSpaceInsufficientException e) {
Expand All @@ -1593,40 +1575,6 @@ public void loadNewTsFile(TsFileResource newTsFileResource) throws LoadFileExcep
}
}

/**
* Check and get the partition id of a TsFile to be inserted using the start times and end
* times of devices.
* TODO: when the partition violation happens, split the file and load into different partitions
 * @throws LoadFileException if the data of the file crosses partitions or the file is empty
*/
private long getNewFilePartitionId(TsFileResource resource) throws LoadFileException {
    // -1 means "no partition observed yet". Every device start time and end
    // time must fall into the same partition for the file to be loadable.
    long partitionId = -1;
    for (Long startTime : resource.getStartTimeMap().values()) {
      long candidate = StorageEngine.getTimePartition(startTime);
      if (partitionId == -1) {
        partitionId = candidate;
      } else if (partitionId != candidate) {
        throw new PartitionViolationException(resource);
      }
    }
    for (Long endTime : resource.getEndTimeMap().values()) {
      long candidate = StorageEngine.getTimePartition(endTime);
      if (partitionId == -1) {
        partitionId = candidate;
      } else if (partitionId != candidate) {
        throw new PartitionViolationException(resource);
      }
    }
    if (partitionId == -1) {
      // Neither start nor end times exist: the file carries no data.
      throw new LoadEmptyFileException();
    }
    return partitionId;
  }

/**
* Find the position of "newTsFileResource" in the sequence files if it can be inserted into them.
* @param newTsFileResource
Expand Down Expand Up @@ -1715,15 +1663,10 @@ private int compareTsFileDevices(TsFileResource fileA, TsFileResource fileB) {

/**
* If the historical versions of a file is a sub-set of the given file's, remove it to reduce
 * unnecessary merge. Only used when the file sender and the receiver share the same file
 * close policy.
 * Warning: DO NOT REMOVE
 * @param resource
*/
@SuppressWarnings("unused")
public void removeFullyOverlapFiles(TsFileResource resource) {
Expand Down
Expand Up @@ -20,10 +20,13 @@

import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.upgrade.UpgradeTask;
import org.apache.iotdb.db.exception.PartitionViolationException;
import org.apache.iotdb.db.service.UpgradeSevice;
import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
Expand Down Expand Up @@ -471,4 +474,50 @@ public void setProcessor(TsFileProcessor processor) {
public TimeseriesMetadata getTimeSeriesMetadata() {
return timeSeriesMetadata;
}

/**
 * Get the time partition of this resource. Requires that either the
 * startTimeMap is non-empty or the file path contains a time partition folder.
 */
public long getTimePartition() {
if (startTimeMap != null && !startTimeMap.isEmpty()) {
return StorageEngine.getTimePartition(startTimeMap.values().iterator().next());
}
String[] splits = FilePathUtils.splitTsFilePath(this);
return Long.parseLong(splits[splits.length - 2]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we sync a tsfile from other iotdb, will this tsfile has time partition directory? Maybe we should use startTimeMap to get time partition

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I use the startTimeMap first, then the path

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not guaranteed that the other IoTDB will use the same partition interval, so it is better to check the whole startTimeMap and endTimeMap to see if any device has crossed a partition and report an exception if so.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

restart and load could use different getPartition method, one check partition and the other not. I will add a method with partition check

}

/**
 * Used when loading new TsFiles not generated by this server:
 * check and get the time partition.
 * TODO: when a partition violation happens, split the file and load it into different partitions
 * @throws PartitionViolationException if the data of the file crosses partitions or the file is empty
 */
public long getTimePartitionWithCheck() throws PartitionViolationException {
    // -1 marks "no partition seen yet"; every device start and end time must
    // map to the same partition, otherwise the file cannot be loaded as-is.
    long partitionId = -1;
    for (Long startTime : startTimeMap.values()) {
      partitionId = checkAndMergePartition(partitionId, StorageEngine.getTimePartition(startTime));
    }
    for (Long endTime : endTimeMap.values()) {
      partitionId = checkAndMergePartition(partitionId, StorageEngine.getTimePartition(endTime));
    }
    if (partitionId == -1) {
      // No device times at all: the resource is empty.
      throw new PartitionViolationException(this);
    }
    return partitionId;
  }

  // Returns the merged partition id; throws when two different partitions are observed.
  private long checkAndMergePartition(long current, long candidate)
      throws PartitionViolationException {
    if (current == -1) {
      return candidate;
    }
    if (current != candidate) {
      throw new PartitionViolationException(this);
    }
    return current;
  }
}
Expand Up @@ -46,7 +46,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
Expand Down Expand Up @@ -113,6 +112,7 @@
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
Expand Down Expand Up @@ -571,9 +571,10 @@ private void loadFile(File file, OperateFilePlan plan) throws QueryProcessExcept
file.getAbsolutePath()));
}
Map<Path, MeasurementSchema> schemaMap = new HashMap<>();
Map<Path, List<ChunkMetadata>> chunkMetaDataListMap = new HashMap<>();

List<ChunkGroupMetadata> chunkGroupMetadataList = new ArrayList<>();
try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) {
reader.selfCheck(schemaMap, chunkMetaDataListMap, false);
reader.selfCheck(schemaMap, chunkGroupMetadataList, false);
}

FileLoaderUtils.checkTsFileResource(tsFileResource);
Expand All @@ -586,7 +587,7 @@ private void loadFile(File file, OperateFilePlan plan) throws QueryProcessExcept

//create schemas if they doesn't exist
if (plan.isAutoCreateSchema()) {
createSchemaAutomatically(chunkMetaDataListMap, schemaMap, plan.getSgLevel());
createSchemaAutomatically(chunkGroupMetadataList, schemaMap, plan.getSgLevel());
}

StorageEngine.getInstance().loadNewTsFile(tsFileResource);
Expand All @@ -596,36 +597,34 @@ private void loadFile(File file, OperateFilePlan plan) throws QueryProcessExcept
}
}

private void createSchemaAutomatically(Map<Path, List<ChunkMetadata>> chunkMetaDataListMap,
private void createSchemaAutomatically(
List<ChunkGroupMetadata> chunkGroupMetadataList,
Map<Path, MeasurementSchema> knownSchemas, int sgLevel)
throws QueryProcessException, MetadataException {
if (chunkMetaDataListMap.isEmpty()) {
if (chunkGroupMetadataList.isEmpty()) {
return;
}
for (Entry<Path, List<ChunkMetadata>> entry : chunkMetaDataListMap.entrySet()) {
String device = entry.getKey().getDevice();

Set<Path> registeredSeries = new HashSet<>();
for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
String device = chunkGroupMetadata.getDevice();
MNode node = mManager.getDeviceNodeWithAutoCreateStorageGroup(device, true, sgLevel);
for (ChunkMetadata chunkMetaData : entry.getValue()) {
String measurement = chunkMetaData.getMeasurementUid();
String fullPath = device + IoTDBConstant.PATH_SEPARATOR + measurement;
MeasurementSchema schema = knownSchemas.get(entry.getKey());
if (schema == null) {
throw new MetadataException(String
.format("Can not get the schema of measurement [%s]", measurement));
}
if (!node.hasChild(measurement)) {
try {
mManager.createTimeseries(fullPath, schema.getType(), schema.getEncodingType(),
schema.getCompressor(), Collections.emptyMap());
} catch (MetadataException e) {
if (!e.getMessage().contains("already exist")) {
throw e;
}
for (ChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
Path series = new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid());
if (!registeredSeries.contains(series)) {
registeredSeries.add(series);
MeasurementSchema schema = knownSchemas.get(series);
if (schema == null) {
throw new MetadataException(String.format("Can not get the schema of measurement [%s]",
chunkMetadata.getMeasurementUid()));
}
if (!node.hasChild(chunkMetadata.getMeasurementUid())) {
mManager.createTimeseries(series.getFullPath(), schema.getType(),
schema.getEncodingType(), schema.getCompressor(), Collections.emptyMap());
} else if (node.getChild(chunkMetadata.getMeasurementUid()) instanceof InternalMNode) {
throw new QueryProcessException(
String.format("Current Path is not leaf node. %s", series));
}
}
if (node.getChild(measurement) instanceof InternalMNode) {
throw new QueryProcessException(
String.format("Current Path is not leaf node. %s", fullPath));
}
}
}
Expand Down