Skip to content
Permalink
Browse files
[ASTERIXDB-2738][STO] Create Mask File Before Merge Operations
- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Before starting a merge operation, create a mask file
  (.mask_C_startSeq_endSeq) for the merged component to
  indicate that this component isn't valid yet.
- On the merge operation successful completion, delete the
  merged component mask file.
- In the case of any unexpected failure during the merge
  operation, all files of the failed merged component will
  be deleted on node startup/shutdown, including the mask
  file.
- Halt on any IO opeartion failure.
- Add a test case that ensures only masked merged components
  are deleted but not the original components that were
  supposed to be merged.

Change-Id: I476dd3be5e75468e83044b3aaf0f6c2d8beadf1c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/6425
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Luo Chen <cluo8@uci.edu>
  • Loading branch information
mhubail committed May 22, 2020
1 parent 0f407d9 commit fe0f6fa181b03195cda8ff3585734ce10f466467
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 21 deletions.
@@ -20,7 +20,6 @@

import org.apache.hyracks.storage.am.lsm.common.api.IIoOperationFailedCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import org.apache.hyracks.util.ExitUtil;
import org.apache.logging.log4j.LogManager;
@@ -42,8 +41,6 @@ public void schedulerFailed(ILSMIOOperationScheduler scheduler, Throwable failur
@Override
public void operationFailed(ILSMIOOperation operation, Throwable t) {
LOGGER.error("Operation {} has failed", operation, t);
if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH) {
ExitUtil.halt(ExitUtil.EC_FLUSH_FAILED);
}
ExitUtil.halt(ExitUtil.EC_IO_OPERATION_FAILED);
}
}
@@ -171,6 +171,59 @@ public void deleteCorruptedResourcesTest() throws Exception {
Assert.assertFalse(indexMetadataMaskFile.exists());
}

@Test
public void deleteMaskedMergedFiles() throws Exception {
final INcApplicationContext ncAppCtx = (INcApplicationContext) integrationUtil.ncs[0].getApplicationContext();
final String nodeId = ncAppCtx.getServiceContext().getNodeId();
final String datasetName = "ds";
TestDataUtil.createIdOnlyDataset(datasetName);
final Dataset dataset = TestDataUtil.getDataset(integrationUtil, datasetName);
final String indexPath = TestDataUtil.getIndexPath(integrationUtil, dataset, nodeId);
FileReference indexDirRef = ncAppCtx.getIoManager().resolve(indexPath);
int compSeqStart = 100;
int validComponentSequence = 103;
// advance valid component seq in checkpoint
PersistentLocalResourceRepository localResourceRepository =
(PersistentLocalResourceRepository) ncAppCtx.getLocalResourceRepository();
LocalResource localResource = localResourceRepository.get(indexPath);
DatasetResourceReference drr = DatasetResourceReference.of(localResource);
IIndexCheckpointManagerProvider indexCheckpointManagerProvider = ncAppCtx.getIndexCheckpointManagerProvider();
IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(drr);
indexCheckpointManager.advanceValidComponentSequence(validComponentSequence);
// create components to be merged
String btree = "_b";
String filter = "_f";
String indexDir = indexDirRef.getFile().getAbsolutePath();
for (int i = compSeqStart; i <= validComponentSequence; i++) {
String componentId = i + "_" + i;
Path btreePath = Paths.get(indexDir, componentId + btree);
Path filterPath = Paths.get(indexDir, componentId + filter);
Files.createFile(btreePath);
Files.createFile(filterPath);
}
// create masked merged component
String mergedComponentId = compSeqStart + "_" + validComponentSequence;
Path mergedBtreePath = Paths.get(indexDir, mergedComponentId + btree);
Path mergedFilterPath = Paths.get(indexDir, mergedComponentId + filter);
Path mergeMaskPath = Paths.get(indexDir, StorageConstants.COMPONENT_MASK_FILE_PREFIX + mergedComponentId);
Files.createFile(mergedBtreePath);
Files.createFile(mergedFilterPath);
Files.createFile(mergeMaskPath);
// cleanup storage and ensure merged component files were deleted while individual files still exist
DatasetLocalResource lr = (DatasetLocalResource) localResourceRepository.get(indexPath).getResource();
localResourceRepository.cleanup(lr.getPartition());
Assert.assertFalse(mergedBtreePath.toFile().exists());
Assert.assertFalse(mergedFilterPath.toFile().exists());
Assert.assertFalse(mergeMaskPath.toFile().exists());
for (int i = compSeqStart; i <= validComponentSequence; i++) {
String componentId = i + "_" + i;
Path btreePath = Paths.get(indexDir, componentId + btree);
Path filterPath = Paths.get(indexDir, componentId + filter);
Assert.assertTrue(btreePath.toFile().exists());
Assert.assertTrue(filterPath.toFile().exists());
}
}

