Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,16 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES)
private String azureAppendBlobDirs;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_ENFORCE_LEASE,
DefaultValue = DEFAULT_FS_AZURE_WRITE_ENFORCE_LEASE)
private boolean azureWriteEnforceLease;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_LEASE_DURATION,
MinValue = MIN_LEASE_DURATION,
MaxValue = MAX_LEASE_DURATION,
DefaultValue = DEFAULT_FS_AZURE_WRITE_LEASE_DURATION)
private int azureWriteLeaseDuration;

@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_INFINITE_LEASE_KEY,
DefaultValue = DEFAULT_FS_AZURE_INFINITE_LEASE_DIRECTORIES)
private String azureInfiniteLeaseDirs;
Expand Down Expand Up @@ -646,6 +656,14 @@ public String getAppendBlobDirs() {
return this.azureAppendBlobDirs;
}

public boolean isLeaseEnforced() {
return this.azureWriteEnforceLease;
}

public int getWriteLeaseDuration() {
return this.azureWriteLeaseDuration;
}

public String getAzureInfiniteLeaseDirs() {
return this.azureInfiniteLeaseDirs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,13 +506,15 @@ public OutputStream createFile(final Path path,
triggerConditionalCreateOverwrite = true;
}

AbfsLease lease = maybeCreateFiniteLease(relativePath, isNamespaceEnabled);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the path qualifies for infinteLease as per config settings, this will still create a finite lease ?

Call to a method createLease() that will first check for infiniteLease setting before defaulting to finite lease (if write lease config is enabled), would be the expectation ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No it will not, it returns a null object if path qualifies for infinite lease

AbfsRestOperation op;
if (triggerConditionalCreateOverwrite) {
op = conditionalCreateOverwriteFile(relativePath,
statistics,
isNamespaceEnabled ? getOctalNotation(permission) : null,
isNamespaceEnabled ? getOctalNotation(umask) : null,
isAppendBlob
isAppendBlob,
lease
);

} else {
Expand All @@ -521,12 +523,14 @@ public OutputStream createFile(final Path path,
isNamespaceEnabled ? getOctalNotation(permission) : null,
isNamespaceEnabled ? getOctalNotation(umask) : null,
isAppendBlob,
null);
null,
lease);
}
perfInfo.registerResult(op.getResult()).registerSuccess(true);

AbfsLease lease = maybeCreateLease(relativePath);

if (lease == null) {
Copy link
Contributor

@snvijaya snvijaya Jun 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Acquiring lease along with create/append enables atomic fetch of lease with the store operation and prevent any parallel writers. If configured for infinite lease, the path creation will happen without bundled lease acquiry and rely on later acquireLease API call in AbfsLease. Infinite lease flow should purely be on acquireLease API only if fs.azure.write.enforceLease is off.

lease = maybeCreateLease(relativePath, isNamespaceEnabled);
}
return new AbfsOutputStream(
client,
statistics,
Expand All @@ -551,15 +555,16 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
final FileSystem.Statistics statistics,
final String permission,
final String umask,
final boolean isAppendBlob) throws AzureBlobFileSystemException {
final boolean isAppendBlob,
AbfsLease lease) throws AzureBlobFileSystemException {
AbfsRestOperation op;

try {
// Trigger a create with overwrite=false first so that eTag fetch can be
// avoided for cases when no pre-existing file is present (major portion
// of create file traffic falls into the case of no pre-existing file).
op = client.createPath(relativePath, true,
false, permission, umask, isAppendBlob, null);
false, permission, umask, isAppendBlob, null, lease);
} catch (AbfsRestOperationException e) {
if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
// File pre-exists, fetch eTag
Expand All @@ -583,7 +588,7 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
try {
// overwrite only if eTag matches with the file properties fetched befpre
op = client.createPath(relativePath, true,
true, permission, umask, isAppendBlob, eTag);
true, permission, umask, isAppendBlob, eTag, lease);
} catch (AbfsRestOperationException ex) {
if (ex.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) {
// Is a parallel access case, as file with eTag was just queried
Expand Down Expand Up @@ -639,7 +644,7 @@ public void createDirectory(final Path path, final FsPermission permission, fina
final AbfsRestOperation op = client.createPath(getRelativePath(path),
false, overwrite,
isNamespaceEnabled ? getOctalNotation(permission) : null,
isNamespaceEnabled ? getOctalNotation(umask) : null, false, null);
isNamespaceEnabled ? getOctalNotation(umask) : null, false, null, null);
perfInfo.registerResult(op.getResult()).registerSuccess(true);
}
}
Expand Down Expand Up @@ -738,7 +743,7 @@ public OutputStream openFileForWrite(final Path path, final FileSystem.Statistic
isAppendBlob = true;
}

AbfsLease lease = maybeCreateLease(relativePath);
AbfsLease lease = maybeCreateLease(relativePath, getIsNamespaceEnabled());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make it only for hns enabled


