Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
5a2ef89
HADOOP-16948. Support single writer dirs.
billierinaldi Mar 25, 2020
86246dc
HADOOP-16948. Fix findbugs and checkstyle problems.
billierinaldi Oct 28, 2020
fe3dd13
HADOOP-16948. Fix remaining checkstyle problems.
billierinaldi Oct 28, 2020
1cd1a92
HADOOP-16948. Add DurationInfo, retry policy for acquiring lease, and…
billierinaldi Jan 4, 2021
eeb4ba0
HADOOP-16948. Convert ABFS client to use an executor for lease ops
billierinaldi Jan 12, 2021
8e91f61
HADOOP-16948. Fix ABFS lease test for non-HNS
billierinaldi Jan 14, 2021
8eceabb
HADOOP-16948. Fix checkstyle and javadoc
billierinaldi Jan 14, 2021
42f68c2
HADOOP-16948. Address review comments
billierinaldi Jan 17, 2021
9f3d689
HADOOP-16948. Use daemon threads for ABFS lease ops
billierinaldi Jan 29, 2021
d67882f
HADOOP-16948. Make lease duration configurable
billierinaldi Mar 2, 2021
f00c145
HADOOP-16948. Add error messages to test assertions
billierinaldi Mar 2, 2021
f001783
HADOOP-16948. Remove extra isSingleWriterKey call
billierinaldi Mar 3, 2021
92e7343
HADOOP-16948. Use only infinite lease duration due to cost of renewal…
billierinaldi Mar 15, 2021
eca41b4
HADOOP-16948. Remove acquire/renew/release lease methods
billierinaldi Mar 16, 2021
822615e
HADOOP-16948. Rename single writer dirs to infinite lease dirs
billierinaldi Mar 16, 2021
9fc4f08
HADOOP-16948. Fix checkstyle
billierinaldi Mar 17, 2021
b6803cf
HADOOP-16948. Wait for acquire lease future
billierinaldi Mar 22, 2021
4fdfc08
HADOOP-16948. Add unit test for acquire lease failure
billierinaldi Mar 23, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.AuthConfigurations;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerWithOutlierConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.LongConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.StringConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.Base64StringConfigurationValidatorAnnotation;
Expand Down Expand Up @@ -208,6 +209,15 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES)
private String azureAppendBlobDirs;

@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_INFINITE_LEASE_KEY,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this feature supported on both hierarchical-namespace (HNS) enabled accounts and flat-namespace accounts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have run the unit test with HNS and flat namespace storage accounts, so I think it will work. I have not done extensive testing with HNS disabled, however.

DefaultValue = DEFAULT_FS_AZURE_INFINITE_LEASE_DIRECTORIES)
private String azureInfiniteLeaseDirs;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_LEASE_THREADS,
Copy link
Contributor

@snehavarma snehavarma Mar 17, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need these, i.e. the lease threads?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it will still be useful to issue the acquire and release operations in a thread pool for now. Possibly this could be removed if all acquire and release operations are moved into create and flush-with-close in the future.

MinValue = MIN_LEASE_THREADS,
DefaultValue = DEFAULT_LEASE_THREADS)
private int numLeaseThreads;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
DefaultValue = DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION)
private boolean createRemoteFileSystemDuringInitialization;
Expand Down Expand Up @@ -296,6 +306,8 @@ public AbfsConfiguration(final Configuration rawConfig, String accountName)
field.setAccessible(true);
if (field.isAnnotationPresent(IntegerConfigurationValidatorAnnotation.class)) {
field.set(this, validateInt(field));
} else if (field.isAnnotationPresent(IntegerWithOutlierConfigurationValidatorAnnotation.class)) {
field.set(this, validateIntWithOutlier(field));
} else if (field.isAnnotationPresent(LongConfigurationValidatorAnnotation.class)) {
field.set(this, validateLong(field));
} else if (field.isAnnotationPresent(StringConfigurationValidatorAnnotation.class)) {
Expand Down Expand Up @@ -634,6 +646,14 @@ public String getAppendBlobDirs() {
return this.azureAppendBlobDirs;
}

/**
 * Returns the infinite-lease directory setting held by this configuration.
 *
 * @return the string loaded for {@code FS_AZURE_INFINITE_LEASE_KEY}
 */
public String getAzureInfiniteLeaseDirs() {
  return azureInfiniteLeaseDirs;
}

/**
 * Returns the number of threads configured for lease operations.
 *
 * @return the integer loaded for {@code FS_AZURE_LEASE_THREADS}
 */
public int getNumLeaseThreads() {
  return numLeaseThreads;
}

public boolean getCreateRemoteFileSystemDuringInitialization() {
// we do not support creating the filesystem when AuthType is SAS
return this.createRemoteFileSystemDuringInitialization
Expand Down Expand Up @@ -843,6 +863,21 @@ int validateInt(Field field) throws IllegalAccessException, InvalidConfiguration
validator.ThrowIfInvalid()).validate(value);
}

