Skip to content
Permalink
Browse files
[NO ISSUE][STO] Ensure component seq is always increasing
- user model changes: no
- storage format changes: no
- interface changes: yes

Details:

- Initialize last used component sequence number
  from the index checkpoint. This way even in the
  case of a component deletion, the next written
  component will use a sequence number that was
  not used before.

Change-Id: I48babb9bf251c86520788942ade82ca6ded5b377
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12944
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
  • Loading branch information
mhubail committed Aug 23, 2021
1 parent cf7156d commit dc3704f65b6290234b0296ad0b7cfe8c94900a59
Show file tree
Hide file tree
Showing 10 changed files with 47 additions and 9 deletions.
@@ -34,6 +34,7 @@
import org.apache.asterix.common.storage.IndexCheckpoint;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
import org.apache.hyracks.util.annotations.ThreadSafe;
import org.apache.hyracks.util.file.FileUtil;
import org.apache.logging.log4j.LogManager;
@@ -119,7 +120,10 @@ public synchronized void delete() {

@Override
public long getValidComponentSequence() throws HyracksDataException {
return getLatest().getValidComponentSequence();
if (getCheckpointCount() > 0) {
return getLatest().getValidComponentSequence();
}
return AbstractLSMIndexFileManager.UNINITIALIZED_COMPONENT_SEQ;
}

@Override
@@ -294,6 +294,12 @@ public void allocated(ILSMMemoryComponent component) throws HyracksDataException
// no op
}

@Override
public long getLastValidSequence() throws HyracksDataException {
ResourceReference resourceReference = ResourceReference.ofIndex(lsmIndex.getIndexIdentifier());
return indexCheckpointManagerProvider.get(resourceReference).getValidComponentSequence();
}

