Fix setTtl workflow and ttlbucket bugs
### What changes are proposed in this pull request?

1) The setTtl command on a big directory unnecessarily applied setattr recursively, even though the directory's TTL already supersedes its children's TTLs; there is no reason to persist a TTL on each child inode individually.
2) TtlBucket unnecessarily kept Inode objects on the heap without bound, where it could and should store only the inode id and load the inode from the inode store to double-check expiry during InodeTtlChecker processing.
3) Fix a race condition on the ttl bucket list between the InodeTtlChecker and insertions from foreground operations.
4) Fix the case where an inode whose expiration failed during an InodeTtlChecker run would never be retried (a minimal sketch of the new retry bookkeeping follows this list).
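
A minimal, self-contained sketch of the id-to-retry-count bookkeeping described in items 2 and 4 (the class and method names here are illustrative, not the actual Alluxio `TtlBucket`/`InodeTtlChecker` APIs; only the default of 5 attempts and the min-merge on re-insertion mirror this change):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: a bucket tracks inode ids and the expiry attempts left for each,
// never the Inode objects themselves, so buckets no longer pin inodes on the heap.
final class TtlRetrySketch {
  static final int DEFAULT_RETRY_ATTEMPTS = 5;

  private final ConcurrentHashMap<Long, Integer> mInodeIdToRetries = new ConcurrentHashMap<>();

  /** Adds an inode id with the default retry budget. */
  void addInode(long inodeId) {
    addInode(inodeId, DEFAULT_RETRY_ATTEMPTS);
  }

  /** Re-inserting an id keeps the smaller budget, so a failing inode never regains attempts. */
  void addInode(long inodeId, int retriesLeft) {
    mInodeIdToRetries.compute(inodeId,
        (id, current) -> current == null ? retriesLeft : Math.min(current, retriesLeft));
  }

  /**
   * Called after a failed expiry attempt: re-inserts the id with a decremented budget,
   * or gives up once the budget is exhausted.
   */
  boolean retryLater(long inodeId, int retriesLeft) {
    int remaining = retriesLeft - 1;
    if (remaining <= 0) {
      return false; // retries exhausted, drop this inode
    }
    addInode(inodeId, remaining);
    return true;
  }

  /** Read-only snapshot the checker can iterate after polling an expired bucket. */
  Map<Long, Integer> entries() {
    return Map.copyOf(mInodeIdToRetries);
  }
}
```

On each heartbeat the checker would poll the expired buckets, iterate their id-to-retries entries, re-load every inode from the inode store to confirm its TTL has really elapsed, and call `retryLater` only when performing the expiry action throws.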

### Why are the changes needed?

Reduce heavy GC when running setTtl on a huge folder, plus the TTL bug fixes described above.

### Does this PR introduce any user facing changes?

Yes. The setTtl command on a directory will no longer set TTL attributes on all of its children.
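
As a purely hypothetical illustration of that behavior (none of these names are Alluxio APIs, and the assumption that the expiry action covers the whole subtree is mine), a directory-level TTL is sufficient because a single action on the directory reaches every child:

```java
// Hypothetical sketch: with the TTL stored only on the directory, expiring the
// directory with one recursive action makes per-child TTL attributes unnecessary.
final class DirTtlSketch {
  static final long NO_TTL = -1L;

  interface Node {
    long getTtl();
    long getCreationTimeMs();
    void deleteRecursively(); // assumed to remove the node together with its subtree
  }

  static boolean expireIfNeeded(Node dir, long nowMs) {
    if (dir.getTtl() == NO_TTL) {
      return false; // no TTL set on this directory, nothing to do
    }
    if (dir.getCreationTimeMs() + dir.getTtl() > nowMs) {
      return false; // not expired yet
    }
    // One action on the directory; the children never carried their own TTL attribute.
    dir.deleteRecursively();
    return true;
  }
}
```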

pr-link: #16933
change-id: cid-36ec524b2178220886cccb7a82de119bdcc8203f
lucyge2022 committed Mar 13, 2023
1 parent cb8d7ca commit 7313853
Showing 8 changed files with 332 additions and 117 deletions.
@@ -30,10 +30,13 @@
import alluxio.master.journal.JournalContext;
import alluxio.master.journal.NoopJournalContext;
import alluxio.proto.journal.File.UpdateInodeEntry;
import alluxio.util.ThreadUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import javax.annotation.concurrent.NotThreadSafe;

@@ -59,28 +62,42 @@ public InodeTtlChecker(FileSystemMaster fileSystemMaster, InodeTree inodeTree) {

@Override
public void heartbeat() throws InterruptedException {
Set<TtlBucket> expiredBuckets = mTtlBuckets.getExpiredBuckets(System.currentTimeMillis());
Set<TtlBucket> expiredBuckets = mTtlBuckets.pollExpiredBuckets(System.currentTimeMillis());
Map<Inode, Integer> failedInodesToRetryNum = new HashMap<>();
for (TtlBucket bucket : expiredBuckets) {
for (Inode inode : bucket.getInodes()) {
for (Map.Entry<Long, Integer> inodeExpiryEntry : bucket.getInodeExpiries()) {
// Throw if interrupted.
if (Thread.interrupted()) {
throw new InterruptedException("InodeTtlChecker interrupted.");
}
long inodeId = inodeExpiryEntry.getKey();
int leftRetries = inodeExpiryEntry.getValue();
// Exhausted retry attempt to expire this inode, bail.
if (leftRetries <= 0) {
continue;
}
AlluxioURI path = null;
try (LockedInodePath inodePath =
mInodeTree.lockFullInodePath(
inode.getId(), LockPattern.READ, NoopJournalContext.INSTANCE)
inodeId, LockPattern.READ, NoopJournalContext.INSTANCE)
) {
path = inodePath.getUri();
} catch (FileDoesNotExistException e) {
// The inode has already been deleted, nothing needs to be done.
continue;
} catch (Exception e) {
LOG.error("Exception trying to clean up {} for ttl check: {}", inode.toString(),
e.toString());
LOG.error("Exception trying to clean up inode:{},path:{} for ttl check: {}", inodeId,
path, e.toString());
}
if (path != null) {
Inode inode = null;
try {
inode = mTtlBuckets.loadInode(inodeId);
// Check again if this inode is indeed expired.
if (inode == null || inode.getTtl() == Constants.NO_TTL
|| inode.getCreationTimeMs() + inode.getTtl() > System.currentTimeMillis()) {
continue;
}
TtlAction ttlAction = inode.getTtlAction();
LOG.info("Path {} TTL has expired, performing action {}", path.getPath(), ttlAction);
switch (ttlAction) {
@@ -102,7 +119,6 @@ public void heartbeat() throws InterruptedException {
.setTtlAction(ProtobufUtils.toProtobuf(TtlAction.DELETE))
.build());
}
mTtlBuckets.remove(inode);
break;
case DELETE:
// public delete method will lock the path, and check WRITE permission required at
@@ -131,12 +147,23 @@ public void heartbeat() throws InterruptedException {
LOG.error("Unknown ttl action {}", ttlAction);
}
} catch (Exception e) {
LOG.error("Exception trying to clean up {} for ttl check", inode, e);
boolean retryExhausted = --leftRetries <= 0;
if (retryExhausted) {
LOG.error("Retry exhausted to clean up {} for ttl check. {}",
path, ThreadUtils.formatStackTrace(e));
} else if (inode != null) {
failedInodesToRetryNum.put(inode, leftRetries);
}
}
}
}
}
mTtlBuckets.removeBuckets(expiredBuckets);
// Put back those failed-to-expire inodes for next round retry.
if (!failedInodesToRetryNum.isEmpty()) {
for (Map.Entry<Inode, Integer> failedInodeEntry : failedInodesToRetryNum.entrySet()) {
mTtlBuckets.insert(failedInodeEntry.getKey(), failedInodeEntry.getValue());
}
}
}

@Override
@@ -17,6 +17,8 @@
import com.google.common.base.Objects;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.concurrent.ThreadSafe;

@@ -34,16 +36,18 @@ public final class TtlBucket implements Comparable<TtlBucket> {
*/
private static long sTtlIntervalMs =
Configuration.getMs(PropertyKey.MASTER_TTL_CHECKER_INTERVAL_MS);
public static final int DEFAULT_RETRY_ATTEMPTS = 5;
/**
* Each bucket has a time to live interval, this value is the start of the interval, interval
* value is the same as the configuration of {@link PropertyKey#MASTER_TTL_CHECKER_INTERVAL_MS}.
*/
private final long mTtlIntervalStartTimeMs;
/**
* A collection of inodes whose ttl value is in the range of this bucket's interval. The mapping
* is from inode id to inode.
* A collection containing those inodes whose ttl value is
* in the range of this bucket's interval. The mapping
* is from inode id to the number of expiry retries left to process.
*/
private final ConcurrentHashMap<Long, Inode> mInodes;
private final ConcurrentHashMap<Long, Integer> mInodeToRetryMap;

/**
* Creates a new instance of {@link TtlBucket}.
@@ -52,7 +56,7 @@ public final class TtlBucket implements Comparable<TtlBucket> {
*/
public TtlBucket(long startTimeMs) {
mTtlIntervalStartTimeMs = startTimeMs;
mInodes = new ConcurrentHashMap<>();
mInodeToRetryMap = new ConcurrentHashMap<>();
}

/**
@@ -78,38 +82,57 @@ public static long getTtlIntervalMs() {
}

/**
* @return the set of all inodes in the bucket backed by the internal set, changes made to the
* returned set will be shown in the internal set, and vice versa
* @return an unmodifiable view of all inodes ids in the bucket
*/
public Collection<Inode> getInodes() {
return mInodes.values();
public Collection<Long> getInodeIds() {
return Collections.unmodifiableSet(mInodeToRetryMap.keySet());
}

/**
* Adds a inode to the bucket.
* Gets the mapping from inode id to the number of ttl processing retry attempts left.
* @return the collection of entries from inode id to remaining retry attempts
*/
public Collection<Map.Entry<Long, Integer>> getInodeExpiries() {
return Collections.unmodifiableSet(mInodeToRetryMap.entrySet());
}

/**
* Adds an inode with the default number of retry attempts to expire.
* @param inode the inode to be added
*/
public void addInode(Inode inode) {
addInode(inode, DEFAULT_RETRY_ATTEMPTS);
}

/**
* Adds an inode to the bucket with a specific number of retries left.
*
* @param inode the inode to be added
* @return true if a new inode was added to the bucket
* @param numOfRetry number of retries left when added to the ttl bucket
*/
public boolean addInode(Inode inode) {
return mInodes.put(inode.getId(), inode) == null;
public void addInode(Inode inode, int numOfRetry) {
mInodeToRetryMap.compute(inode.getId(), (k, v) -> {
if (v != null) {
return Math.min(v, numOfRetry);
}
return numOfRetry;
});
}

/**
* Removes a inode from the bucket.
* Removes an inode from the bucket.
*
* @param inode the inode to be removed
* @return true if a inode was removed
*/
public boolean removeInode(InodeView inode) {
return mInodes.remove(inode.getId()) != null;
public void removeInode(InodeView inode) {
mInodeToRetryMap.remove(inode.getId());
}

/**
* @return the number of inodes in the bucket
*/
public int size() {
return mInodes.size();
return mInodeToRetryMap.size();
}

/**