/**
 * Validates the integer setting behind a field carrying
 * {@link IntegerWithOutlierConfigurationValidatorAnnotation}.
 *
 * The configured string is looked up by the annotation's configuration key and
 * handed to an {@code IntegerConfigurationBasicValidator} built from the
 * annotation's outlier, min, max, default and throw-if-invalid attributes.
 *
 * @param field the annotated configuration field
 * @return the value produced by the validator
 * @throws IllegalAccessException declared by the signature; not raised by the visible body
 * @throws InvalidConfigurationValueException propagated from the validator
 */
int validateIntWithOutlier(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
  final IntegerWithOutlierConfigurationValidatorAnnotation annotation =
      field.getAnnotation(IntegerWithOutlierConfigurationValidatorAnnotation.class);
  final String configKey = annotation.ConfigurationKey();
  final String configuredValue = get(configKey);

  final IntegerConfigurationBasicValidator checker = new IntegerConfigurationBasicValidator(
      annotation.OutlierValue(),
      annotation.MinValue(),
      annotation.MaxValue(),
      annotation.DefaultValue(),
      configKey,
      annotation.ThrowIfInvalid());
  return checker.validate(configuredValue);
}

long validateLong(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
LongConfigurationValidatorAnnotation validator = field.getAnnotation(LongConfigurationValidatorAnnotation.class);
String value = rawConfig.get(validator.ConfigurationKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.functional.RemoteIterators;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Progressable;

Expand Down Expand Up @@ -505,6 +506,26 @@ public FileStatus getFileStatus(final Path f) throws IOException {
}
}

/**
 * Breaks whatever lease is currently held on the given ABFS file, if any.
 * Once broken, a lease cannot be renewed; a fresh lease on the file may be
 * acquired straight away.
 *
 * @param f path of the file whose lease should be broken
 * @throws IOException if breaking the lease fails
 */
public void breakLease(final Path f) throws IOException {
  LOG.debug("AzureBlobFileSystem.breakLease path: {}", f);

  final Path leasePath = makeQualified(f);

  // The DurationInfo resource times the operation; it is closed automatically.
  try (DurationInfo timing =
      new DurationInfo(LOG, false, "Break lease for %s", leasePath)) {
    abfsStore.breakLease(leasePath);
  } catch (AzureBlobFileSystemException e) {
    checkException(f, e);
  }
}

/**
* Qualify a path to one which uses this FileSystem and, if relative,
* made absolute.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -48,10 +49,14 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ExecutionException;

import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -100,6 +105,7 @@
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
import org.apache.hadoop.fs.azurebfs.services.AbfsLease;
import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfTracker;
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfInfo;
Expand Down Expand Up @@ -145,8 +151,11 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
private static final String XMS_PROPERTIES_ENCODING = "ISO-8859-1";
private static final int GET_SET_AGGREGATE_COUNT = 2;

private final Map<AbfsLease, Object> leaseRefs;

private final AbfsConfiguration abfsConfiguration;
private final Set<String> azureAtomicRenameDirSet;
private Set<String> azureInfiniteLeaseDirSet;
private Trilean isNamespaceEnabled;
private final AuthType authType;
private final UserGroupInformation userGroupInformation;
Expand All @@ -167,6 +176,8 @@ public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme,
final String fileSystemName = authorityParts[0];
final String accountName = authorityParts[1];

leaseRefs = Collections.synchronizedMap(new WeakHashMap<>());

try {
this.abfsConfiguration = new AbfsConfiguration(configuration, accountName);
} catch (IllegalAccessException exception) {
Expand Down Expand Up @@ -195,6 +206,7 @@ public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme,

this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
abfsConfiguration.getAzureAtomicRenameDirs().split(AbfsHttpConstants.COMMA)));
updateInfiniteLeaseDirs();
this.authType = abfsConfiguration.getAuthType(accountName);
boolean usingOauth = (authType == AuthType.OAuth);
boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : isSecureScheme;
Expand Down Expand Up @@ -246,7 +258,24 @@ public String getPrimaryGroup() {

/**
 * Frees every lease still tracked by this store and then closes the client.
 *
 * Each tracked lease is freed by a task submitted through the client, and this
 * method blocks until all of those tasks have finished. The client is cleaned
 * up only afterwards (in the {@code finally} block), because the tasks are
 * submitted via the client and must not run against an already-closed one.
 * Failures while freeing leases are logged rather than rethrown.
 *
 * @throws IOException declared by {@link Closeable#close()}
 */
@Override
public void close() throws IOException {
  // leaseRefs is a Collections.synchronizedMap over a WeakHashMap: its key view
  // must only be traversed while holding the map's lock, otherwise a concurrent
  // put can break the iteration. Copy the keys under the lock, then work on the
  // copy; the copy also keeps the leases strongly reachable until they are freed.
  final List<AbfsLease> trackedLeases;
  synchronized (leaseRefs) {
    trackedLeases = new ArrayList<>(leaseRefs.keySet());
  }

  List<ListenableFuture<?>> futures = new ArrayList<>(trackedLeases.size());
  for (AbfsLease lease : trackedLeases) {
    if (lease == null) {
      continue;
    }
    ListenableFuture<?> future = client.submit(() -> lease.free());
    futures.add(future);
  }
  try {
    Futures.allAsList(futures).get();
  } catch (InterruptedException e) {
    LOG.error("Interrupted freeing leases", e);
    // restore the interrupt flag for callers further up the stack
    Thread.currentThread().interrupt();
  } catch (ExecutionException e) {
    LOG.error("Error freeing leases", e);
  } finally {
    IOUtils.cleanupWithLogger(LOG, client);
  }
}

byte[] encodeAttribute(String value) throws UnsupportedEncodingException {
Expand Down Expand Up @@ -496,12 +525,14 @@ public OutputStream createFile(final Path path,
}
perfInfo.registerResult(op.getResult()).registerSuccess(true);

AbfsLease lease = maybeCreateLease(relativePath);

return new AbfsOutputStream(
client,
statistics,
relativePath,
0,
populateAbfsOutputStreamContext(isAppendBlob));
populateAbfsOutputStreamContext(isAppendBlob, lease));
}
}

