Skip to content

Commit

Permalink
add recover processing for crashing while memtable is not full
Browse files Browse the repository at this point in the history
  • Loading branch information
JackieTien97 committed Jun 29, 2020
1 parent 5b94dc2 commit b44f46a
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 19 deletions.
Expand Up @@ -582,7 +582,7 @@ private TsFileSequenceReader getFileReader(TsFileResource tsFileResource) throws
return new TsFileSequenceReader(tsFileResource.getPath(), true);
}

private File createNewVMFile() {
public static File createNewVMFile(TsFileResource tsFileResource) {
File parent = tsFileResource.getFile().getParentFile();
return FSFactoryProducer.getFSFactory().getFile(parent,
tsFileResource.getFile().getName() + IoTDBConstant.TSFILE_NAME_SEPARATOR + System
Expand Down Expand Up @@ -717,7 +717,7 @@ public void flushOneMemTable() {
} else {
isVm = true;
isFull = false;
File newVmFile = createNewVMFile();
File newVmFile = createNewVMFile(tsFileResource);
vmTsFileResources.add(new TsFileResource(newVmFile));
vmWriters.add(new RestorableTsFileIOWriter(newVmFile));
flushTask = new MemTableFlushTask(memTableToFlush, writer, vmTsFileResources,
Expand All @@ -739,7 +739,7 @@ public void flushOneMemTable() {
RestorableTsFileIOWriter tmpWriter = flushTask.syncFlushMemTable();
if (isVm && isFull && tmpWriter != null) {
File tmpFile = tmpWriter.getFile();
File newVmFile = createNewVMFile();
File newVmFile = createNewVMFile(tsFileResource);
File mergedFile = FSFactoryProducer.getFSFactory().getFile(newVmFile.getPath()
+ MERGED_SUFFIX);
tmpFile.renameTo(mergedFile);
Expand Down
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.writelog.recover;

import static org.apache.iotdb.db.engine.flush.VmLogger.VM_LOG_NAME;
import static org.apache.iotdb.db.engine.storagegroup.TsFileProcessor.createNewVMFile;
import static org.apache.iotdb.db.engine.storagegroup.TsFileProcessor.deleteVmFile;
import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.RESOURCE_SUFFIX;

Expand Down Expand Up @@ -137,32 +138,72 @@ public Pair<RestorableTsFileIOWriter, List<RestorableTsFileIOWriter>> recover()
!lastRestorableTsFileIOWriter.hasCrashed() && !lastRestorableTsFileIOWriter.canWrite();

if (isComplete) {
// tsfile is complete
// tsfile is complete, vmfile is never complete because it's canWrite() always return true.
try {
recoverResource(resource, true);
for (TsFileResource tsFileResource : vmTsFileResources) {
recoverResource(tsFileResource, false);
}
return new Pair<>(restorableTsFileIOWriter, vmRestorableTsFileIOWriterList);
} catch (IOException e) {
throw new StorageGroupProcessorException(
"recover the resource file failed: " + filePath
+ RESOURCE_SUFFIX + e);
}
} else {
// if the last file in vmTsFileResources is not complete, we should also recover the tsfile resource
if (!vmTsFileResources.isEmpty()) {
try {
recoverResource(resource, false);
} catch (IOException e) {
throw new StorageGroupProcessorException(
"recover the resource file failed: " + filePath
+ RESOURCE_SUFFIX + e);
// if the last file in vmTsFileResources crash
if (lastRestorableTsFileIOWriter.hasCrashed()) {
try {
recoverResource(resource, false);
for (int i = 0; i < vmTsFileResources.size() - 1; i++) {
recoverResource(vmTsFileResources.get(i), false);
}
// due to failure, the last ChunkGroup may contain the same data as the WALs, so the time
// map must be updated first to avoid duplicated insertion
recoverResourceFromWriter(lastRestorableTsFileIOWriter, lastTsFileResource);
} catch (IOException e) {
throw new StorageGroupProcessorException(
"recover the resource file failed: " + filePath
+ RESOURCE_SUFFIX + e);
}
} else { // last vmfile is not crash
try {
for (TsFileResource vmTsFileResource : vmTsFileResources) {
recoverResource(vmTsFileResource, false);
}
// tsfile is not crash
if (!restorableTsFileIOWriter.hasCrashed()) {
recoverResource(resource, false);

// if wal exists, we should open a new vmfile to replay it
File newVmFile = createNewVMFile(resource);
TsFileResource newVmTsFileResource = new TsFileResource(newVmFile);
RestorableTsFileIOWriter newVMWriter = new RestorableTsFileIOWriter(newVmFile);
if (redoLogs(newVMWriter,newVmTsFileResource)) {
vmTsFileResources.add(newVmTsFileResource);
vmRestorableTsFileIOWriterList.add(newVMWriter);
} else {
newVmFile.delete();
}
}
// clean logs
try {
MultiFileLogNodeManager.getInstance().deleteNode(
logNodePrefix + SystemFileFactory.INSTANCE.getFile(filePath).getName());
} catch (IOException e) {
throw new StorageGroupProcessorException(e);
}
return new Pair<>(restorableTsFileIOWriter, vmRestorableTsFileIOWriterList);
} catch (IOException e) {
throw new StorageGroupProcessorException(
"recover the resource file failed: " + filePath
+ RESOURCE_SUFFIX + e);
}
}
} else {
// tsfile has crashed
// due to failure, the last ChunkGroup may contain the same data as the WALs, so the time
// map must be updated first to avoid duplicated insertion
recoverResourceFromWriter(lastRestorableTsFileIOWriter, lastTsFileResource);
}
// due to failure, the last ChunkGroup may contain the same data as the WALs, so the time
// map must be updated first to avoid duplicated insertion
recoverResourceFromWriter(lastRestorableTsFileIOWriter, lastTsFileResource);
}

// redo logs
Expand Down Expand Up @@ -248,19 +289,21 @@ private void recoverResourceFromWriter(RestorableTsFileIOWriter restorableTsFile
tsFileResource.setHistoricalVersions(Collections.singleton(fileVersion));
}

private void redoLogs(RestorableTsFileIOWriter restorableTsFileIOWriter,
private boolean redoLogs(RestorableTsFileIOWriter restorableTsFileIOWriter,
TsFileResource tsFileResource) throws StorageGroupProcessorException {
IMemTable recoverMemTable = new PrimitiveMemTable();
recoverMemTable.setVersion(versionController.nextVersion());
LogReplayer logReplayer = new LogReplayer(logNodePrefix, filePath, tsFileResource.getModFile(),
versionController, tsFileResource, recoverMemTable, sequence);
logReplayer.replayLogs();
boolean res = false;
try {
if (!recoverMemTable.isEmpty()) {
List<TsFileResource> deleteTsFileResources = new ArrayList<>();
// flush logs
MemTableFlushTask tableFlushTask = new MemTableFlushTask(recoverMemTable,
restorableTsFileIOWriter, deleteTsFileResources, new ArrayList<>(), false, false, sequence,
restorableTsFileIOWriter, deleteTsFileResources, new ArrayList<>(), false, false,
sequence,
tsFileResource.getFile().getParentFile().getParentFile().getName());
tableFlushTask.syncFlushMemTable();
for (TsFileResource vmTsFileResource : deleteTsFileResources) {
Expand All @@ -270,6 +313,7 @@ private void redoLogs(RestorableTsFileIOWriter restorableTsFileIOWriter,
.getFile(tsFileResource.getFile().getParent(),
tsFileResource.getFile().getName() + VM_LOG_NAME);
logFile.delete();
res = true;
}

if (!isLastFile || tsFileResource.isCloseFlagSet()) {
Expand All @@ -280,6 +324,7 @@ private void redoLogs(RestorableTsFileIOWriter restorableTsFileIOWriter,
}
// otherwise this file is not closed before crush, do nothing so we can continue writing
// into it
return res;
} catch (IOException | InterruptedException | ExecutionException e) {
throw new StorageGroupProcessorException(e);
}
Expand Down

0 comments on commit b44f46a

Please sign in to comment.