Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-18425. ABFS rename resilience through etags #5485

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,10 @@ public class AbfsConfiguration{
FS_AZURE_ENABLE_ABFS_LIST_ITERATOR, DefaultValue = DEFAULT_ENABLE_ABFS_LIST_ITERATOR)
private boolean enableAbfsListIterator;

/**
 * Flag for rename resilience. Bound to the
 * {@code FS_AZURE_ABFS_RENAME_RESILIENCE} configuration key, with
 * {@code DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE} declared as its default value.
 */
@BooleanConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_ABFS_RENAME_RESILIENCE, DefaultValue = DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE)
private boolean renameResilience;

public AbfsConfiguration(final Configuration rawConfig, String accountName)
throws IllegalAccessException, InvalidConfigurationValueException, IOException {
this.rawConfig = ProviderUtils.excludeIncompatibleCredentialProviders(
Expand Down Expand Up @@ -1130,4 +1134,7 @@ public void setEnableAbfsListIterator(boolean enableAbfsListIterator) {
this.enableAbfsListIterator = enableAbfsListIterator;
}

/**
 * Get the rename resilience flag held by this configuration.
 *
 * @return the current value of the {@code renameResilience} field.
 */
public boolean getRenameResilience() {
return this.renameResilience;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
import javax.annotation.Nullable;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.EtagSource;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -154,6 +157,11 @@ public class AzureBlobFileSystem extends FileSystem
/** Rate limiting for operations which use it to throttle their IO. */
private RateLimiting rateLimiting;

/**
 * Enable resilient rename.
 * Assigned in {@code initialize()} from
 * {@code abfsConfiguration.getRenameResilience()}. When true and the store
 * is namespace-enabled, {@code rename()} looks up the source file status
 * first and passes its etag down to the store's rename call.
 */
private boolean renameResilience;

@Override
public void initialize(URI uri, Configuration configuration)
throws IOException {
Expand Down Expand Up @@ -226,6 +234,8 @@ public void initialize(URI uri, Configuration configuration)
}

rateLimiting = RateLimitingFactory.create(abfsConfiguration.getRateLimit());

renameResilience = abfsConfiguration.getRenameResilience();
LOG.debug("Initializing AzureBlobFileSystem for {} complete", uri);
}

Expand Down Expand Up @@ -442,10 +452,13 @@ public boolean rename(final Path src, final Path dst) throws IOException {
}

// Non-HNS account need to check dst status on driver side.
if (!abfsStore.getIsNamespaceEnabled(tracingContext) && dstFileStatus == null) {
final boolean isNamespaceEnabled = abfsStore.getIsNamespaceEnabled(tracingContext);
if (!isNamespaceEnabled && dstFileStatus == null) {
dstFileStatus = tryGetFileStatus(qualifiedDstPath, tracingContext);
}

FileStatus sourceFileStatus = null;

try {
String sourceFileName = src.getName();
Path adjustedDst = dst;
Expand All @@ -459,10 +472,24 @@ public boolean rename(final Path src, final Path dst) throws IOException {

qualifiedDstPath = makeQualified(adjustedDst);

abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, tracingContext, null);
String etag = null;
if (renameResilience && isNamespaceEnabled) {
// for resilient rename on an HNS store, get the etag before
// attempting the rename, and pass it down
sourceFileStatus = abfsStore.getFileStatus(qualifiedSrcPath, tracingContext);
etag = ((EtagSource) sourceFileStatus).getEtag();
}
boolean recovered = abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, tracingContext,
etag);
if (recovered) {
LOG.info("Recovered from rename failure of {} to {}",
qualifiedSrcPath, qualifiedDstPath);
}
return true;
} catch (AzureBlobFileSystemException ex) {
LOG.debug("Rename operation failed. ", ex);
LOG.debug("Rename {} to {} failed. source {} dest {}",
qualifiedSrcPath, qualifiedDstPath,
sourceFileStatus, dstFileStatus, ex);
checkException(
src,
ex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ public final class ConfigurationKeys {
/** Key for rate limit capacity, as used by IO operations which try to throttle themselves. */
public static final String FS_AZURE_ABFS_IO_RATE_LIMIT = "fs.azure.io.rate.limit";

/**
 * Key to enable extra resilience to rename failures, at the expense of
 * performance.
 */
public static final String FS_AZURE_ABFS_RENAME_RESILIENCE = "fs.azure.enable.rename.resilience";

/**
 * Build an account-qualified property name by appending the account to the
 * property, separated by a dot.
 *
 * @param property base property name.
 * @param account account name to append.
 * @return {@code property + "." + account}.
 */
public static String accountProperty(String property, String account) {
return String.join(".", property, account);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ public final class FileSystemConfigurations {

public static final int STREAM_ID_LEN = 12;
public static final boolean DEFAULT_ENABLE_ABFS_LIST_ITERATOR = true;
/** Default value for the rename resilience option: enabled. */
public static final boolean DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE = true;

/**
* Limit of queued block upload operations before writes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,7 @@ public AbfsClientRenameResult renamePath(
if (!op.hasResult()) {
throw e;
}
LOG.debug("Rename of {} to {} failed, attempting recovery", source, destination, e);

// ref: HADOOP-18242. Rename failure occurring due to a rare case of
// tracking metadata being in incomplete state.
Expand All @@ -574,18 +575,15 @@ public AbfsClientRenameResult renamePath(
// then we can retry the rename operation.
AbfsRestOperation sourceStatusOp = getPathStatus(source, false,
tracingContext);
isMetadataIncompleteState = true;
// Extract the sourceEtag, using the status Op, and set it
// for future rename recovery.
AbfsHttpOperation sourceStatusResult = sourceStatusOp.getResult();
String sourceEtagAfterFailure = extractEtagHeader(sourceStatusResult);
renamePath(source, destination, continuation, tracingContext,
sourceEtagAfterFailure, isMetadataIncompleteState);
sourceEtagAfterFailure, true);
}
// if we get out of the condition without a successful rename, then
// it isn't metadata incomplete state issue.
isMetadataIncompleteState = false;

boolean etagCheckSucceeded = renameIdempotencyCheckOp(
source,
sourceEtag, op, destination, tracingContext);
Expand All @@ -594,7 +592,7 @@ public AbfsClientRenameResult renamePath(
// throw back the exception
throw e;
}
return new AbfsClientRenameResult(op, true, isMetadataIncompleteState);
return new AbfsClientRenameResult(op, true, false);
}
}

Expand Down