Skip to content

Commit

Permalink
HADOOP-18656. [ABFS] Add Support for Paginated Delete for Large Direc…
Browse files Browse the repository at this point in the history
…tories in HNS Account (#6409)


Contributed by Anuj Modi
  • Loading branch information
anujmodi2021 committed Apr 4, 2024
1 parent d7157b4 commit 6ed7389
Show file tree
Hide file tree
Showing 13 changed files with 456 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,10 @@ public class AbfsConfiguration{
FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, DefaultValue = DEFAULT_ENABLE_ABFS_CHECKSUM_VALIDATION)
private boolean isChecksumValidationEnabled;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_ENABLE_PAGINATED_DELETE, DefaultValue = DEFAULT_ENABLE_PAGINATED_DELETE)
private boolean isPaginatedDeleteEnabled;

private String clientProvidedEncryptionKey;
private String clientProvidedEncryptionKeySHA;

Expand Down Expand Up @@ -1240,8 +1244,8 @@ public boolean getRenameResilience() {
return renameResilience;
}

void setRenameResilience(boolean actualResilience) {
renameResilience = actualResilience;
public boolean isPaginatedDeleteEnabled() {
return isPaginatedDeleteEnabled;
}

public boolean getIsChecksumValidationEnabled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1077,8 +1077,8 @@ public void delete(final Path path, final boolean recursive,

do {
try (AbfsPerfInfo perfInfo = startTracking("delete", "deletePath")) {
AbfsRestOperation op = client
.deletePath(relativePath, recursive, continuation, tracingContext);
AbfsRestOperation op = client.deletePath(relativePath, recursive,
continuation, tracingContext, getIsNamespaceEnabled(tracingContext));
perfInfo.registerResult(op.getResult());
continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
perfInfo.registerSuccess(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,37 @@ public final class AbfsHttpConstants {
public static final char CHAR_EQUALS = '=';
public static final char CHAR_STAR = '*';
public static final char CHAR_PLUS = '+';
public static final String DECEMBER_2019_API_VERSION = "2019-12-12";
public static final String APRIL_2021_API_VERSION = "2021-04-10";

/**
* Specifies the version of the REST protocol used for processing the request.
* Versions should be added in enum list in ascending chronological order.
* Latest one should be added last in the list.
* When upgrading the version for whole driver, update the getCurrentVersion;
*/
public enum ApiVersion {

DEC_12_2019("2019-12-12"),
APR_10_2021("2021-04-10"),
AUG_03_2023("2023-08-03");

private final String xMsApiVersion;

ApiVersion(String xMsApiVersion) {
this.xMsApiVersion = xMsApiVersion;
}

@Override
public String toString() {
return xMsApiVersion;
}

public static ApiVersion getCurrentVersion() {
return DEC_12_2019;
}
}

@Deprecated
public static final String DECEMBER_2019_API_VERSION = ApiVersion.DEC_12_2019.toString();

/**
* Value that differentiates categories of the http_status.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,11 @@ public final class ConfigurationKeys {
/** Add extra resilience to rename failures, at the expense of performance. */
public static final String FS_AZURE_ABFS_RENAME_RESILIENCE = "fs.azure.enable.rename.resilience";

/**
* Specify whether paginated behavior is to be expected or not in delete path. {@value}
*/
public static final String FS_AZURE_ENABLE_PAGINATED_DELETE = "fs.azure.enable.paginated.delete";

/** Add extra layer of verification of the integrity of the request content during transport: {@value}. */
public static final String FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION = "fs.azure.enable.checksum.validation";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ public final class FileSystemConfigurations {
public static final int STREAM_ID_LEN = 12;
public static final boolean DEFAULT_ENABLE_ABFS_LIST_ITERATOR = true;
public static final boolean DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE = true;
public static final boolean DEFAULT_ENABLE_PAGINATED_DELETE = false;
public static final boolean DEFAULT_ENABLE_ABFS_CHECKSUM_VALIDATION = false;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public final class HttpQueryParams {
public static final String QUERY_PARAM_CLOSE = "close";
public static final String QUERY_PARAM_UPN = "upn";
public static final String QUERY_PARAM_BLOBTYPE = "blobtype";
public static final String QUERY_PARAM_PAGINATED = "paginated";

//query params for SAS
public static final String QUERY_PARAM_SAOID = "saoid";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public class AbfsClient implements Closeable {

private final URL baseUrl;
private final SharedKeyCredentials sharedKeyCredentials;
private String xMsVersion = DECEMBER_2019_API_VERSION;
private ApiVersion xMsVersion = ApiVersion.getCurrentVersion();
private final ExponentialRetryPolicy exponentialRetryPolicy;
private final StaticRetryPolicy staticRetryPolicy;
private final String filesystem;
Expand All @@ -122,7 +122,6 @@ public class AbfsClient implements Closeable {
private final ListeningScheduledExecutorService executorService;
private Boolean isNamespaceEnabled;


private boolean renameResilience;

/**
Expand All @@ -149,7 +148,7 @@ private AbfsClient(final URL baseUrl,

if (encryptionContextProvider != null) {
this.encryptionContextProvider = encryptionContextProvider;
xMsVersion = APRIL_2021_API_VERSION; // will be default once server change deployed
xMsVersion = ApiVersion.APR_10_2021; // will be default once server change deployed
encryptionType = EncryptionType.ENCRYPTION_CONTEXT;
} else if (abfsConfiguration.getEncodedClientProvidedEncryptionKey() != null) {
clientProvidedEncryptionKey =
Expand Down Expand Up @@ -259,13 +258,27 @@ AbfsThrottlingIntercept getIntercept() {
return intercept;
}

List<AbfsHttpHeader> createDefaultHeaders() {
/**
* Create request headers for Rest Operation using the current API version.
* @return default request headers
*/
@VisibleForTesting
protected List<AbfsHttpHeader> createDefaultHeaders() {
return createDefaultHeaders(this.xMsVersion);
}

/**
* Create request headers for Rest Operation using the specified API version.
* @param xMsVersion
* @return default request headers
*/
private List<AbfsHttpHeader> createDefaultHeaders(ApiVersion xMsVersion) {
final List<AbfsHttpHeader> requestHeaders = new ArrayList<AbfsHttpHeader>();
requestHeaders.add(new AbfsHttpHeader(X_MS_VERSION, xMsVersion));
requestHeaders.add(new AbfsHttpHeader(X_MS_VERSION, xMsVersion.toString()));
requestHeaders.add(new AbfsHttpHeader(ACCEPT, APPLICATION_JSON
+ COMMA + SINGLE_WHITE_SPACE + APPLICATION_OCTET_STREAM));
+ COMMA + SINGLE_WHITE_SPACE + APPLICATION_OCTET_STREAM));
requestHeaders.add(new AbfsHttpHeader(ACCEPT_CHARSET,
UTF_8));
UTF_8));
requestHeaders.add(new AbfsHttpHeader(CONTENT_TYPE, EMPTY_STRING));
requestHeaders.add(new AbfsHttpHeader(USER_AGENT, userAgent));
return requestHeaders;
Expand Down Expand Up @@ -1117,12 +1130,29 @@ public AbfsRestOperation read(final String path,
return op;
}

public AbfsRestOperation deletePath(final String path, final boolean recursive, final String continuation,
TracingContext tracingContext)
public AbfsRestOperation deletePath(final String path, final boolean recursive,
final String continuation,
TracingContext tracingContext,
final boolean isNamespaceEnabled)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();

/*
* If Pagination is enabled and current API version is old,
* use the minimum required version for pagination.
* If Pagination is enabled and current API version is later than minimum required
* version for pagination, use current version only as azure service is backward compatible.
* If pagination is disabled, use the current API version only.
*/
final List<AbfsHttpHeader> requestHeaders = (isPaginatedDelete(recursive,
isNamespaceEnabled) && xMsVersion.compareTo(ApiVersion.AUG_03_2023) < 0)
? createDefaultHeaders(ApiVersion.AUG_03_2023)
: createDefaultHeaders();
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();

if (isPaginatedDelete(recursive, isNamespaceEnabled)) {
// Add paginated query parameter
abfsUriQueryBuilder.addQuery(QUERY_PARAM_PAGINATED, TRUE);
}

abfsUriQueryBuilder.addQuery(QUERY_PARAM_RECURSIVE, String.valueOf(recursive));
abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
String operation = recursive ? SASTokenProvider.DELETE_RECURSIVE_OPERATION : SASTokenProvider.DELETE_OPERATION;
Expand Down Expand Up @@ -1465,6 +1495,14 @@ private synchronized Boolean getIsNamespaceEnabled(TracingContext tracingContext
return isNamespaceEnabled;
}

protected Boolean getIsPaginatedDeleteEnabled() {
return abfsConfiguration.isPaginatedDeleteEnabled();
}

private Boolean isPaginatedDelete(boolean isRecursiveDelete, boolean isNamespaceEnabled) {
return getIsPaginatedDeleteEnabled() && isNamespaceEnabled && isRecursiveDelete;
}

public AuthType getAuthType() {
return authType;
}
Expand Down Expand Up @@ -1659,7 +1697,7 @@ protected AbfsCounters getAbfsCounters() {
return abfsCounters;
}

public String getxMsVersion() {
public ApiVersion getxMsVersion() {
return xMsVersion;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.Callable;

import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -215,6 +216,7 @@ public void setup() throws Exception {
wasb = new NativeAzureFileSystem(azureNativeFileSystemStore);
wasb.initialize(wasbUri, rawConfig);
}
// Todo: To be fixed in HADOOP-19137
AbfsClientUtils.setIsNamespaceEnabled(abfs.getAbfsClient(), true);
}

Expand Down Expand Up @@ -532,4 +534,10 @@ protected long assertAbfsStatistics(AbfsStatistic statistic,
(long) metricMap.get(statistic.getStatName()));
return expectedValue;
}

protected void assumeValidTestConfigPresent(final Configuration conf, final String key) {
String configuredValue = conf.get(key);
Assume.assumeTrue(String.format("Missing Required Test Config: %s.", key),
configuredValue != null && !configuredValue.isEmpty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,9 @@ private AbfsRestOperation callOperation(AzureBlobFileSystem fs,
return client.renamePath(path, new Path(path + "_2").toString(),
null, tc, null, false, fs.getIsNamespaceEnabled(tc)).getOp();
case DELETE:
TracingContext testTC = getTestTracingContext(fs, false);
return client.deletePath(path, false, null,
getTestTracingContext(fs, false));
testTC, fs.getIsNamespaceEnabled(testTC));
case GET_ATTR:
return client.getPathStatus(path, true,
getTestTracingContext(fs, false),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ public void testDeleteIdempotencyTriggerHttp404() throws Exception {
"/NonExistingPath",
false,
null,
getTestTracingContext(fs, true)));
getTestTracingContext(fs, true),
fs.getIsNamespaceEnabled(getTestTracingContext(fs, true))));

// mock idempotency check to mimic retried case
AbfsClient mockClient = ITestAbfsClient.getMockAbfsClient(
Expand All @@ -269,14 +270,15 @@ public void testDeleteIdempotencyTriggerHttp404() throws Exception {
doReturn(idempotencyRetOp).when(mockClient).deleteIdempotencyCheckOp(any());
TracingContext tracingContext = getTestTracingContext(fs, false);
doReturn(tracingContext).when(idempotencyRetOp).createNewTracingContext(any());
when(mockClient.deletePath("/NonExistingPath", false, null, tracingContext))
when(mockClient.deletePath("/NonExistingPath", false, null,
tracingContext, fs.getIsNamespaceEnabled(tracingContext)))
.thenCallRealMethod();

Assertions.assertThat(mockClient.deletePath(
"/NonExistingPath",
false,
null,
tracingContext)
tracingContext, fs.getIsNamespaceEnabled(tracingContext))
.getResult()
.getStatusCode())
.describedAs("Idempotency check reports successful "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.hadoop.fs.azurebfs.services;

import java.util.List;

import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;

public final class AbfsClientUtils {
Expand All @@ -31,4 +33,13 @@ public static void setIsNamespaceEnabled(final AbfsClient abfsClient, final Bool
public static void setEncryptionContextProvider(final AbfsClient abfsClient, final EncryptionContextProvider provider) {
abfsClient.setEncryptionContextProvider(provider);
}

public static String getHeaderValue(List<AbfsHttpHeader> reqHeaders, String headerName) {
for (AbfsHttpHeader header : reqHeaders) {
if (header.getName().equals(headerName)) {
return header.getValue();
}
}
return "";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance,
return client;
}

private static AbfsClient setAbfsClientField(
static AbfsClient setAbfsClientField(
final AbfsClient client,
final String fieldName,
Object fieldObject) throws Exception {
Expand Down

0 comments on commit 6ed7389

Please sign in to comment.