Expand Down Expand Up @@ -573,7 +604,8 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
return op;
}

private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppendBlob) {
private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppendBlob,
AbfsLease lease) {
int bufferSize = abfsConfiguration.getWriteBufferSize();
if (isAppendBlob && bufferSize > FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE) {
bufferSize = FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE;
Expand All @@ -587,6 +619,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppend
.withAppendBlob(isAppendBlob)
.withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount())
.withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue())
.withLease(lease)
.build();
}

Expand Down Expand Up @@ -705,15 +738,29 @@ public OutputStream openFileForWrite(final Path path, final FileSystem.Statistic
isAppendBlob = true;
}

AbfsLease lease = maybeCreateLease(relativePath);

return new AbfsOutputStream(
client,
statistics,
relativePath,
offset,
populateAbfsOutputStreamContext(isAppendBlob));
populateAbfsOutputStreamContext(isAppendBlob, lease));
}
}

/**
 * Breaks the current lease, if any, on an ABFS file by delegating to the client.
 *
 * @param path the file whose lease is to be broken
 * @throws AzureBlobFileSystemException if the break-lease call fails
 */
public void breakLease(final Path path) throws AzureBlobFileSystemException {
  LOG.debug("lease path: {}", path);

  final String relativePath = getRelativePath(path);
  client.breakLease(relativePath);
}

