Skip to content

Commit

Permalink
HADOOP-18425. ABFS rename resilience through etags
Browse files Browse the repository at this point in the history
If "fs.azure.enable.rename.resilience" is true, then
do a HEAD of the source file before the rename to obtain
its etag, which can then be used to recover from a rename
failure, as the manifest committer does (HADOOP-18163).

Change-Id: Ia417f1501f7274662eb9ff919c6378fb913b476b

HADOOP-18425. ABFS rename resilience through etags

Only get the etag on HNS (hierarchical namespace) stores.

Change-Id: I9faffa78294e1782f0b2db3d1c997ec3fe53637c
  • Loading branch information
steveloughran committed Mar 19, 2023
1 parent 759ddeb commit da1a2dc
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,10 @@ public class AbfsConfiguration{
FS_AZURE_ENABLE_ABFS_LIST_ITERATOR, DefaultValue = DEFAULT_ENABLE_ABFS_LIST_ITERATOR)
private boolean enableAbfsListIterator;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_ABFS_RENAME_RESILIENCE, DefaultValue = DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE)
private boolean renameResilience;

public AbfsConfiguration(final Configuration rawConfig, String accountName)
throws IllegalAccessException, InvalidConfigurationValueException, IOException {
this.rawConfig = ProviderUtils.excludeIncompatibleCredentialProviders(
Expand Down Expand Up @@ -1130,4 +1134,7 @@ public void setEnableAbfsListIterator(boolean enableAbfsListIterator) {
this.enableAbfsListIterator = enableAbfsListIterator;
}

/**
 * Reports whether rename resilience is switched on in this configuration.
 *
 * @return the current value of the {@code renameResilience} field.
 */
public boolean getRenameResilience() {
  return this.renameResilience;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
import javax.annotation.Nullable;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.EtagSource;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -154,6 +157,11 @@ public class AzureBlobFileSystem extends FileSystem
/** Rate limiting for operations which use it to throttle their IO. */
private RateLimiting rateLimiting;

/**
* Enable resilient rename.
*/
private boolean renameResilience;

@Override
public void initialize(URI uri, Configuration configuration)
throws IOException {
Expand Down Expand Up @@ -226,6 +234,8 @@ public void initialize(URI uri, Configuration configuration)
}

rateLimiting = RateLimitingFactory.create(abfsConfiguration.getRateLimit());

renameResilience = abfsConfiguration.getRenameResilience();
LOG.debug("Initializing AzureBlobFileSystem for {} complete", uri);
}

Expand Down Expand Up @@ -442,10 +452,13 @@ public boolean rename(final Path src, final Path dst) throws IOException {
}

// Non-HNS accounts need to check the dst status on the driver side.
if (!abfsStore.getIsNamespaceEnabled(tracingContext) && dstFileStatus == null) {
final boolean isNamespaceEnabled = abfsStore.getIsNamespaceEnabled(tracingContext);
if (!isNamespaceEnabled && dstFileStatus == null) {
dstFileStatus = tryGetFileStatus(qualifiedDstPath, tracingContext);
}

FileStatus sourceFileStatus = null;

try {
String sourceFileName = src.getName();
Path adjustedDst = dst;
Expand All @@ -459,10 +472,24 @@ public boolean rename(final Path src, final Path dst) throws IOException {

qualifiedDstPath = makeQualified(adjustedDst);

abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, tracingContext, null);
String etag = null;
if (renameResilience && isNamespaceEnabled) {
// for resilient rename on an HNS store, get the etag before
// attempting the rename, and pass it down
sourceFileStatus = abfsStore.getFileStatus(qualifiedSrcPath, tracingContext);
etag = ((EtagSource) sourceFileStatus).getEtag();
}
boolean recovered = abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, tracingContext,
etag);
if (recovered) {
LOG.info("Recovered from rename failure of {} to {}",
qualifiedSrcPath, qualifiedDstPath);
}
return true;
} catch (AzureBlobFileSystemException ex) {
LOG.debug("Rename operation failed. ", ex);
LOG.debug("Rename {} to {} failed. source {} dest {}",
qualifiedSrcPath, qualifiedDstPath,
sourceFileStatus, dstFileStatus, ex);
checkException(
src,
ex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ public final class ConfigurationKeys {
/** Key for rate limit capacity, as used by IO operations which try to throttle themselves. */
public static final String FS_AZURE_ABFS_IO_RATE_LIMIT = "fs.azure.io.rate.limit";

/** Key to enable extra resilience to rename failures, at the expense of performance. */
public static final String FS_AZURE_ABFS_RENAME_RESILIENCE = "fs.azure.enable.rename.resilience";

/**
 * Builds an account-qualified property name by appending the account to
 * the property, separated by a single dot.
 *
 * @param property base property name.
 * @param account account name used as the suffix.
 * @return the property and the account joined with {@code "."}.
 */
public static String accountProperty(String property, String account) {
  return String.join(".", property, account);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ public final class FileSystemConfigurations {

public static final int STREAM_ID_LEN = 12;
public static final boolean DEFAULT_ENABLE_ABFS_LIST_ITERATOR = true;
public static final boolean DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE = true;

/**
* Limit of queued block upload operations before writes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,7 @@ public AbfsClientRenameResult renamePath(
if (!op.hasResult()) {
throw e;
}
LOG.debug("Rename of {} to {} failed, attempting recovery", source, destination, e);

// ref: HADOOP-18242. Rename failure occurring due to a rare case of
// tracking metadata being in incomplete state.
Expand All @@ -574,18 +575,15 @@ public AbfsClientRenameResult renamePath(
// then we can retry the rename operation.
AbfsRestOperation sourceStatusOp = getPathStatus(source, false,
tracingContext);
isMetadataIncompleteState = true;
// Extract the sourceEtag, using the status Op, and set it
// for future rename recovery.
AbfsHttpOperation sourceStatusResult = sourceStatusOp.getResult();
String sourceEtagAfterFailure = extractEtagHeader(sourceStatusResult);
renamePath(source, destination, continuation, tracingContext,
sourceEtagAfterFailure, isMetadataIncompleteState);
sourceEtagAfterFailure, true);
}
// if we get out of the condition without a successful rename, then
// it isn't metadata incomplete state issue.
isMetadataIncompleteState = false;

boolean etagCheckSucceeded = renameIdempotencyCheckOp(
source,
sourceEtag, op, destination, tracingContext);
Expand All @@ -594,7 +592,7 @@ public AbfsClientRenameResult renamePath(
// throw back the exception
throw e;
}
return new AbfsClientRenameResult(op, true, isMetadataIncompleteState);
return new AbfsClientRenameResult(op, true, false);
}
}

Expand Down

0 comments on commit da1a2dc

Please sign in to comment.