private void ensureInvalidComponentDeleted(String indexDir, String componentSeq,
PersistentLocalResourceRepository localResourceRepository, DatasetLocalResource lr) throws IOException {
Path btreePath = Paths.get(indexDir,
@@ -19,6 +19,10 @@

package org.apache.asterix.common.ioopcallbacks;

import static org.apache.asterix.common.storage.ResourceReference.getComponentSequence;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
@@ -28,8 +32,10 @@
import org.apache.asterix.common.context.DatasetInfo;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.data.std.primitive.LongPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
@@ -83,7 +89,15 @@ public LSMIOOperationCallback(DatasetInfo dsInfo, ILSMIndex lsmIndex, ILSMCompon

@Override
public void beforeOperation(ILSMIOOperation operation) throws HyracksDataException {
// No Op
if (isMerge(operation)) {
FileReference operationMaskFilePath = getOperationMaskFilePath(operation);
// if a merge operation is attempted after a failure, its mask file may already exists
if (!operationMaskFilePath.getFile().exists()) {
IoUtil.create(operationMaskFilePath);
} else {
LOGGER.warn("merge operation mask file {} already exists", operationMaskFilePath);
}
}
}

@Override
@@ -121,6 +135,8 @@ public void afterFinalize(ILSMIOOperation operation) throws HyracksDataException
} else if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH
|| operation.getIOOpertionType() == LSMIOOperationType.LOAD) {
addComponentToCheckpoint(operation);
} else if (isMerge(operation)) {
IoUtil.delete(getOperationMaskFilePath(operation));
}
}

@@ -277,4 +293,18 @@ public synchronized boolean hasPendingFlush() {
public void allocated(ILSMMemoryComponent component) throws HyracksDataException {
// no op
}

private boolean isMerge(ILSMIOOperation operation) {
return operation.getIOOpertionType() == LSMIOOperationType.MERGE
&& operation.getAccessor().getOpContext().getOperation() != IndexOperation.DELETE_COMPONENTS;
}

private static FileReference getOperationMaskFilePath(ILSMIOOperation operation) {
FileReference target = operation.getTarget();
final String componentSequence = getComponentSequence(target.getFile().getAbsolutePath());
Path idxRelPath = Paths.get(target.getRelativePath()).getParent();
Path maskFileRelPath =
Paths.get(idxRelPath.toString(), StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentSequence);
return new FileReference(target.getDeviceHandle(), maskFileRelPath.toString());
}
}
@@ -24,6 +24,7 @@

import org.apache.asterix.common.utils.StorageConstants;
import org.apache.commons.lang3.StringUtils;
import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;

public class ResourceReference {

@@ -130,4 +131,17 @@ public int hashCode() {
public String toString() {
return getRelativePath().toString();
}

/**
* Gets a component sequence based on its unique timestamp.
* e.g. a component file 1_3_b
* will return a component sequence 1_3
*
* @param componentFile any component file
* @return The component sequence
*/
public static String getComponentSequence(String componentFile) {
final ResourceReference ref = of(componentFile);
return IndexComponentFileReference.of(ref.getName()).getSequence();
}
}
@@ -28,10 +28,10 @@

import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.exceptions.ReplicationException;
import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.replication.api.IReplicaTask;
import org.apache.asterix.replication.api.IReplicationWorker;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
@@ -63,7 +63,7 @@ public static Path getComponentMaskPath(INcApplicationContext appCtx, String com
final IIOManager ioManager = appCtx.getIoManager();
final FileReference localPath = ioManager.resolve(componentFile);
final Path resourceDir = Files.createDirectories(localPath.getFile().getParentFile().toPath());
final String componentSequence = PersistentLocalResourceRepository.getComponentSequence(componentFile);
final String componentSequence = ResourceReference.getComponentSequence(componentFile);
return Paths.get(resourceDir.toString(), StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentSequence);
}

@@ -18,6 +18,7 @@
*/
package org.apache.asterix.transaction.management.resource;

import static org.apache.asterix.common.storage.ResourceReference.getComponentSequence;
import static org.apache.asterix.common.utils.StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX;
import static org.apache.asterix.common.utils.StorageConstants.METADATA_FILE_NAME;
import static org.apache.hyracks.api.exceptions.ErrorCode.CANNOT_CREATE_FILE;
@@ -593,19 +594,6 @@ private Path getResourceMaskFilePath(FileReference resourceFile) {
return Paths.get(resourceFile.getFile().getParentFile().getAbsolutePath(), METADATA_FILE_MASK_NAME);
}

/**
* Gets a component sequence based on its unique timestamp.
* e.g. a component file 1_3_b
* will return a component sequence 1_3
*
* @param componentFile any component file
* @return The component sequence
*/
public static String getComponentSequence(String componentFile) {
final ResourceReference ref = ResourceReference.of(componentFile);
return IndexComponentFileReference.of(ref.getName()).getSequence();
}

private static boolean isComponentMask(File mask) {
return mask.getName().startsWith(StorageConstants.COMPONENT_MASK_FILE_PREFIX);
}
@@ -58,7 +58,7 @@ public class ExitUtil {
public static final int EC_IO_SCHEDULER_FAILED = 55;
public static final int EC_HALT_SHUTDOWN_TIMED_OUT = 66;
public static final int EC_HALT_WATCHDOG_FAILED = 77;
public static final int EC_FLUSH_FAILED = 88;
public static final int EC_IO_OPERATION_FAILED = 88;
public static final int EC_TERMINATE_NC_SERVICE_DIRECTIVE = 99;
private static final ExitThread exitThread = new ExitThread();
private static final ShutdownWatchdog watchdogThread = new ShutdownWatchdog();

0 comments on commit fe0f6fa

Please sign in to comment.