public void rename(final Path source, final Path destination) throws
AzureBlobFileSystemException {
final Instant startAggregate = abfsPerfTracker.getLatencyInstant();
Expand Down Expand Up @@ -1347,6 +1394,13 @@ public boolean isAtomicRenameKey(String key) {
return isKeyForDirectorySet(key, azureAtomicRenameDirSet);
}

/**
 * Tells whether the given key falls under one of the configured
 * infinite-lease directories.
 *
 * @param key the path key to test
 * @return {@code false} when no infinite-lease directories are configured,
 *         otherwise the result of the directory-set match
 */
public boolean isInfiniteLeaseKey(String key) {
  // short-circuit: an empty set never reaches the directory match
  return !azureInfiniteLeaseDirSet.isEmpty()
      && isKeyForDirectorySet(key, azureInfiniteLeaseDirSet);
}

/**
* A one-off operation to initialize AbfsClient for AzureBlobFileSystem
* Operations.
Expand Down Expand Up @@ -1636,4 +1690,32 @@ void setNamespaceEnabled(Trilean isNamespaceEnabled){
this.isNamespaceEnabled = isNamespaceEnabled;
}

/**
 * Rebuilds the set of infinite-lease directories from the comma-separated
 * configuration value.
 */
private void updateInfiniteLeaseDirs() {
  final String[] configuredDirs =
      abfsConfiguration.getAzureInfiniteLeaseDirs().split(AbfsHttpConstants.COMMA);
  final Set<String> leaseDirs = new HashSet<>(Arrays.asList(configuredDirs));
  // Drop the empty string: isKeyForDirectory returns true for empty strings,
  // and infinite lease dirs must not end up enabled by default.
  leaseDirs.remove("");
  this.azureInfiniteLeaseDirSet = leaseDirs;
}

/**
 * Creates and tracks a lease for the path when it lies under an
 * infinite-lease directory.
 *
 * @param relativePath path of the file being created or opened for write
 * @return the new lease, or {@code null} when the path is not an infinite-lease key
 * @throws AzureBlobFileSystemException propagated from constructing the lease
 */
private AbfsLease maybeCreateLease(String relativePath)
    throws AzureBlobFileSystemException {
  if (isInfiniteLeaseKey(relativePath)) {
    final AbfsLease created = new AbfsLease(client, relativePath);
    // register the lease as a key in leaseRefs so close() can free it later
    leaseRefs.put(created, null);
    return created;
  }
  return null;
}

/**
 * Reports whether every lease currently tracked by this store has been freed.
 *
 * @return {@code true} if no tracked lease is still unfreed
 */
@VisibleForTesting
boolean areLeasesFreed() {
  // leaseRefs is a Collections.synchronizedMap over a WeakHashMap; its key view
  // must be traversed under the map's lock. Copy the keys while holding the
  // lock, then inspect the copy without it.
  final List<AbfsLease> trackedLeases;
  synchronized (leaseRefs) {
    trackedLeases = new ArrayList<>(leaseRefs.keySet());
  }
  for (AbfsLease lease : trackedLeases) {
    if (lease != null && !lease.isFreed()) {
      return false;
    }
  }
  return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public final class AbfsHttpConstants {
public static final String GET_ACCESS_CONTROL = "getAccessControl";
public static final String CHECK_ACCESS = "checkAccess";
public static final String GET_STATUS = "getStatus";
// Lease action names. NOTE(review): presumably sent as the value of the
// x-ms-lease-action request header — confirm in AbfsClient.
public static final String ACQUIRE_LEASE_ACTION = "acquire";
public static final String BREAK_LEASE_ACTION = "break";
public static final String RELEASE_LEASE_ACTION = "release";
public static final String RENEW_LEASE_ACTION = "renew";
// NOTE(review): presumably the break period in seconds, so "0" would break the
// lease immediately — confirm against the Azure lease API.
public static final String DEFAULT_LEASE_BREAK_PERIOD = "0";
public static final String DEFAULT_TIMEOUT = "90";
public static final String APPEND_BLOB_TYPE = "appendblob";
public static final String TOKEN_VERSION = "2";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ public final class ConfigurationKeys {
/** Provides a config to provide comma separated path prefixes on which Appendblob based files are created
* Default is empty. **/
public static final String FS_AZURE_APPEND_BLOB_KEY = "fs.azure.appendblob.directories";
/** Provides a config to provide comma separated path prefixes which support infinite leases.
* Files under these paths will be leased when created or opened for writing and the lease will
* be released when the file is closed. The lease may be broken with the breakLease method on
* AzureBlobFileSystem. Default is empty.
* **/
public static final String FS_AZURE_INFINITE_LEASE_KEY = "fs.azure.infinite-lease.directories";
/** Provides a number of threads to use for lease operations for infinite lease directories.
* Must be set to a minimum of 1 if infinite lease directories are to be used. Default is 0. **/
public static final String FS_AZURE_LEASE_THREADS = "fs.azure.lease.threads";
public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
public static final String FS_AZURE_ALWAYS_READ_BUFFER_SIZE = "fs.azure.read.alwaysReadBufferSize";
public static final String FS_AZURE_READ_AHEAD_BLOCK_SIZE = "fs.azure.read.readahead.blocksize";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE = true;
public static final boolean DEFAULT_FS_AZURE_ENABLE_MKDIR_OVERWRITE = true;
public static final String DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES = "";
// Default for the infinite-lease directories setting: empty, i.e. no directory
// uses infinite leases unless configured.
public static final String DEFAULT_FS_AZURE_INFINITE_LEASE_DIRECTORIES = "";
// Default and minimum number of lease threads. The configuration key's
// documentation says at least 1 is required when infinite lease directories are used.
public static final int DEFAULT_LEASE_THREADS = 0;
public static final int MIN_LEASE_THREADS = 0;
// Lease duration bounds. NOTE(review): presumably in seconds, with -1 meaning an
// infinite lease; these are not referenced by the code visible here — confirm usage.
public static final int DEFAULT_LEASE_DURATION = -1;
public static final int INFINITE_LEASE_DURATION = -1;
public static final int MIN_LEASE_DURATION = 15;
public static final int MAX_LEASE_DURATION = 60;

public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public final class HttpHeaderConfigurations {
public static final String X_MS_UMASK = "x-ms-umask";
public static final String X_MS_NAMESPACE_ENABLED = "x-ms-namespace-enabled";
public static final String X_MS_ABFS_CLIENT_LATENCY = "x-ms-abfs-client-latency";
// HTTP header names used for lease operations.
public static final String X_MS_LEASE_ACTION = "x-ms-lease-action";
public static final String X_MS_LEASE_DURATION = "x-ms-lease-duration";
public static final String X_MS_LEASE_ID = "x-ms-lease-id";
public static final String X_MS_PROPOSED_LEASE_ID = "x-ms-proposed-lease-id";
public static final String X_MS_LEASE_BREAK_PERIOD = "x-ms-lease-break-period";

private HttpHeaderConfigurations() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,22 @@ public class ConfigurationValidationAnnotations {
boolean ThrowIfInvalid() default false;
}

/**
 * Describes the requirements when validating the annotated int field, with an
 * additional outlier value alongside the min/max bounds.
 * NOTE(review): the outlier is presumably a single value accepted even when it
 * lies outside [MinValue, MaxValue] — confirm against
 * IntegerConfigurationBasicValidator.
 */
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface IntegerWithOutlierConfigurationValidatorAnnotation {
  /** Configuration key whose value is loaded into the annotated field. */
  String ConfigurationKey();

  /** Default value for the setting; must be supplied. */
  int DefaultValue();

  /** Lower bound; defaults to {@link Integer#MIN_VALUE}. */
  int MinValue() default Integer.MIN_VALUE;

  /** Upper bound; defaults to {@link Integer#MAX_VALUE}. */
  int MaxValue() default Integer.MAX_VALUE;

  /** Outlier value; defaults to {@link Integer#MIN_VALUE}. */
  int OutlierValue() default Integer.MIN_VALUE;

  /** Passed to the validator as its throw-if-invalid flag; defaults to {@code false}. */
  boolean ThrowIfInvalid() default false;
}

/**
* Describes the requirements when validating the annotated long field.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ public AzureBlobFileSystemException(final String message, final Exception innerE
super(message, innerException);
}

/**
 * Creates an exception with a message and an arbitrary {@link Throwable} cause.
 *
 * @param message description of the failure
 * @param innerThrowable the underlying cause, passed to the superclass constructor
 */
public AzureBlobFileSystemException(final String message, final Throwable innerThrowable) {
super(message, innerThrowable);
}

@Override
public String toString() {
if (this.getMessage() == null && this.getCause() == null) {
Expand Down
Loading