return new AbfsOutputStream(
client,
Expand Down Expand Up @@ -1698,14 +1703,29 @@ private void updateInfiniteLeaseDirs() {
this.azureInfiniteLeaseDirSet.remove("");
}

private AbfsLease maybeCreateLease(String relativePath)
private AbfsLease maybeCreateFiniteLease(String relativePath, boolean isNamespaceEnabled)
throws AzureBlobFileSystemException {
boolean enableInfiniteLease = isInfiniteLeaseKey(relativePath);
if (!enableInfiniteLease) {
return null;
AbfsLease lease = null;
if (!enableInfiniteLease && abfsConfiguration.isLeaseEnforced() && isNamespaceEnabled) {
lease = new AbfsLease(client, relativePath, false);
Copy link
Contributor

@snvijaya snvijaya Jun 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there scenarios where a finite lease created needs to explicitly call on acquireLease API ? If that dependency isnt there, it would be better to create 2 child classes to AbfsLease as AbfsInfiniteLeaseV1 and AbfsApiBundledLease.

}
AbfsLease lease = new AbfsLease(client, relativePath);
leaseRefs.put(lease, null);

return lease;
}

private AbfsLease maybeCreateLease(String relativePath, boolean isNamespaceEnabled)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Merge maybeCreateLease and maybeCreateInfiniteLease

throws AzureBlobFileSystemException {
boolean enableInfiniteLease = isInfiniteLeaseKey(relativePath);
AbfsLease lease = null;
if (enableInfiniteLease) {
lease = new AbfsLease(client, relativePath, true);
leaseRefs.put(lease, null);
}
else if (abfsConfiguration.isLeaseEnforced() && isNamespaceEnabled) {
lease = new AbfsLease(client, relativePath, false);
}

return lease;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public final class AbfsHttpConstants {
public static final String BREAK_LEASE_ACTION = "break";
public static final String RELEASE_LEASE_ACTION = "release";
public static final String RENEW_LEASE_ACTION = "renew";
public static final String ACQUIRE_RELEASE_LEASE_ACTION = "acquire-release";
public static final String AUTO_RENEW_LEASE_ACTION = "auto-renew";
public static final String DEFAULT_LEASE_BREAK_PERIOD = "0";
public static final String DEFAULT_TIMEOUT = "90";
public static final String APPEND_BLOB_TYPE = "appendblob";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ public final class ConfigurationKeys {
/** Provides a config to provide comma separated path prefixes on which Appendblob based files are created
* Default is empty. **/
public static final String FS_AZURE_APPEND_BLOB_KEY = "fs.azure.appendblob.directories";
/** Provides a config control to disable or enable lease enforcement during write operations - appends and creates.
* Default is false. **/
public static final String FS_AZURE_WRITE_ENFORCE_LEASE = "fs.azure.write.enforcelease";
/** Provides a config control to set lease duration in seconds if lease is to be enforced during write operations - appends and creates.
* It is applicable for Hierarchical Namespace enabled accounts only. The lease duration must be between 15 and 60 seconds. Default is 60. **/
public static final String FS_AZURE_WRITE_LEASE_DURATION = "fs.azure.write.lease.duration";
/** Provides a config to provide comma separated path prefixes which support infinite leases.
* Files under these paths will be leased when created or opened for writing and the lease will
* be released when the file is closed. The lease may be broken with the breakLease method on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_FS_AZURE_ENABLE_MKDIR_OVERWRITE = true;
public static final String DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES = "";
public static final String DEFAULT_FS_AZURE_INFINITE_LEASE_DIRECTORIES = "";
public static final boolean DEFAULT_FS_AZURE_WRITE_ENFORCE_LEASE = false;
public static final int DEFAULT_FS_AZURE_WRITE_LEASE_DURATION = 60;
public static final int DEFAULT_LEASE_THREADS = 0;
public static final int MIN_LEASE_THREADS = 0;
public static final int DEFAULT_LEASE_DURATION = -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,17 @@ public enum Mode {
private final int length;
private final Mode mode;
private final boolean isAppendBlob;
private final String leaseId;

public AppendRequestParameters(final long position,
final int offset,
final int length,
final Mode mode,
final boolean isAppendBlob,
final String leaseId) {
final boolean isAppendBlob) {
this.position = position;
this.offset = offset;
this.length = length;
this.mode = mode;
this.isAppendBlob = isAppendBlob;
this.leaseId = leaseId;
}

public long getPosition() {
Expand All @@ -68,8 +65,4 @@ public Mode getMode() {
public boolean isAppendBlob() {
return this.isAppendBlob;
}

public String getLeaseId() {
return this.leaseId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public class AbfsClient implements Closeable {

private final URL baseUrl;
private final SharedKeyCredentials sharedKeyCredentials;
private final String xMsVersion = "2019-12-12";
private final String xMsVersion = "2020-08-04";
private final ExponentialRetryPolicy retryPolicy;
private final String filesystem;
private final AbfsConfiguration abfsConfiguration;
Expand Down Expand Up @@ -338,7 +338,8 @@ public AbfsRestOperation deleteFilesystem() throws AzureBlobFileSystemException

public AbfsRestOperation createPath(final String path, final boolean isFile, final boolean overwrite,
final String permission, final String umask,
final boolean isAppendBlob, final String eTag) throws AzureBlobFileSystemException {
final boolean isAppendBlob, final String eTag,
AbfsLease lease) throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
if (isFile) {
addCustomerProvidedKeyHeaders(requestHeaders);
Expand All @@ -359,6 +360,12 @@ public AbfsRestOperation createPath(final String path, final boolean isFile, fin
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, eTag));
}

if (lease != null && lease.getLeaseID() != null && !lease.getLeaseID().isEmpty() && isFile) {
lease.setLeaseAcquired(true);
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PROPOSED_LEASE_ID, lease.getLeaseID()));
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_DURATION, String.valueOf(abfsConfiguration.getWriteLeaseDuration())));
}

final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE : DIRECTORY);
if (isAppendBlob) {
Expand Down Expand Up @@ -561,17 +568,14 @@ public AbfsRestOperation renameIdempotencyCheckOp(
}

public AbfsRestOperation append(final String path, final byte[] buffer,
AppendRequestParameters reqParams, final String cachedSasToken)
AppendRequestParameters reqParams, final String cachedSasToken, AbfsLease lease)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lease should be a member of AppendRequestParameters.

throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
addCustomerProvidedKeyHeaders(requestHeaders);
// JDK7 does not support PATCH, so to workaround the issue we will use
// PUT and specify the real method in the X-Http-Method-Override header.
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
HTTP_METHOD_PATCH));
if (reqParams.getLeaseId() != null) {
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, reqParams.getLeaseId()));
}

final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
Expand All @@ -585,6 +589,26 @@ public AbfsRestOperation append(final String path, final byte[] buffer,
}
}

if (lease != null && lease.getLeaseID() != null && !lease.getLeaseID().isEmpty()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redundant code in flush and append. Create a function

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add below member functions to AbfsLease :

  1. boolean hasValidLease() - which will return value of ( lease.getLeaseID() != null && !lease.getLeaseID().isEmpty())
  2. AddLeaseHeaders(List requestHeaders) - and move the requestHeader conditional header add logic into it

if (lease.isInfiniteLease()) {
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, lease.getLeaseID()));
} else {
if (!lease.isLeaseAcquired()) {
lease.setLeaseAcquired(true);
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION,
reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_CLOSE_MODE ? ACQUIRE_RELEASE_LEASE_ACTION : ACQUIRE_LEASE_ACTION));
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PROPOSED_LEASE_ID, lease.getLeaseID()));
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_DURATION, String.valueOf(abfsConfiguration.getWriteLeaseDuration())));
} else if (reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_CLOSE_MODE) {
lease.setLeaseAcquired(false);
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, RELEASE_LEASE_ACTION));
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ID, lease.getLeaseID()));
} else {
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, AUTO_RENEW_LEASE_ACTION));
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ID, lease.getLeaseID()));
}
}
}
// AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance
String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION,
abfsUriQueryBuilder, cachedSasToken);
Expand Down Expand Up @@ -648,16 +672,33 @@ public boolean appendSuccessCheckOp(AbfsRestOperation op, final String path,
}

public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData,
boolean isClose, final String cachedSasToken, final String leaseId)
boolean isClose, final String cachedSasToken, AbfsLease lease)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
addCustomerProvidedKeyHeaders(requestHeaders);
// JDK7 does not support PATCH, so to workaround the issue we will use
// PUT and specify the real method in the X-Http-Method-Override header.
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
HTTP_METHOD_PATCH));
if (leaseId != null) {
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));

if (lease != null && lease.getLeaseID() != null && !lease.getLeaseID().isEmpty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

if (lease.isInfiniteLease()) {
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, lease.getLeaseID()));
} else {
if (!lease.isLeaseAcquired()) {
lease.setLeaseAcquired(true);
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, isClose ? ACQUIRE_RELEASE_LEASE_ACTION : ACQUIRE_LEASE_ACTION));
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PROPOSED_LEASE_ID, lease.getLeaseID()));
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_DURATION, String.valueOf(abfsConfiguration.getWriteLeaseDuration())));
} else if (isClose) {
lease.setLeaseAcquired(false);
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, RELEASE_LEASE_ACTION));
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ID, lease.getLeaseID()));
} else {
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, AUTO_RENEW_LEASE_ACTION));
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ID, lease.getLeaseID()));
}
}
}

final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
Expand Down
Loading