private boolean isMerge(ILSMIOOperation operation) {
return operation.getIOOpertionType() == LSMIOOperationType.MERGE
&& operation.getAccessor().getOpContext().getOperation() != IndexOperation.DELETE_COMPONENTS;
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.replication.messaging;

import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.UNINITIALIZED_COMPONENT_SEQ;

import java.io.DataInput;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -72,7 +74,7 @@ public void perform(INcApplicationContext appCtx, IReplicationWorker worker) thr
throw HyracksDataException
.create(new IOException(indexPath + " is not a directory or an IO Error occurred"));
}
long maxComponentSequence = Long.MIN_VALUE;
long maxComponentSequence = UNINITIALIZED_COMPONENT_SEQ;
for (String file : files) {
maxComponentSequence =
Math.max(maxComponentSequence, IndexComponentFileReference.of(file).getSequenceEnd());
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.replication.messaging;

import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.UNINITIALIZED_COMPONENT_SEQ;

import java.io.DataInput;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -100,7 +102,7 @@ private void initIndexCheckpoint(INcApplicationContext appCtx) throws HyracksDat
final IIndexCheckpointManager indexCheckpointManager = checkpointManagerProvider.get(indexRef);
final long currentLSN = appCtx.getTransactionSubsystem().getLogManager().getAppendLSN();
indexCheckpointManager.delete();
indexCheckpointManager.init(Long.MIN_VALUE, currentLSN,
indexCheckpointManager.init(UNINITIALIZED_COMPONENT_SEQ, currentLSN,
LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId());
LOGGER.info(() -> "Checkpoint index: " + indexRef);
}
@@ -23,6 +23,7 @@
import static org.apache.asterix.common.utils.StorageConstants.METADATA_FILE_NAME;
import static org.apache.hyracks.api.exceptions.ErrorCode.CANNOT_CREATE_FILE;
import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER;
import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.UNINITIALIZED_COMPONENT_SEQ;

import java.io.File;
import java.io.FilenameFilter;
@@ -198,8 +199,8 @@ public synchronized void insert(LocalResource resource) throws HyracksDataExcept
createResourceFileMask(resourceFile);
byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(resource.toJson(persistedResourceRegistry));
FileUtil.writeAndForce(Paths.get(resourceFile.getAbsolutePath()), bytes);
indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(Long.MIN_VALUE, 0,
LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId());
indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(UNINITIALIZED_COMPONENT_SEQ,
0, LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId());
deleteResourceFileMask(resourceFile);
} catch (Exception e) {
cleanup(resourceFile);
@@ -19,6 +19,7 @@
package org.apache.hyracks.storage.am.lsm.common.api;

import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;

public interface ILSMIOOperationCallback {

@@ -80,4 +81,8 @@ public interface ILSMIOOperationCallback {
* the allocated component
*/
void allocated(ILSMMemoryComponent component) throws HyracksDataException;

default long getLastValidSequence() throws HyracksDataException {
return AbstractLSMIndexFileManager.UNINITIALIZED_COMPONENT_SEQ;
}
}
@@ -78,4 +78,11 @@ LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String
* @throws IOException
*/
LSMComponentFileReferences getNewTransactionFileReference() throws IOException;

/**
* Initializes the last used sequence
*
* @param lastUsedSeq
*/
void initLastUsedSeq(long lastUsedSeq);
}
@@ -139,6 +139,7 @@ public AbstractLSMIndex(IIOManager ioManager, List<IVirtualBufferCache> virtualB
this.inactiveMemoryComponents = new ArrayList<>();
this.durable = durable;
this.tracer = tracer;
fileManager.initLastUsedSeq(ioOpCallback.getLastValidSequence());
lsmHarness = new LSMHarness(this, ioScheduler, mergePolicy, opTracker, diskBufferCache.isReplicationEnabled(),
tracer);
isActive = false;
@@ -184,6 +185,7 @@ public AbstractLSMIndex(IIOManager ioManager, IBufferCache diskBufferCache, ILSM
filterManager = null;
treeFields = null;
filterFields = null;
fileManager.initLastUsedSeq(ioOpCallback.getLastValidSequence());
}

@Override
@@ -83,20 +83,19 @@ public enum TreeIndexState {
* Hides transaction components until they are either committed by removing this file or deleted along with the file
*/
public static final String TXN_PREFIX = ".T";

public static final long UNINITIALIZED_COMPONENT_SEQ = -1;
public static final FilenameFilter COMPONENT_FILES_FILTER = (dir, name) -> !name.startsWith(".");
protected static final FilenameFilter txnFileNameFilter = (dir, name) -> name.startsWith(TXN_PREFIX);
protected static FilenameFilter bloomFilterFilter =
(dir, name) -> !name.startsWith(".") && name.endsWith(BLOOM_FILTER_SUFFIX);
protected static final Comparator<String> cmp = new FileNameComparator();
private static final FilenameFilter dummyFilter = (dir, name) -> true;
private static final long UNINITALIZED_COMPONENT_SEQ = -1;
protected final IIOManager ioManager;
// baseDir should reflect dataset name and partition name and be absolute
protected final FileReference baseDir;
protected final Comparator<IndexComponentFileReference> recencyCmp = new RecencyComparator();
protected final TreeIndexFactory<? extends ITreeIndex> treeFactory;
private long lastUsedComponentSeq = UNINITALIZED_COMPONENT_SEQ;
private long lastUsedComponentSeq = UNINITIALIZED_COMPONENT_SEQ;
private final ICompressorDecompressorFactory compressorDecompressorFactory;

public AbstractLSMIndexFileManager(IIOManager ioManager, FileReference file,
@@ -348,6 +347,11 @@ public LSMComponentFileReferences getTransactionFileReferenceForCommit() throws
return null;
}

@Override
public void initLastUsedSeq(long lastUsedSeq) {
lastUsedComponentSeq = lastUsedSeq;
}

private static FilenameFilter createTransactionFilter(String transactionFileName, final boolean inclusive) {
final String timeStamp =
transactionFileName.substring(transactionFileName.indexOf(TXN_PREFIX) + TXN_PREFIX.length());
@@ -372,7 +376,7 @@ protected FilenameFilter getCompoundFilter(final FilenameFilter filter1, final F
}

protected String getNextComponentSequence(FilenameFilter filenameFilter) throws HyracksDataException {
if (lastUsedComponentSeq == UNINITALIZED_COMPONENT_SEQ) {
if (lastUsedComponentSeq == UNINITIALIZED_COMPONENT_SEQ) {
lastUsedComponentSeq = getOnDiskLastUsedComponentSequence(filenameFilter);
}
return IndexComponentFileReference.getFlushSequence(++lastUsedComponentSeq);
@@ -100,6 +100,11 @@ public void allocated(ILSMMemoryComponent component) throws HyracksDataException
encapsulated.allocated(component);
}

@Override
public long getLastValidSequence() throws HyracksDataException {
return encapsulated.getLastValidSequence();
}

public ILSMIOOperationCallback getEncapsulated() {
return encapsulated;
}

0 comments on commit dc3704f

Please sign in to comment.